cengal.unittest.behavior_stabilizer.versions.v_0.behavior_stabilizer
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['CallState', 'ResultAlreadyRegisteredError', 'ExceptionAlreadyRegisteredError', 'CallStackIsNotEqualError', 20 'ExpectedTestCaseStateIsNotLoadedError', 'TEST_CASE_STATE', 'get_test_case_state', 'TestCaseState'] 21 22 23import datetime 24import inspect 25import json 26import logging 27import os 28import traceback 29import unittest 30import pickle 31from contextlib import contextmanager 32from types import FrameType 33from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union, Hashable, cast, ContextManager, Callable, Awaitable, OrderedDict 34from cengal.code_flow_control.smart_values import ValueHolder 35from cengal.introspection.inspect import CodeParamsWithValues, intro_func_params_with_values, find_current_entity, find_entity, is_async, \ 36 intro_frame_params_with_values, entity_class, get_exception, func_params_with_values 37from cengal.file_system.path_manager import relative_to_src, RelativePath 38from collections import namedtuple, OrderedDict 39from functools import wraps, update_wrapper 40 41 42""" 43Module Docstring 44Docstrings: http://www.python.org/dev/peps/pep-0257/ 45""" 46 47__author__ = "ButenkoMS <gtalk@butenkoms.space>" 48__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 49__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 50__license__ = "Apache License, Version 2.0" 51__version__ = "4.4.1" 52__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 53__email__ = "gtalk@butenkoms.space" 54# __status__ = "Prototype" 55__status__ = "Development" 56# __status__ = "Production" 57 58 59VarName = str 60VarValue = Any 61MeasurementId = int 62StageId = Hashable 63 64 65def param(param_name: str) -> str: 66 """ 67 Returns a formatted full name of the parameter: 'ClassName__m__MethodName__p__ParameterName' 68 69 Returns: 70 str: furmatted full name of the parameter 71 """ 72 frame = cast(FrameType, inspect.currentframe()).f_back 73 real_func = cast(FrameType, inspect.currentframe()).f_back.f_code 74 func_name = real_func.co_name 75 args, _, _, values = inspect.getargvalues(frame) 76 parent_obj = None 77 for par_name, par_val in [(i, values[i]) for i in args]: 78 if 'self' == par_name: 79 parent_obj = par_val 80 break # TODO: check if it is needed 81 82 class_name = str() 83 if parent_obj is not None: 84 class_name = type(parent_obj).__name__ 85 86 param_full_name = f'{class_name}__m__{func_name}__p__{param_name}' 87 return param_full_name 88 89 90def param2(param_name: str) -> str: 91 raise NotImplementedError 92 93 94FAKE_RESULTS: 'FakeResults' = cast('FakeResults', None) 95 96 97def get_fake_results() -> 'FakeResults': 98 global FAKE_RESULTS 99 if FAKE_RESULTS is None: 100 FAKE_RESULTS = FakeResults() 101 return FAKE_RESULTS 102 103 104class ParameterIsNotInNotInTheListOfExpectedResultsError(Exception): 105 """ 106 Will be raised by FakeResults.check_result() when `param_name not in self.expected_results` 107 """ 108 pass 109 110 111class ResultNumberIsNotValidError(Exception): 112 """ 113 Will be raised by FakeResults.check_result() when `result_number not in self.expected_results[param_name]` 114 """ 115 pass 116 117 118class FakeResults: 119 """ 120 Holds a both an actual and a refference (an expected) results and compares them 121 122 Saves a taken results as a refference 'expected_results' to a json file on FakeResults.try_to_save_expected_results() if 123 this file was not already loaded by the FakeResults.try_to_load_expected_results() method. 124 125 So if FakeResults.try_to_load_expected_results() will be called before all tests and FakeResults.try_to_save_expected_results() 126 will be called after all tests: 127 1) At a first iteration it will take all generated results and will save them to the json file as a refference (as an expected_results). 128 This step must be made on a developers' machine. Resulting json file must be added to the repository. 129 2) At all further calls it will find json file exists and will load it's content as a refference (as an expected_results). 130 131 Can be used as a 'with' context manager which will try to load json file at the start and will try to save a refference to the json file 132 at the end (if json file wasn't loaded at the start). 133 134 Raises: 135 NotImplementedError: _description_ 136 NotImplementedError: _description_ 137 NotImplementedError: _description_ 138 NotImplementedError: _description_ 139 NotImplementedError: _description_ 140 NotImplementedError: _description_ 141 NotImplementedError: _description_ 142 """ 143 144 def __init__(self, content_full_file_name: str) -> None: 145 self.was_loaded = False 146 self.results: Dict[str, Dict[int, str]] = dict() 147 self.expected_results: Dict[str, Dict[int, str]] = dict() 148 self.content_full_file_name = content_full_file_name 149 self.old_global_fake_result: 'FakeResults' = cast('FakeResults', None) 150 151 def add_result(self, param_name: str, result: Any) -> None: 152 """ 153 Will add a result to a list of the results for a requested parameter 154 155 For example we have a function: 156 def add_int(a: int, b: int) -> int: 157 return a + b 158 159 Then we can change it in the next way: 160 def add_int(a: int, b: int) -> int: 161 result = a + b 162 get_fake_results().add_result(param('a'), a) 163 get_fake_results().add_result(param('b'), b) 164 # get_fake_results().add_result(param('->'), result) # TODO: check if it is needed 165 return result 166 167 As result we will save all 'a' and 'b' parameters of an each 'add_int()' call to an ordered sequence and will 168 be able to compare them with a refference values by the calling FakeResults.check_result(), FakeResults.check_results_range() or FakeResults.check_all_results() 169 170 Parameters: 171 param_name (str): your parameter name 172 result (Any): current parameters' content 173 """ 174 result = str(result) 175 if param_name not in self.results: 176 self.results[param_name] = dict() 177 178 index = 0 179 if self.results[param_name]: 180 index = max(self.results[param_name].keys()) + 1 181 182 self.results[param_name][index] = result 183 184 def add_expected_result(self, param_name: str, expected_result: Any) -> None: 185 """ 186 Will add single expected result for the parameter 187 188 Parameters: 189 param_name (str): your parameter name 190 expected_result (Any): expected parameters' content 191 """ 192 expected_result = str(expected_result) 193 if param_name not in self.expected_results: 194 self.expected_results[param_name] = dict() 195 196 index = 0 197 if self.expected_results[param_name]: 198 index = max(self.expected_results[param_name].keys()) + 1 199 200 self.expected_results[param_name][index] = expected_result 201 202 def register_param_expected_results(self, param_name: str, expected_results: List[Any]) -> None: 203 """ 204 Will register a list of expected results for the parameter 205 Will add a list of expected results for the parameter 206 207 Parameters: 208 param_name (str): your parameter name 209 expected_results (List[Any]): list of expected parameters' content 210 """ 211 for expected_result in expected_results: 212 self.add_expected_result(param_name, expected_result) 213 214 def register_expected_results(self, expected_results: Dict[str, Dict[int, str]]) -> None: 215 """ 216 Will register a dict of expected results for all requested parameters 217 218 Parameters: 219 expected_results (Dict[str, Dict[int, str]]): key - parameter name; value.key - index of the expected parameter's 220 content; value.value - expected parameter's content 221 """ 222 self.expected_results = expected_results 223 224 def check_result(self, param_name: str, result_number: Optional[int] = None) -> bool: 225 """ 226 Will compare requested parameter's result with an expected result. if result_number is not None - will compare 227 exact items in a sequence instead of comparing all expected results. 228 Will return True to an each call until json file will be loaded with a 'FakeResults.try_to_load_expected_results()' call. 229 230 Returns: 231 bool: True if result is equal to an expected result 232 """ 233 if not self.was_loaded: 234 return True 235 236 if param_name not in self.expected_results: 237 raise ParameterIsNotInNotInTheListOfExpectedResultsError(param_name) 238 239 if param_name not in self.results: 240 print(f'ERROR: PARAM NOT IN RESULTS: {param_name}') 241 return False 242 243 if result_number is not None: 244 if result_number not in self.expected_results[param_name]: 245 raise ResultNumberIsNotValidError((param_name, result_number)) 246 247 if result_number not in self.results[param_name]: 248 print(f'ERROR: PARAM\'S RESULT NUMBER {result_number} NOT IN RESULTS: {param_name}') 249 return False 250 251 if result_number is None: 252 returned_val: Any = self.results[param_name] 253 expected_val: Any = self.expected_results[param_name] 254 else: 255 returned_val: Any = self.results[param_name][result_number] 256 expected_val: Any = self.expected_results[param_name][result_number] 257 258 if returned_val == expected_val: 259 return True 260 else: 261 print(f'ERROR: PARAM IS NOT EQUAL TO EXPECTED: {param_name}\n\tRETURNED VAL: {returned_val}\n\tEXPECTED VAL: {expected_val}\n~~~~~~~~~~~~~~~~~~~~~~~~~') 262 # return False # TODO: check if it is needed 263 264 def check_results_range(self, param_name: str, result_number_from: int, result_number_up_to: int) -> bool: 265 """ 266 Will compare a range of requested parameters' results with an expected results. 267 Will return True to an each call until json file will be loaded with a 'FakeResults.try_to_load_expected_results()' call. 268 Behavior of the 'results_number_from' and 'results_number_up_to' parameters is the same as in the 'range()' call. 269 270 Parameters: 271 param_name (str): your parameter name 272 result_number_from (int): start index of the results range 273 result_number_up_to (int): end index of the results range 274 275 Returns: 276 bool: True if all results in the range are equal to an expected results 277 """ 278 for result_number in range(result_number_from, result_number_up_to): 279 if not self.check_result(param_name, result_number=result_number): 280 return False 281 282 return True 283 284 def check_current_results(self) -> bool: 285 """ 286 Will compare all requested parameters' results with an expected results. 287 Will return True to an each call until json file will be loaded with a 'FakeResults.try_to_load_expected_results()' call. 288 289 Returns: 290 bool: True if all results are equal to an expected results 291 """ 292 for param_name, _ in self.results.items(): 293 if not self.check_result(param_name): 294 return False 295 296 return True 297 298 def check_all_results(self) -> bool: 299 """ 300 Will compare all requested parameters' results with an expected results. 301 Will return True to an each call until json file will be loaded with a 'FakeResults.try_to_load_expected_results()' call. 302 303 Returns: 304 bool: True if all results are equal to an expected results 305 """ 306 for param_name, _ in self.expected_results.items(): 307 if not self.check_result(param_name): 308 return False 309 310 return True 311 312 def try_to_load_expected_results(self, full_file_name: str) -> None: 313 """ 314 Will try to load a json file with a refference results (an expected_results) 315 316 Parameters: 317 full_file_name (str): full file name of the json file with a refference results (an expected_results) 318 """ 319 if os.path.exists(full_file_name) and os.path.isfile(full_file_name): 320 content = None 321 with open(full_file_name, 'rb') as file: 322 content = file.read() 323 324 if content: 325 content_str = content.decode('utf-8') 326 expected_results = json.loads(content_str) 327 for param, values in expected_results.items(): 328 if param not in self.expected_results: 329 self.expected_results[param] = dict() 330 331 for index_str, value in values.items(): 332 index = int(index_str) 333 self.expected_results[param][index] = value 334 335 self.was_loaded = True 336 337 def try_to_save_expected_results(self, full_file_name: str) -> None: 338 """ 339 Will try to save a refference results (an expected_results) to a json file. Will not save them if 340 json file was already successfully loaded with a 'FakeResults.try_to_load_expected_results()' call. 341 342 Parameters: 343 full_file_name (str): full file name of the json file with a refference results (an expected_results) 344 """ 345 if self.was_loaded: 346 return 347 348 if not (os.path.exists(full_file_name) and os.path.isfile(full_file_name)): 349 content = json.dumps(self.results) 350 content = json.dumps(json.loads(content), indent=4) 351 content_bytes = content.encode('utf-8') 352 with open(full_file_name, 'wb+') as file: # TODO: check if '+' in 'wb+' is needed 353 file.write(content_bytes) 354 355 def register(self): 356 """ 357 Will register current instance to a global 'FAKE_RESULTS' variable. Will save a previous value 358 """ 359 self.try_to_load_expected_results(self.content_full_file_name) 360 361 global FAKE_RESULTS 362 self.old_global_fake_result = FAKE_RESULTS 363 FAKE_RESULTS = self 364 365 def __enter__(self): 366 self.register() 367 return self 368 369 def unregister(self, should_be_saved: bool): 370 """ 371 Will restore a previous value of the global 'FAKE_RESULTS' variable 372 """ 373 global FAKE_RESULTS 374 FAKE_RESULTS = self.old_global_fake_result 375 376 if should_be_saved: 377 self.try_to_save_expected_results(self.content_full_file_name) 378 379 def __exit__(self, exc_type, exc_val, exc_tb): 380 self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None)) 381 return False 382 383 384CallState = namedtuple("CallState", "entity params_with_values args kwargs result_holder exception_holder") 385 386 387class ResultAlreadyRegisteredError(Exception): 388 pass 389 390 391class ExceptionAlreadyRegisteredError(Exception): 392 pass 393 394 395class CallStackIsNotEqualError(Exception): 396 pass 397 398 399class ExpectedTestCaseStateIsNotLoadedError(Exception): 400 pass 401 402 403TEST_CASE_STATE: 'TestCaseState' = cast('TestCaseState', None) 404 405 406def get_test_case_state() -> 'TestCaseState': 407 global TEST_CASE_STATE 408 if TEST_CASE_STATE is None: 409 TEST_CASE_STATE = TestCaseState('Default') 410 411 return TEST_CASE_STATE 412 413 414class TestCaseState: 415 def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1): 416 if name is None: 417 parent_entity = find_current_entity(depth + 1) 418 parent_class = entity_class(parent_entity) 419 parent_class_name: str = parent_class.__name__ 420 421 self.name: str = name or parent_class_name 422 self.raise_exceptions: bool = raise_exceptions 423 self._current_test_id: Hashable = None 424 self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 425 None: list() 426 }) 427 self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 428 None: list() 429 }) 430 self.loaded: bool = False 431 self.content_file_name: str = f'{self.name}__test_case_state.pickle' 432 self.readable_content_file_name: str = f'{self.name}__test_case_state.md' 433 self.content_dir_rel: RelativePath = relative_to_src(depth + 1) 434 self.content_full_file_name: str = self.content_dir_rel(self.content_file_name) 435 self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name) 436 self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None) 437 if register: 438 self.register() 439 440 @property 441 def current_test_id(self) -> Hashable: 442 return self._current_test_id 443 444 @current_test_id.setter 445 def current_test_id(self, value: Hashable) -> None: 446 self._current_test_id = value 447 if value not in self.call_stack_per_test: 448 self.call_stack_per_test[value] = list() 449 450 @property 451 def call_stack(self) -> List[CallState]: 452 return self.call_stack_per_test[self.current_test_id] 453 454 @property 455 def expected_call_stack(self) -> List[CallState]: 456 try: 457 return self.expected_call_stack_per_test[self.current_test_id] 458 except KeyError: 459 return list() 460 461 def is_loaded(self) -> bool: 462 if not self.loaded: 463 raise ExpectedTestCaseStateIsNotLoadedError(f'Expected test case state is not loaded: {self.name}') 464 465 def check_current_state_item(self): 466 self.is_loaded() 467 call_stack_item: CallState = self.call_stack[-1] 468 expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1] 469 if call_stack_item != expected_call_stack_item: 470 raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}') 471 472 def check_state_item(self, item_index: int): 473 self.is_loaded() 474 call_stack_item: CallState = self.call_stack[item_index] 475 expected_call_stack_item: CallState = self.expected_call_stack[item_index] 476 if call_stack_item != expected_call_stack_item: 477 raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}') 478 479 def check_state_range(self, item_start_index: int, item_end_index: int): 480 self.is_loaded() 481 call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index] 482 expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index] 483 if call_stack_part != expected_call_stack_part: 484 raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}') 485 486 def check_current_state(self): 487 self.is_loaded() 488 expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)] 489 if self.call_stack != expected_call_stack_part: 490 raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}') 491 492 def check_whole_state(self): 493 self.is_loaded() 494 if self.call_stack != self.expected_call_stack: 495 raise CallStackIsNotEqualError(f'Expected call stack: {self.expected_call_stack}\nActual call stack: {self.call_stack}') 496 497 def check_all_tests_state(self): 498 self.is_loaded() 499 if self.call_stack_per_test != self.expected_call_stack_per_test: 500 raise CallStackIsNotEqualError(f'Expected call stacks: {self.expected_call_stack_per_test}\nActual call stacks: {self.call_stack_per_test}') 501 502 def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 503 class RIContextManager: 504 def __init__(self, testcasestate: 'TestCaseState') -> None: 505 self.testcasestate: 'TestCaseState' = testcasestate 506 self.current_entity: Callable = None 507 self.code_params_with_values: CodeParamsWithValues = None 508 self.result_holder: ValueHolder = ValueHolder() 509 self.exception_holder: ValueHolder = ValueHolder() 510 self.args: Tuple = tuple() 511 self.kwargs: Dict = dict() 512 513 def __enter__(self) -> ValueHolder: 514 self.current_entity: Callable = find_current_entity(2) 515 self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 516 self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only 517 self.args = tuple((arg[1] for arg in self.args)) 518 self.kwargs = dict(self.code_params_with_values.keyword_only) 519 return self.result_holder 520 521 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 522 result = None 523 if exc_type is not None: 524 self.exception_holder.value = exc_val.with_traceback(exc_tb) 525 if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions): 526 result = True 527 528 current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder) 529 self.testcasestate.call_stack.append(current_call_state) 530 self.testcasestate.check_current_state_item() 531 return result 532 533 return RIContextManager(self) 534 535 # def context_manager(self: 'TestCaseState'): 536 # current_entity: Callable = find_current_entity(2) 537 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 538 # result_holder: ValueHolder = ValueHolder() 539 # exception_holder: ValueHolder = ValueHolder() 540 # try: 541 # yield result_holder 542 # except: 543 # exception_holder.value = get_exception() 544 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 545 # raise 546 # finally: 547 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 548 # self.call_stack.append(current_call_state) 549 # self.check_current_state_item() 550 551 # return context_manager(self) 552 553 # @contextmanager 554 # def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 555 # current_entity: Callable = find_current_entity(2) 556 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 557 # result_holder: ValueHolder = ValueHolder() 558 # exception_holder: ValueHolder = ValueHolder() 559 # try: 560 # yield result_holder 561 # except: 562 # exception_holder.value = get_exception() 563 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 564 # raise 565 # finally: 566 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 567 # self.call_stack.append(current_call_state) 568 # self.check_current_state_item() 569 570 ri = register_intro 571 572 def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable: 573 original_func: Callable = func 574 if hasattr(original_func, 'cr_frame'): 575 original_func = find_entity(original_func.cr_frame) 576 577 if is_async(func): 578 if inspect.isawaitable(func): 579 async def awaitable_wrapper() -> Any: 580 current_entity: Awaitable = func 581 if hasattr(current_entity, 'cr_frame'): 582 # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame) 583 code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict()) 584 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 585 else: 586 code_params_with_values: CodeParamsWithValues = CodeParamsWithValues() 587 result_args = tuple() 588 result_kwargs = dict() 589 590 result_holder: ValueHolder = ValueHolder() 591 exception_holder: ValueHolder = ValueHolder() 592 try: 593 result = await func 594 result_holder.value = result 595 return result 596 except: 597 exception_holder.value = get_exception() 598 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 599 raise 600 finally: 601 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 602 self.call_stack.append(current_call_state) 603 self.check_current_state_item() 604 605 wrapper = awaitable_wrapper() 606 else: 607 async def async_wrapper(*args, **kwargs) -> Any: 608 current_entity: Callable = func 609 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 610 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 611 result_holder: ValueHolder = ValueHolder() 612 exception_holder: ValueHolder = ValueHolder() 613 try: 614 result = await func(*args, **kwargs) 615 result_holder.value = result 616 return result 617 except: 618 exception_holder.value = get_exception() 619 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 620 raise 621 finally: 622 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 623 self.call_stack.append(current_call_state) 624 self.check_current_state_item() 625 626 wrapper = async_wrapper 627 else: 628 def sync_wrapper(*args, **kwargs) -> Any: 629 current_entity: Callable = func 630 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 631 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 632 result_holder: ValueHolder = ValueHolder() 633 exception_holder: ValueHolder = ValueHolder() 634 try: 635 result = func(*args, **kwargs) 636 result_holder.value = result 637 return result 638 except: 639 exception_holder.value = get_exception() 640 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 641 raise 642 finally: 643 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 644 self.call_stack.append(current_call_state) 645 self.check_current_state_item() 646 647 wrapper = sync_wrapper 648 649 original_func_sign: inspect.Signature = inspect.signature(original_func) 650 update_wrapper(wrapper, original_func) 651 wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation) 652 return wrapper 653 654 ro = register_outro 655 656 def register_last_result(self, result: Any) -> None: 657 last_call_state: CallState = self.call_stack[-1] 658 result_holder: ValueHolder = last_call_state.result_holder 659 if result_holder: 660 raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}') 661 662 result_holder.value = result 663 664 rls = register_last_result 665 666 def register_last_exception(self, result: Any) -> None: 667 last_call_state: CallState = self.call_stack[-1] 668 exception_holder: ValueHolder = last_call_state.result_holder 669 if exception_holder: 670 raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}') 671 672 exception_holder.value = result 673 674 rle = register_last_exception 675 676 def try_to_load_expected_call_stack(self) -> None: 677 """ 678 Will try to load a pickle file with a refference results (an expected_results) 679 """ 680 if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name): 681 with open(self.content_full_file_name, 'rb') as file: 682 self.expected_call_stack_per_test = pickle.load(file) 683 self.loaded = True 684 685 def try_to_save_expected_call_stack(self) -> None: 686 """ 687 Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 688 pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call. 689 """ 690 if self.loaded: 691 return 692 693 if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)): 694 with open(self.content_full_file_name, 'wb') as file: 695 pickle.dump(self.call_stack_per_test, file) 696 697 if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)): 698 with open(self.readable_content_full_file_name, 'wb') as file: 699 file.write(self.prepare_readable_content().encode('utf-8')) 700 701 def prepare_readable_content(self) -> str: 702 content: str = str() 703 content += f'# Test Case: {self.name}\n' 704 for test_id, call_stack in self.call_stack_per_test.items(): 705 content += f'\n\n## Test ID: {test_id}\n' 706 for index, call_state in enumerate(call_stack): 707 content += f'\n\t{index}: {call_state.entity.__name__}\n' 708 content += f'\t\t{call_state.params_with_values}\n' 709 content += f'\t\targs: {call_state.args}\n' 710 content += f'\t\tkwargs: {call_state.kwargs}\n' 711 if call_state.result_holder: 712 content += f'\t\tresult: {call_state.result_holder.value}\n' 713 if call_state.exception_holder: 714 content += f'\t\texception: {call_state.exception_holder.value}\n' 715 716 return content 717 718 def register(self): 719 """ 720 Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value 721 """ 722 self.try_to_load_expected_call_stack() 723 724 global TEST_CASE_STATE 725 self.old_global_fake_result = TEST_CASE_STATE 726 TEST_CASE_STATE = self 727 728 def __enter__(self): 729 self.register() 730 return self 731 732 def unregister(self, should_be_saved: bool = True): 733 """ 734 Will restore a previous value of the global 'TEST_CASE_STATE' variable 735 """ 736 global TEST_CASE_STATE 737 TEST_CASE_STATE = self.old_global_fake_result 738 739 if should_be_saved: 740 self.try_to_save_expected_call_stack() 741 742 def __exit__(self, exc_type, exc_val, exc_tb): 743 self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None)) 744 return False 745 746 747 748class StateBase: 749 def add(self, var_name: VarName, var_value: VarValue, stage_id: StageId = None): 750 raise NotImplementedError 751 752 def add_set(self, vars: Dict[VarName, VarValue]): 753 raise NotImplementedError 754 755 def check_var(self, var_name: VarName): 756 raise NotImplementedError 757 758 def check_var_measurement(self, var_name: VarName, measurement_id_from: MeasurementId, measuremente_id_up_to: MeasurementId): 759 raise NotImplementedError 760 761 def check_var_current_state(self, var_name: VarName): 762 raise NotImplementedError 763 764 def check_all(self): 765 raise NotImplementedError 766 767 def check_all_current_state(self): 768 raise NotImplementedError
CallState(entity, params_with_values, args, kwargs, result_holder, exception_holder)
Create new instance of CallState(entity, params_with_values, args, kwargs, result_holder, exception_holder)
Inherited Members
- builtins.tuple
- index
- count
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
415class TestCaseState: 416 def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1): 417 if name is None: 418 parent_entity = find_current_entity(depth + 1) 419 parent_class = entity_class(parent_entity) 420 parent_class_name: str = parent_class.__name__ 421 422 self.name: str = name or parent_class_name 423 self.raise_exceptions: bool = raise_exceptions 424 self._current_test_id: Hashable = None 425 self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 426 None: list() 427 }) 428 self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 429 None: list() 430 }) 431 self.loaded: bool = False 432 self.content_file_name: str = f'{self.name}__test_case_state.pickle' 433 self.readable_content_file_name: str = f'{self.name}__test_case_state.md' 434 self.content_dir_rel: RelativePath = relative_to_src(depth + 1) 435 self.content_full_file_name: str = self.content_dir_rel(self.content_file_name) 436 self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name) 437 self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None) 438 if register: 439 self.register() 440 441 @property 442 def current_test_id(self) -> Hashable: 443 return self._current_test_id 444 445 @current_test_id.setter 446 def current_test_id(self, value: Hashable) -> None: 447 self._current_test_id = value 448 if value not in self.call_stack_per_test: 449 self.call_stack_per_test[value] = list() 450 451 @property 452 def call_stack(self) -> List[CallState]: 453 return self.call_stack_per_test[self.current_test_id] 454 455 @property 456 def expected_call_stack(self) -> List[CallState]: 457 try: 458 return self.expected_call_stack_per_test[self.current_test_id] 459 except KeyError: 460 return list() 461 462 def is_loaded(self) -> bool: 463 if not self.loaded: 464 raise ExpectedTestCaseStateIsNotLoadedError(f'Expected test case state is not loaded: {self.name}') 465 466 def check_current_state_item(self): 467 self.is_loaded() 468 call_stack_item: CallState = self.call_stack[-1] 469 expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1] 470 if call_stack_item != expected_call_stack_item: 471 raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}') 472 473 def check_state_item(self, item_index: int): 474 self.is_loaded() 475 call_stack_item: CallState = self.call_stack[item_index] 476 expected_call_stack_item: CallState = self.expected_call_stack[item_index] 477 if call_stack_item != expected_call_stack_item: 478 raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}') 479 480 def check_state_range(self, item_start_index: int, item_end_index: int): 481 self.is_loaded() 482 call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index] 483 expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index] 484 if call_stack_part != expected_call_stack_part: 485 raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}') 486 487 def check_current_state(self): 488 self.is_loaded() 489 expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)] 490 if self.call_stack != expected_call_stack_part: 491 raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}') 492 493 def check_whole_state(self): 494 self.is_loaded() 495 if self.call_stack != self.expected_call_stack: 496 raise CallStackIsNotEqualError(f'Expected call stack: {self.expected_call_stack}\nActual call stack: {self.call_stack}') 497 498 def check_all_tests_state(self): 499 self.is_loaded() 500 if self.call_stack_per_test != self.expected_call_stack_per_test: 501 raise CallStackIsNotEqualError(f'Expected call stacks: {self.expected_call_stack_per_test}\nActual call stacks: {self.call_stack_per_test}') 502 503 def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 504 class RIContextManager: 505 def __init__(self, testcasestate: 'TestCaseState') -> None: 506 self.testcasestate: 'TestCaseState' = testcasestate 507 self.current_entity: Callable = None 508 self.code_params_with_values: CodeParamsWithValues = None 509 self.result_holder: ValueHolder = ValueHolder() 510 self.exception_holder: ValueHolder = ValueHolder() 511 self.args: Tuple = tuple() 512 self.kwargs: Dict = dict() 513 514 def __enter__(self) -> ValueHolder: 515 self.current_entity: Callable = find_current_entity(2) 516 self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 517 self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only 518 self.args = tuple((arg[1] for arg in self.args)) 519 self.kwargs = dict(self.code_params_with_values.keyword_only) 520 return self.result_holder 521 522 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 523 result = None 524 if exc_type is not None: 525 self.exception_holder.value = exc_val.with_traceback(exc_tb) 526 if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions): 527 result = True 528 529 current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder) 530 self.testcasestate.call_stack.append(current_call_state) 531 self.testcasestate.check_current_state_item() 532 return result 533 534 return RIContextManager(self) 535 536 # def context_manager(self: 'TestCaseState'): 537 # current_entity: Callable = find_current_entity(2) 538 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 539 # result_holder: ValueHolder = ValueHolder() 540 # exception_holder: ValueHolder = ValueHolder() 541 # try: 542 # yield result_holder 543 # except: 544 # exception_holder.value = get_exception() 545 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 546 # raise 547 # finally: 548 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 549 # self.call_stack.append(current_call_state) 550 # self.check_current_state_item() 551 552 # return context_manager(self) 553 554 # @contextmanager 555 # def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 556 # current_entity: Callable = find_current_entity(2) 557 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 558 # result_holder: ValueHolder = ValueHolder() 559 # exception_holder: ValueHolder = ValueHolder() 560 # try: 561 # yield result_holder 562 # except: 563 # exception_holder.value = get_exception() 564 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 565 # raise 566 # finally: 567 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 568 # self.call_stack.append(current_call_state) 569 # self.check_current_state_item() 570 571 ri = register_intro 572 573 def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable: 574 original_func: Callable = func 575 if hasattr(original_func, 'cr_frame'): 576 original_func = find_entity(original_func.cr_frame) 577 578 if is_async(func): 579 if inspect.isawaitable(func): 580 async def awaitable_wrapper() -> Any: 581 current_entity: Awaitable = func 582 if hasattr(current_entity, 'cr_frame'): 583 # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame) 584 code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict()) 585 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 586 else: 587 code_params_with_values: CodeParamsWithValues = CodeParamsWithValues() 588 result_args = tuple() 589 result_kwargs = dict() 590 591 result_holder: ValueHolder = ValueHolder() 592 exception_holder: ValueHolder = ValueHolder() 593 try: 594 result = await func 595 result_holder.value = result 596 return result 597 except: 598 exception_holder.value = get_exception() 599 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 600 raise 601 finally: 602 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 603 self.call_stack.append(current_call_state) 604 self.check_current_state_item() 605 606 wrapper = awaitable_wrapper() 607 else: 608 async def async_wrapper(*args, **kwargs) -> Any: 609 current_entity: Callable = func 610 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 611 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 612 result_holder: ValueHolder = ValueHolder() 613 exception_holder: ValueHolder = ValueHolder() 614 try: 615 result = await func(*args, **kwargs) 616 result_holder.value = result 617 return result 618 except: 619 exception_holder.value = get_exception() 620 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 621 raise 622 finally: 623 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 624 self.call_stack.append(current_call_state) 625 self.check_current_state_item() 626 627 wrapper = async_wrapper 628 else: 629 def sync_wrapper(*args, **kwargs) -> Any: 630 current_entity: Callable = func 631 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 632 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 633 result_holder: ValueHolder = ValueHolder() 634 exception_holder: ValueHolder = ValueHolder() 635 try: 636 result = func(*args, **kwargs) 637 result_holder.value = result 638 return result 639 except: 640 exception_holder.value = get_exception() 641 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 642 raise 643 finally: 644 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 645 self.call_stack.append(current_call_state) 646 self.check_current_state_item() 647 648 wrapper = sync_wrapper 649 650 original_func_sign: inspect.Signature = inspect.signature(original_func) 651 update_wrapper(wrapper, original_func) 652 wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation) 653 return wrapper 654 655 ro = register_outro 656 657 def register_last_result(self, result: Any) -> None: 658 last_call_state: CallState = self.call_stack[-1] 659 result_holder: ValueHolder = last_call_state.result_holder 660 if result_holder: 661 raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}') 662 663 result_holder.value = result 664 665 rls = register_last_result 666 667 def register_last_exception(self, result: Any) -> None: 668 last_call_state: CallState = self.call_stack[-1] 669 exception_holder: ValueHolder = last_call_state.result_holder 670 if exception_holder: 671 raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}') 672 673 exception_holder.value = result 674 675 rle = register_last_exception 676 677 def try_to_load_expected_call_stack(self) -> None: 678 """ 679 Will try to load a pickle file with a refference results (an expected_results) 680 """ 681 if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name): 682 with open(self.content_full_file_name, 'rb') as file: 683 self.expected_call_stack_per_test = pickle.load(file) 684 self.loaded = True 685 686 def try_to_save_expected_call_stack(self) -> None: 687 """ 688 Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 689 pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call. 690 """ 691 if self.loaded: 692 return 693 694 if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)): 695 with open(self.content_full_file_name, 'wb') as file: 696 pickle.dump(self.call_stack_per_test, file) 697 698 if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)): 699 with open(self.readable_content_full_file_name, 'wb') as file: 700 file.write(self.prepare_readable_content().encode('utf-8')) 701 702 def prepare_readable_content(self) -> str: 703 content: str = str() 704 content += f'# Test Case: {self.name}\n' 705 for test_id, call_stack in self.call_stack_per_test.items(): 706 content += f'\n\n## Test ID: {test_id}\n' 707 for index, call_state in enumerate(call_stack): 708 content += f'\n\t{index}: {call_state.entity.__name__}\n' 709 content += f'\t\t{call_state.params_with_values}\n' 710 content += f'\t\targs: {call_state.args}\n' 711 content += f'\t\tkwargs: {call_state.kwargs}\n' 712 if call_state.result_holder: 713 content += f'\t\tresult: {call_state.result_holder.value}\n' 714 if call_state.exception_holder: 715 content += f'\t\texception: {call_state.exception_holder.value}\n' 716 717 return content 718 719 def register(self): 720 """ 721 Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value 722 """ 723 self.try_to_load_expected_call_stack() 724 725 global TEST_CASE_STATE 726 self.old_global_fake_result = TEST_CASE_STATE 727 TEST_CASE_STATE = self 728 729 def __enter__(self): 730 self.register() 731 return self 732 733 def unregister(self, should_be_saved: bool = True): 734 """ 735 Will restore a previous value of the global 'TEST_CASE_STATE' variable 736 """ 737 global TEST_CASE_STATE 738 TEST_CASE_STATE = self.old_global_fake_result 739 740 if should_be_saved: 741 self.try_to_save_expected_call_stack() 742 743 def __exit__(self, exc_type, exc_val, exc_tb): 744 self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None)) 745 return False
416 def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1): 417 if name is None: 418 parent_entity = find_current_entity(depth + 1) 419 parent_class = entity_class(parent_entity) 420 parent_class_name: str = parent_class.__name__ 421 422 self.name: str = name or parent_class_name 423 self.raise_exceptions: bool = raise_exceptions 424 self._current_test_id: Hashable = None 425 self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 426 None: list() 427 }) 428 self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({ 429 None: list() 430 }) 431 self.loaded: bool = False 432 self.content_file_name: str = f'{self.name}__test_case_state.pickle' 433 self.readable_content_file_name: str = f'{self.name}__test_case_state.md' 434 self.content_dir_rel: RelativePath = relative_to_src(depth + 1) 435 self.content_full_file_name: str = self.content_dir_rel(self.content_file_name) 436 self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name) 437 self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None) 438 if register: 439 self.register()
466 def check_current_state_item(self): 467 self.is_loaded() 468 call_stack_item: CallState = self.call_stack[-1] 469 expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1] 470 if call_stack_item != expected_call_stack_item: 471 raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}')
473 def check_state_item(self, item_index: int): 474 self.is_loaded() 475 call_stack_item: CallState = self.call_stack[item_index] 476 expected_call_stack_item: CallState = self.expected_call_stack[item_index] 477 if call_stack_item != expected_call_stack_item: 478 raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}')
480 def check_state_range(self, item_start_index: int, item_end_index: int): 481 self.is_loaded() 482 call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index] 483 expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index] 484 if call_stack_part != expected_call_stack_part: 485 raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}')
487 def check_current_state(self): 488 self.is_loaded() 489 expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)] 490 if self.call_stack != expected_call_stack_part: 491 raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}')
503 def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 504 class RIContextManager: 505 def __init__(self, testcasestate: 'TestCaseState') -> None: 506 self.testcasestate: 'TestCaseState' = testcasestate 507 self.current_entity: Callable = None 508 self.code_params_with_values: CodeParamsWithValues = None 509 self.result_holder: ValueHolder = ValueHolder() 510 self.exception_holder: ValueHolder = ValueHolder() 511 self.args: Tuple = tuple() 512 self.kwargs: Dict = dict() 513 514 def __enter__(self) -> ValueHolder: 515 self.current_entity: Callable = find_current_entity(2) 516 self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 517 self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only 518 self.args = tuple((arg[1] for arg in self.args)) 519 self.kwargs = dict(self.code_params_with_values.keyword_only) 520 return self.result_holder 521 522 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 523 result = None 524 if exc_type is not None: 525 self.exception_holder.value = exc_val.with_traceback(exc_tb) 526 if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions): 527 result = True 528 529 current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder) 530 self.testcasestate.call_stack.append(current_call_state) 531 self.testcasestate.check_current_state_item() 532 return result 533 534 return RIContextManager(self) 535 536 # def context_manager(self: 'TestCaseState'): 537 # current_entity: Callable = find_current_entity(2) 538 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 539 # result_holder: ValueHolder = ValueHolder() 540 # exception_holder: ValueHolder = ValueHolder() 541 # try: 542 # yield result_holder 543 # except: 544 # exception_holder.value = get_exception() 545 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 546 # raise 547 # finally: 548 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 549 # self.call_stack.append(current_call_state) 550 # self.check_current_state_item() 551 552 # return context_manager(self)
503 def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]: 504 class RIContextManager: 505 def __init__(self, testcasestate: 'TestCaseState') -> None: 506 self.testcasestate: 'TestCaseState' = testcasestate 507 self.current_entity: Callable = None 508 self.code_params_with_values: CodeParamsWithValues = None 509 self.result_holder: ValueHolder = ValueHolder() 510 self.exception_holder: ValueHolder = ValueHolder() 511 self.args: Tuple = tuple() 512 self.kwargs: Dict = dict() 513 514 def __enter__(self) -> ValueHolder: 515 self.current_entity: Callable = find_current_entity(2) 516 self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 517 self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only 518 self.args = tuple((arg[1] for arg in self.args)) 519 self.kwargs = dict(self.code_params_with_values.keyword_only) 520 return self.result_holder 521 522 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 523 result = None 524 if exc_type is not None: 525 self.exception_holder.value = exc_val.with_traceback(exc_tb) 526 if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions): 527 result = True 528 529 current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder) 530 self.testcasestate.call_stack.append(current_call_state) 531 self.testcasestate.check_current_state_item() 532 return result 533 534 return RIContextManager(self) 535 536 # def context_manager(self: 'TestCaseState'): 537 # current_entity: Callable = find_current_entity(2) 538 # code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2) 539 # result_holder: ValueHolder = ValueHolder() 540 # exception_holder: ValueHolder = ValueHolder() 541 # try: 542 # yield result_holder 543 # except: 544 # exception_holder.value = get_exception() 545 # if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 546 # raise 547 # finally: 548 # current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder) 549 # self.call_stack.append(current_call_state) 550 # self.check_current_state_item() 551 552 # return context_manager(self)
573 def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable: 574 original_func: Callable = func 575 if hasattr(original_func, 'cr_frame'): 576 original_func = find_entity(original_func.cr_frame) 577 578 if is_async(func): 579 if inspect.isawaitable(func): 580 async def awaitable_wrapper() -> Any: 581 current_entity: Awaitable = func 582 if hasattr(current_entity, 'cr_frame'): 583 # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame) 584 code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict()) 585 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 586 else: 587 code_params_with_values: CodeParamsWithValues = CodeParamsWithValues() 588 result_args = tuple() 589 result_kwargs = dict() 590 591 result_holder: ValueHolder = ValueHolder() 592 exception_holder: ValueHolder = ValueHolder() 593 try: 594 result = await func 595 result_holder.value = result 596 return result 597 except: 598 exception_holder.value = get_exception() 599 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 600 raise 601 finally: 602 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 603 self.call_stack.append(current_call_state) 604 self.check_current_state_item() 605 606 wrapper = awaitable_wrapper() 607 else: 608 async def async_wrapper(*args, **kwargs) -> Any: 609 current_entity: Callable = func 610 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 611 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 612 result_holder: ValueHolder = ValueHolder() 613 exception_holder: ValueHolder = ValueHolder() 614 try: 615 result = await func(*args, **kwargs) 616 result_holder.value = result 617 return result 618 except: 619 exception_holder.value = get_exception() 620 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 621 raise 622 finally: 623 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 624 self.call_stack.append(current_call_state) 625 self.check_current_state_item() 626 627 wrapper = async_wrapper 628 else: 629 def sync_wrapper(*args, **kwargs) -> Any: 630 current_entity: Callable = func 631 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 632 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 633 result_holder: ValueHolder = ValueHolder() 634 exception_holder: ValueHolder = ValueHolder() 635 try: 636 result = func(*args, **kwargs) 637 result_holder.value = result 638 return result 639 except: 640 exception_holder.value = get_exception() 641 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 642 raise 643 finally: 644 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 645 self.call_stack.append(current_call_state) 646 self.check_current_state_item() 647 648 wrapper = sync_wrapper 649 650 original_func_sign: inspect.Signature = inspect.signature(original_func) 651 update_wrapper(wrapper, original_func) 652 wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation) 653 return wrapper
573 def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable: 574 original_func: Callable = func 575 if hasattr(original_func, 'cr_frame'): 576 original_func = find_entity(original_func.cr_frame) 577 578 if is_async(func): 579 if inspect.isawaitable(func): 580 async def awaitable_wrapper() -> Any: 581 current_entity: Awaitable = func 582 if hasattr(current_entity, 'cr_frame'): 583 # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame) 584 code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict()) 585 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 586 else: 587 code_params_with_values: CodeParamsWithValues = CodeParamsWithValues() 588 result_args = tuple() 589 result_kwargs = dict() 590 591 result_holder: ValueHolder = ValueHolder() 592 exception_holder: ValueHolder = ValueHolder() 593 try: 594 result = await func 595 result_holder.value = result 596 return result 597 except: 598 exception_holder.value = get_exception() 599 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 600 raise 601 finally: 602 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 603 self.call_stack.append(current_call_state) 604 self.check_current_state_item() 605 606 wrapper = awaitable_wrapper() 607 else: 608 async def async_wrapper(*args, **kwargs) -> Any: 609 current_entity: Callable = func 610 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 611 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 612 result_holder: ValueHolder = ValueHolder() 613 exception_holder: ValueHolder = ValueHolder() 614 try: 615 result = await func(*args, **kwargs) 616 result_holder.value = result 617 return result 618 except: 619 exception_holder.value = get_exception() 620 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 621 raise 622 finally: 623 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 624 self.call_stack.append(current_call_state) 625 self.check_current_state_item() 626 627 wrapper = async_wrapper 628 else: 629 def sync_wrapper(*args, **kwargs) -> Any: 630 current_entity: Callable = func 631 code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs) 632 code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values) 633 result_holder: ValueHolder = ValueHolder() 634 exception_holder: ValueHolder = ValueHolder() 635 try: 636 result = func(*args, **kwargs) 637 result_holder.value = result 638 return result 639 except: 640 exception_holder.value = get_exception() 641 if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions): 642 raise 643 finally: 644 current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder) 645 self.call_stack.append(current_call_state) 646 self.check_current_state_item() 647 648 wrapper = sync_wrapper 649 650 original_func_sign: inspect.Signature = inspect.signature(original_func) 651 update_wrapper(wrapper, original_func) 652 wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation) 653 return wrapper
657 def register_last_result(self, result: Any) -> None: 658 last_call_state: CallState = self.call_stack[-1] 659 result_holder: ValueHolder = last_call_state.result_holder 660 if result_holder: 661 raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}') 662 663 result_holder.value = result
657 def register_last_result(self, result: Any) -> None: 658 last_call_state: CallState = self.call_stack[-1] 659 result_holder: ValueHolder = last_call_state.result_holder 660 if result_holder: 661 raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}') 662 663 result_holder.value = result
667 def register_last_exception(self, result: Any) -> None: 668 last_call_state: CallState = self.call_stack[-1] 669 exception_holder: ValueHolder = last_call_state.result_holder 670 if exception_holder: 671 raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}') 672 673 exception_holder.value = result
667 def register_last_exception(self, result: Any) -> None: 668 last_call_state: CallState = self.call_stack[-1] 669 exception_holder: ValueHolder = last_call_state.result_holder 670 if exception_holder: 671 raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}') 672 673 exception_holder.value = result
677 def try_to_load_expected_call_stack(self) -> None: 678 """ 679 Will try to load a pickle file with a refference results (an expected_results) 680 """ 681 if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name): 682 with open(self.content_full_file_name, 'rb') as file: 683 self.expected_call_stack_per_test = pickle.load(file) 684 self.loaded = True
Will try to load a pickle file with a refference results (an expected_results)
686 def try_to_save_expected_call_stack(self) -> None: 687 """ 688 Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 689 pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call. 690 """ 691 if self.loaded: 692 return 693 694 if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)): 695 with open(self.content_full_file_name, 'wb') as file: 696 pickle.dump(self.call_stack_per_test, file) 697 698 if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)): 699 with open(self.readable_content_full_file_name, 'wb') as file: 700 file.write(self.prepare_readable_content().encode('utf-8'))
Will try to save a refference results (an expected_results) to a pickle file. Will not save them if pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call.
702 def prepare_readable_content(self) -> str: 703 content: str = str() 704 content += f'# Test Case: {self.name}\n' 705 for test_id, call_stack in self.call_stack_per_test.items(): 706 content += f'\n\n## Test ID: {test_id}\n' 707 for index, call_state in enumerate(call_stack): 708 content += f'\n\t{index}: {call_state.entity.__name__}\n' 709 content += f'\t\t{call_state.params_with_values}\n' 710 content += f'\t\targs: {call_state.args}\n' 711 content += f'\t\tkwargs: {call_state.kwargs}\n' 712 if call_state.result_holder: 713 content += f'\t\tresult: {call_state.result_holder.value}\n' 714 if call_state.exception_holder: 715 content += f'\t\texception: {call_state.exception_holder.value}\n' 716 717 return content
719 def register(self): 720 """ 721 Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value 722 """ 723 self.try_to_load_expected_call_stack() 724 725 global TEST_CASE_STATE 726 self.old_global_fake_result = TEST_CASE_STATE 727 TEST_CASE_STATE = self
Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value
733 def unregister(self, should_be_saved: bool = True): 734 """ 735 Will restore a previous value of the global 'TEST_CASE_STATE' variable 736 """ 737 global TEST_CASE_STATE 738 TEST_CASE_STATE = self.old_global_fake_result 739 740 if should_be_saved: 741 self.try_to_save_expected_call_stack()
Will restore a previous value of the global 'TEST_CASE_STATE' variable