cengal.unittest.behavior_stabilizer.versions.v_0.behavior_stabilizer

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['CallState', 'ResultAlreadyRegisteredError', 'ExceptionAlreadyRegisteredError', 'CallStackIsNotEqualError', 
 20           'ExpectedTestCaseStateIsNotLoadedError', 'TEST_CASE_STATE', 'get_test_case_state', 'TestCaseState']
 21
 22
 23import datetime
 24import inspect
 25import json
 26import logging
 27import os
 28import traceback
 29import unittest
 30import pickle
 31from contextlib import contextmanager
 32from types import FrameType
 33from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union, Hashable, cast, ContextManager, Callable, Awaitable, OrderedDict
 34from cengal.code_flow_control.smart_values import ValueHolder
 35from cengal.introspection.inspect import CodeParamsWithValues, intro_func_params_with_values, find_current_entity, find_entity, is_async, \
 36    intro_frame_params_with_values, entity_class, get_exception, func_params_with_values
 37from cengal.file_system.path_manager import relative_to_src, RelativePath
 38from collections import namedtuple, OrderedDict
 39from functools import wraps, update_wrapper
 40
 41
 42"""
 43Module Docstring
 44Docstrings: http://www.python.org/dev/peps/pep-0257/
 45"""
 46
 47__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 48__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 49__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 50__license__ = "Apache License, Version 2.0"
 51__version__ = "4.4.1"
 52__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 53__email__ = "gtalk@butenkoms.space"
 54# __status__ = "Prototype"
 55__status__ = "Development"
 56# __status__ = "Production"
 57
 58
# Descriptive aliases for built-in/typing types
VarName = str
VarValue = Any
MeasurementId = int
StageId = Hashable
 63
 64
 65def param(param_name: str) -> str:
 66    """
 67    Returns a formatted full name of the parameter: 'ClassName__m__MethodName__p__ParameterName'
 68
 69    Returns:
 70    str: furmatted full name of the parameter
 71    """
 72    frame = cast(FrameType, inspect.currentframe()).f_back
 73    real_func = cast(FrameType, inspect.currentframe()).f_back.f_code
 74    func_name = real_func.co_name
 75    args, _, _, values = inspect.getargvalues(frame)
 76    parent_obj = None
 77    for par_name, par_val in [(i, values[i]) for i in args]:
 78        if 'self' == par_name:
 79            parent_obj = par_val
 80            break  # TODO: check if it is needed
 81    
 82    class_name = str()
 83    if parent_obj is not None:
 84        class_name = type(parent_obj).__name__
 85    
 86    param_full_name = f'{class_name}__m__{func_name}__p__{param_name}'
 87    return param_full_name
 88
 89
 90def param2(param_name: str) -> str:
 91    raise NotImplementedError
 92
 93
 94FAKE_RESULTS: 'FakeResults' = cast('FakeResults', None)
 95
 96
 97def get_fake_results() -> 'FakeResults':
 98    global FAKE_RESULTS
 99    if FAKE_RESULTS is None:
100        FAKE_RESULTS = FakeResults()
101    return FAKE_RESULTS
102
103
class ParameterIsNotInNotInTheListOfExpectedResultsError(Exception):
    """
    Raised by FakeResults.check_result() when `param_name not in self.expected_results`
    """
109
110
class ResultNumberIsNotValidError(Exception):
    """
    Raised by FakeResults.check_result() when `result_number not in self.expected_results[param_name]`
    """
116
117
class FakeResults:
    """
    Holds both the actual and the reference (expected) results and compares them

    Saves the taken results as a reference 'expected_results' to a json file on FakeResults.try_to_save_expected_results() if
    this file was not already loaded by the FakeResults.try_to_load_expected_results() method.

    So if FakeResults.try_to_load_expected_results() is called before all tests and FakeResults.try_to_save_expected_results()
    is called after all tests:
        1) On the first run all generated results are taken and saved to the json file as a reference (as expected_results).
        This step must be made on a developer's machine. The resulting json file must be added to the repository.
        2) On all further runs the existing json file is found and its content is loaded as a reference (as expected_results).

    Can be used as a 'with' context manager which tries to load the json file at the start and tries to save a reference
    to the json file at the end (if the json file wasn't loaded at the start and the 'with' body finished without an exception).

    Raises:
        ParameterIsNotInNotInTheListOfExpectedResultsError: by check_result() when the parameter has no expected results
        ResultNumberIsNotValidError: by check_result() when the requested result number has no expected result
    """

    def __init__(self, content_full_file_name: Optional[str] = None) -> None:
        """
        Parameters:
        content_full_file_name (Optional[str]): full file name of the json file with the reference results.
        When it is None, register() and unregister() do not touch any file.
        """
        self.was_loaded = False
        self.results: Dict[str, Dict[int, str]] = dict()
        self.expected_results: Dict[str, Dict[int, str]] = dict()
        self.content_full_file_name = content_full_file_name
        self.old_global_fake_result: 'FakeResults' = cast('FakeResults', None)

    @staticmethod
    def _append_value(storage: Dict[str, Dict[int, str]], param_name: str, value: Any) -> None:
        """
        Stores str(value) under the next free index of the parameter's sequence inside the storage
        """
        text = str(value)
        sequence = storage.setdefault(param_name, dict())
        index = (max(sequence) + 1) if sequence else 0
        sequence[index] = text

    def add_result(self, param_name: str, result: Any) -> None:
        """
        Will add a result to a list of the results for a requested parameter

        For example we have a function:
            def add_int(a: int, b: int) -> int:
                return a + b

        Then we can change it in the next way:
            def add_int(a: int, b: int) -> int:
                result = a + b
                get_fake_results().add_result(param('a'), a)
                get_fake_results().add_result(param('b'), b)
                return result

        As a result all 'a' and 'b' parameters of each 'add_int()' call are saved to an ordered sequence and can be
        compared with the reference values by calling FakeResults.check_result(), FakeResults.check_results_range()
        or FakeResults.check_all_results()

        Parameters:
        param_name (str): your parameter name
        result (Any): current parameter's content. It is stored as str(result)
        """
        self._append_value(self.results, param_name, result)

    def add_expected_result(self, param_name: str, expected_result: Any) -> None:
        """
        Will add a single expected result for the parameter

        Parameters:
        param_name (str): your parameter name
        expected_result (Any): expected parameter's content. It is stored as str(expected_result)
        """
        self._append_value(self.expected_results, param_name, expected_result)

    def register_param_expected_results(self, param_name: str, expected_results: List[Any]) -> None:
        """
        Will add a list of expected results for the parameter

        Parameters:
        param_name (str): your parameter name
        expected_results (List[Any]): list of expected parameter's content
        """
        for expected_result in expected_results:
            self.add_expected_result(param_name, expected_result)

    def register_expected_results(self, expected_results: Dict[str, Dict[int, str]]) -> None:
        """
        Will register a dict of expected results for all requested parameters. Replaces the current expected results

        Parameters:
        expected_results (Dict[str, Dict[int, str]]): key - parameter name; value.key - index of the expected parameter's
        content; value.value - expected parameter's content
        """
        self.expected_results = expected_results

    def check_result(self, param_name: str, result_number: Optional[int] = None) -> bool:
        """
        Will compare the requested parameter's results with the expected results. If result_number is not None - will compare
        the exact item of the sequence instead of comparing all results of the parameter.
        Will return True on each call until the json file is loaded with a 'FakeResults.try_to_load_expected_results()' call.

        Parameters:
        param_name (str): your parameter name
        result_number (Optional[int]): index of the single result to compare; None - compare the whole sequence

        Returns:
        bool: True if the result is equal to the expected result, False otherwise

        Raises:
        ParameterIsNotInNotInTheListOfExpectedResultsError: when `param_name not in self.expected_results`
        ResultNumberIsNotValidError: when `result_number not in self.expected_results[param_name]`
        """
        if not self.was_loaded:
            return True

        if param_name not in self.expected_results:
            raise ParameterIsNotInNotInTheListOfExpectedResultsError(param_name)

        if param_name not in self.results:
            print(f'ERROR: PARAM NOT IN RESULTS: {param_name}')
            return False

        returned_val: Any
        expected_val: Any
        if result_number is None:
            returned_val = self.results[param_name]
            expected_val = self.expected_results[param_name]
        else:
            if result_number not in self.expected_results[param_name]:
                raise ResultNumberIsNotValidError((param_name, result_number))

            if result_number not in self.results[param_name]:
                print(f'ERROR: PARAM\'S RESULT NUMBER {result_number} NOT IN RESULTS: {param_name}')
                return False

            returned_val = self.results[param_name][result_number]
            expected_val = self.expected_results[param_name][result_number]

        if returned_val == expected_val:
            return True

        print(f'ERROR: PARAM IS NOT EQUAL TO EXPECTED: {param_name}\n\tRETURNED VAL: {returned_val}\n\tEXPECTED VAL: {expected_val}\n~~~~~~~~~~~~~~~~~~~~~~~~~')
        # A mismatch is reported as an explicit False (not an implicit None)
        return False

    def check_results_range(self, param_name: str, result_number_from: int, result_number_up_to: int) -> bool:
        """
        Will compare a range of the requested parameter's results with the expected results.
        Will return True on each call until the json file is loaded with a 'FakeResults.try_to_load_expected_results()' call.
        Behavior of the 'result_number_from' and 'result_number_up_to' parameters is the same as in the 'range()' call.

        Parameters:
        param_name (str): your parameter name
        result_number_from (int): start index of the results range
        result_number_up_to (int): end index of the results range (not included)

        Returns:
        bool: True if all results in the range are equal to the expected results
        """
        return all(
            self.check_result(param_name, result_number=result_number)
            for result_number in range(result_number_from, result_number_up_to)
        )

    def check_current_results(self) -> bool:
        """
        Will compare the results of every parameter which already has results with the expected results.
        Will return True on each call until the json file is loaded with a 'FakeResults.try_to_load_expected_results()' call.

        Returns:
        bool: True if all results are equal to the expected results
        """
        return all(self.check_result(param_name) for param_name in self.results)

    def check_all_results(self) -> bool:
        """
        Will compare the results of every parameter which has expected results with the expected results.
        Will return True on each call until the json file is loaded with a 'FakeResults.try_to_load_expected_results()' call.

        Returns:
        bool: True if all results are equal to the expected results
        """
        return all(self.check_result(param_name) for param_name in self.expected_results)

    def try_to_load_expected_results(self, full_file_name: str) -> None:
        """
        Will try to load a json file with the reference results (expected_results).
        Does nothing if the file does not exist or is empty. Loaded items are merged into the current expected results.

        Parameters:
        full_file_name (str): full file name of the json file with the reference results (expected_results)
        """
        if not os.path.isfile(full_file_name):
            return

        with open(full_file_name, 'rb') as file:
            content = file.read()

        if not content:
            return

        loaded_results = json.loads(content.decode('utf-8'))
        for loaded_param_name, values in loaded_results.items():
            sequence = self.expected_results.setdefault(loaded_param_name, dict())
            for index_str, value in values.items():
                # json object keys are always strings: restore the integer indexes
                sequence[int(index_str)] = value

        self.was_loaded = True

    def try_to_save_expected_results(self, full_file_name: str) -> None:
        """
        Will try to save the taken results as the reference results (expected_results) to a json file. Will not save them if
        the json file was already successfully loaded with a 'FakeResults.try_to_load_expected_results()' call or
        if the file already exists.

        Parameters:
        full_file_name (str): full file name of the json file with the reference results (expected_results)
        """
        if self.was_loaded:
            return

        if os.path.isfile(full_file_name):
            return

        content_bytes = json.dumps(self.results, indent=4).encode('utf-8')
        with open(full_file_name, 'wb') as file:
            file.write(content_bytes)

    def register(self):
        """
        Will try to load the reference results (when a file name was given) and will register the current instance
        to the global 'FAKE_RESULTS' variable. Will save the previous value of the variable
        """
        global FAKE_RESULTS
        if self.content_full_file_name is not None:
            self.try_to_load_expected_results(self.content_full_file_name)

        self.old_global_fake_result = FAKE_RESULTS
        FAKE_RESULTS = self

    def __enter__(self):
        self.register()
        return self

    def unregister(self, should_be_saved: bool):
        """
        Will restore the previous value of the global 'FAKE_RESULTS' variable

        Parameters:
        should_be_saved (bool): True - try to save the taken results as a reference (when a file name was given)
        """
        global FAKE_RESULTS
        FAKE_RESULTS = self.old_global_fake_result

        if should_be_saved and (self.content_full_file_name is not None):
            self.try_to_save_expected_results(self.content_full_file_name)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Save the reference only when the 'with' body finished without an exception; never suppress exceptions
        self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None))
        return False
382
383
# Record of one registered call; see the field names
CallState = namedtuple(
    "CallState",
    ("entity", "params_with_values", "args", "kwargs", "result_holder", "exception_holder"),
)
385
386
class ResultAlreadyRegisteredError(Exception):
    """
    Raised by TestCaseState.register_last_result() when the result holder of the last call state is already set
    """
389
390
class ExceptionAlreadyRegisteredError(Exception):
    """
    Raised by TestCaseState.register_last_exception() when the checked holder of the last call state is already set
    """
393
394
class CallStackIsNotEqualError(Exception):
    """
    Raised by the TestCaseState.check_*() methods when the recorded call stack differs from the expected one
    """
397
398
class ExpectedTestCaseStateIsNotLoadedError(Exception):
    """
    Raised by TestCaseState.is_loaded() when the expected test case state is not loaded
    """
401
402
# Module-wide TestCaseState instance; initially None
TEST_CASE_STATE: 'TestCaseState' = cast('TestCaseState', None)


def get_test_case_state() -> 'TestCaseState':
    """
    Returns the global 'TEST_CASE_STATE' instance. Creates a TestCaseState named 'Default' on first use
    if the global variable is still None

    Returns:
    TestCaseState: the global instance
    """
    global TEST_CASE_STATE
    if TEST_CASE_STATE is not None:
        return TEST_CASE_STATE

    TEST_CASE_STATE = TestCaseState('Default')
    return TEST_CASE_STATE
412
413
414class TestCaseState:
415    def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1):
416        if name is None:
417            parent_entity = find_current_entity(depth + 1)
418            parent_class = entity_class(parent_entity)
419            parent_class_name: str = parent_class.__name__
420
421        self.name: str = name or parent_class_name
422        self.raise_exceptions: bool = raise_exceptions
423        self._current_test_id: Hashable = None
424        self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
425            None: list()
426        })
427        self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
428            None: list()
429        })
430        self.loaded: bool = False
431        self.content_file_name: str = f'{self.name}__test_case_state.pickle'
432        self.readable_content_file_name: str = f'{self.name}__test_case_state.md'
433        self.content_dir_rel: RelativePath = relative_to_src(depth + 1)
434        self.content_full_file_name: str = self.content_dir_rel(self.content_file_name)
435        self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name)
436        self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None)
437        if register:
438            self.register()
439
440    @property
    def current_test_id(self) -> Hashable:
        """
        Key of the currently selected test (None until a test id is assigned)
        """
        return self._current_test_id
443    
444    @current_test_id.setter
445    def current_test_id(self, value: Hashable) -> None:
446        self._current_test_id = value
447        if value not in self.call_stack_per_test:
448            self.call_stack_per_test[value] = list()
449    
450    @property
451    def call_stack(self) -> List[CallState]:
452        return self.call_stack_per_test[self.current_test_id]
453    
454    @property
455    def expected_call_stack(self) -> List[CallState]:
456        try:
457            return self.expected_call_stack_per_test[self.current_test_id]
458        except KeyError:
459            return list()
460
461    def is_loaded(self) -> bool:
462        if not self.loaded:
463            raise ExpectedTestCaseStateIsNotLoadedError(f'Expected test case state is not loaded: {self.name}')
464    
465    def check_current_state_item(self):
466        self.is_loaded()
467        call_stack_item: CallState = self.call_stack[-1]
468        expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1]
469        if call_stack_item != expected_call_stack_item:
470            raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}')
471    
472    def check_state_item(self, item_index: int):
473        self.is_loaded()
474        call_stack_item: CallState = self.call_stack[item_index]
475        expected_call_stack_item: CallState = self.expected_call_stack[item_index]
476        if call_stack_item != expected_call_stack_item:
477            raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}')
478    
479    def check_state_range(self, item_start_index: int, item_end_index: int):
480        self.is_loaded()
481        call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index]
482        expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index]
483        if call_stack_part != expected_call_stack_part:
484            raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}')
485    
486    def check_current_state(self):
487        self.is_loaded()
488        expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)]
489        if self.call_stack != expected_call_stack_part:
490            raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}')
491
492    def check_whole_state(self):
493        self.is_loaded()
494        if self.call_stack != self.expected_call_stack:
495            raise CallStackIsNotEqualError(f'Expected call stack: {self.expected_call_stack}\nActual call stack: {self.call_stack}')
496
497    def check_all_tests_state(self):
498        self.is_loaded()
499        if self.call_stack_per_test != self.expected_call_stack_per_test:
500            raise CallStackIsNotEqualError(f'Expected call stacks: {self.expected_call_stack_per_test}\nActual call stacks: {self.call_stack_per_test}')
501
502    def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
503        class RIContextManager:
504            def __init__(self, testcasestate: 'TestCaseState') -> None:
505                self.testcasestate: 'TestCaseState' = testcasestate
506                self.current_entity: Callable = None
507                self.code_params_with_values: CodeParamsWithValues = None
508                self.result_holder: ValueHolder = ValueHolder()
509                self.exception_holder: ValueHolder = ValueHolder()
510                self.args: Tuple = tuple()
511                self.kwargs: Dict = dict()
512            
513            def __enter__(self) -> ValueHolder:
514                self.current_entity: Callable = find_current_entity(2)
515                self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
516                self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only
517                self.args = tuple((arg[1] for arg in self.args))
518                self.kwargs = dict(self.code_params_with_values.keyword_only)
519                return self.result_holder
520            
521            def __exit__(self, exc_type, exc_val, exc_tb) -> None:
522                result = None
523                if exc_type is not None:
524                    self.exception_holder.value = exc_val.with_traceback(exc_tb)
525                    if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions):
526                        result = True
527
528                current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder)
529                self.testcasestate.call_stack.append(current_call_state)
530                self.testcasestate.check_current_state_item()
531                return result
532        
533        return RIContextManager(self)
534
535        # def context_manager(self: 'TestCaseState'):
536        #     current_entity: Callable = find_current_entity(2)
537        #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
538        #     result_holder: ValueHolder = ValueHolder()
539        #     exception_holder: ValueHolder = ValueHolder()
540        #     try:
541        #         yield result_holder
542        #     except:
543        #         exception_holder.value = get_exception()
544        #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
545        #             raise
546        #     finally:
547        #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
548        #         self.call_stack.append(current_call_state)
549        #         self.check_current_state_item()
550        
551        # return context_manager(self)
552
553    # @contextmanager
554    # def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
555    #     current_entity: Callable = find_current_entity(2)
556    #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
557    #     result_holder: ValueHolder = ValueHolder()
558    #     exception_holder: ValueHolder = ValueHolder()
559    #     try:
560    #         yield result_holder
561    #     except:
562    #         exception_holder.value = get_exception()
563    #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
564    #             raise
565    #     finally:
566    #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
567    #         self.call_stack.append(current_call_state)
568    #         self.check_current_state_item()
569    
    # Short alias of register_intro()
    ri = register_intro
571    
572    def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable:
573        original_func: Callable = func
574        if hasattr(original_func, 'cr_frame'):
575            original_func = find_entity(original_func.cr_frame)
576
577        if is_async(func):
578            if inspect.isawaitable(func):
579                async def awaitable_wrapper() -> Any:
580                    current_entity: Awaitable = func
581                    if hasattr(current_entity, 'cr_frame'):
582                        # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame)
583                        code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict())
584                        code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
585                    else:
586                        code_params_with_values: CodeParamsWithValues = CodeParamsWithValues()
587                        result_args = tuple()
588                        result_kwargs = dict()
589                    
590                    result_holder: ValueHolder = ValueHolder()
591                    exception_holder: ValueHolder = ValueHolder()
592                    try:
593                        result = await func
594                        result_holder.value = result
595                        return result
596                    except:
597                        exception_holder.value = get_exception()
598                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
599                            raise
600                    finally:
601                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
602                        self.call_stack.append(current_call_state)
603                        self.check_current_state_item()
604                
605                wrapper = awaitable_wrapper()
606            else:
607                async def async_wrapper(*args, **kwargs) -> Any:
608                    current_entity: Callable = func
609                    code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
610                    code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
611                    result_holder: ValueHolder = ValueHolder()
612                    exception_holder: ValueHolder = ValueHolder()
613                    try:
614                        result = await func(*args, **kwargs)
615                        result_holder.value = result
616                        return result
617                    except:
618                        exception_holder.value = get_exception()
619                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
620                            raise
621                    finally:
622                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
623                        self.call_stack.append(current_call_state)
624                        self.check_current_state_item()
625                
626                wrapper = async_wrapper
627        else:
628            def sync_wrapper(*args, **kwargs) -> Any:
629                current_entity: Callable = func
630                code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
631                code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
632                result_holder: ValueHolder = ValueHolder()
633                exception_holder: ValueHolder = ValueHolder()
634                try:
635                    result = func(*args, **kwargs)
636                    result_holder.value = result
637                    return result
638                except:
639                    exception_holder.value = get_exception()
640                    if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
641                        raise
642                finally:
643                    current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
644                    self.call_stack.append(current_call_state)
645                    self.check_current_state_item()
646            
647            wrapper = sync_wrapper
648        
649        original_func_sign: inspect.Signature = inspect.signature(original_func)
650        update_wrapper(wrapper, original_func)
651        wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation)
652        return wrapper
653    
    # Short alias of register_outro()
    ro = register_outro
655    
656    def register_last_result(self, result: Any) -> None:
657        last_call_state: CallState = self.call_stack[-1]
658        result_holder: ValueHolder = last_call_state.result_holder
659        if result_holder:
660            raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}')
661        
662        result_holder.value = result
663
    # Short alias of register_last_result()
    rls = register_last_result
665    
666    def register_last_exception(self, result: Any) -> None:
667        last_call_state: CallState = self.call_stack[-1]
668        exception_holder: ValueHolder = last_call_state.result_holder
669        if exception_holder:
670            raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}')
671        
672        exception_holder.value = result
673
    # Short alias of register_last_exception()
    rle = register_last_exception
675
676    def try_to_load_expected_call_stack(self) -> None:
677        """
678        Will try to load a pickle file with a refference results (an expected_results)
679        """
680        if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name):
681            with open(self.content_full_file_name, 'rb') as file:
682                self.expected_call_stack_per_test = pickle.load(file)
683                self.loaded = True
684    
685    def try_to_save_expected_call_stack(self) -> None:
686        """
687        Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 
688        pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call.
689        """
690        if self.loaded:
691            return
692        
693        if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)):
694            with open(self.content_full_file_name, 'wb') as file:
695                pickle.dump(self.call_stack_per_test, file)
696        
697        if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)):
698            with open(self.readable_content_full_file_name, 'wb') as file:
699                file.write(self.prepare_readable_content().encode('utf-8'))
700    
701    def prepare_readable_content(self) -> str:
702        content: str = str()
703        content += f'# Test Case: {self.name}\n'
704        for test_id, call_stack in self.call_stack_per_test.items():
705            content += f'\n\n## Test ID: {test_id}\n'
706            for index, call_state in enumerate(call_stack):
707                content += f'\n\t{index}: {call_state.entity.__name__}\n'
708                content += f'\t\t{call_state.params_with_values}\n'
709                content += f'\t\targs: {call_state.args}\n'
710                content += f'\t\tkwargs: {call_state.kwargs}\n'
711                if call_state.result_holder:
712                    content += f'\t\tresult: {call_state.result_holder.value}\n'
713                if call_state.exception_holder:
714                    content += f'\t\texception: {call_state.exception_holder.value}\n'
715        
716        return content
717    
718    def register(self):
719        """
720        Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value
721        """
722        self.try_to_load_expected_call_stack()
723
724        global TEST_CASE_STATE
725        self.old_global_fake_result = TEST_CASE_STATE
726        TEST_CASE_STATE = self
727    
728    def __enter__(self):
729        self.register()
730        return self
731
732    def unregister(self, should_be_saved: bool = True):
733        """
734        Will restore a previous value of the global 'TEST_CASE_STATE' variable
735        """
736        global TEST_CASE_STATE
737        TEST_CASE_STATE = self.old_global_fake_result
738
739        if should_be_saved:
740            self.try_to_save_expected_call_stack()
741
742    def __exit__(self, exc_type, exc_val, exc_tb):
743        self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None))
744        return False
745
746
747
class StateBase:
    """Interface skeleton: every method here only raises NotImplementedError.

    NOTE(review): presumably meant to be subclassed by concrete variable-state
    trackers; no implementation is visible here - confirm intended semantics.
    """

    def add(self, var_name: VarName, var_value: VarValue, stage_id: StageId = None):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def add_set(self, vars: Dict[VarName, VarValue]):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def check_var(self, var_name: VarName):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def check_var_measurement(self, var_name: VarName, measurement_id_from: MeasurementId, measuremente_id_up_to: MeasurementId):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def check_var_current_state(self, var_name: VarName):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def check_all(self):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
    
    def check_all_current_state(self):
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError
class CallState(builtins.tuple):

CallState(entity, params_with_values, args, kwargs, result_holder, exception_holder)

CallState( entity, params_with_values, args, kwargs, result_holder, exception_holder)

Create new instance of CallState(entity, params_with_values, args, kwargs, result_holder, exception_holder)

entity

Alias for field number 0

params_with_values

Alias for field number 1

args

Alias for field number 2

kwargs

Alias for field number 3

result_holder

Alias for field number 4

exception_holder

Alias for field number 5

Inherited Members
builtins.tuple
index
count
class ResultAlreadyRegisteredError(builtins.Exception):
class ResultAlreadyRegisteredError(Exception):
    """Raised by ``register_last_result`` when the last call state already holds a result."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ExceptionAlreadyRegisteredError(builtins.Exception):
class ExceptionAlreadyRegisteredError(Exception):
    """Raised by ``register_last_exception`` when the targeted holder is already filled."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CallStackIsNotEqualError(builtins.Exception):
class CallStackIsNotEqualError(Exception):
    """Raised by the ``check_*`` methods when recorded and expected call stacks differ."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ExpectedTestCaseStateIsNotLoadedError(builtins.Exception):
class ExpectedTestCaseStateIsNotLoadedError(Exception):
    """Raised by ``TestCaseState.is_loaded`` when no expected state has been loaded."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
TEST_CASE_STATE: TestCaseState = None
def get_test_case_state() -> TestCaseState:
def get_test_case_state() -> 'TestCaseState':
    """Return the global ``TEST_CASE_STATE``, creating a 'Default' one on first use."""
    global TEST_CASE_STATE

    state = TEST_CASE_STATE
    if state is None:
        state = TEST_CASE_STATE = TestCaseState('Default')

    return state
class TestCaseState:
415class TestCaseState:
416    def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1):
417        if name is None:
418            parent_entity = find_current_entity(depth + 1)
419            parent_class = entity_class(parent_entity)
420            parent_class_name: str = parent_class.__name__
421
422        self.name: str = name or parent_class_name
423        self.raise_exceptions: bool = raise_exceptions
424        self._current_test_id: Hashable = None
425        self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
426            None: list()
427        })
428        self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
429            None: list()
430        })
431        self.loaded: bool = False
432        self.content_file_name: str = f'{self.name}__test_case_state.pickle'
433        self.readable_content_file_name: str = f'{self.name}__test_case_state.md'
434        self.content_dir_rel: RelativePath = relative_to_src(depth + 1)
435        self.content_full_file_name: str = self.content_dir_rel(self.content_file_name)
436        self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name)
437        self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None)
438        if register:
439            self.register()
440
441    @property
442    def current_test_id(self) -> Hashable:
443        return self._current_test_id
444    
445    @current_test_id.setter
446    def current_test_id(self, value: Hashable) -> None:
447        self._current_test_id = value
448        if value not in self.call_stack_per_test:
449            self.call_stack_per_test[value] = list()
450    
451    @property
452    def call_stack(self) -> List[CallState]:
453        return self.call_stack_per_test[self.current_test_id]
454    
455    @property
456    def expected_call_stack(self) -> List[CallState]:
457        try:
458            return self.expected_call_stack_per_test[self.current_test_id]
459        except KeyError:
460            return list()
461
462    def is_loaded(self) -> bool:
463        if not self.loaded:
464            raise ExpectedTestCaseStateIsNotLoadedError(f'Expected test case state is not loaded: {self.name}')
465    
466    def check_current_state_item(self):
467        self.is_loaded()
468        call_stack_item: CallState = self.call_stack[-1]
469        expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1]
470        if call_stack_item != expected_call_stack_item:
471            raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}')
472    
473    def check_state_item(self, item_index: int):
474        self.is_loaded()
475        call_stack_item: CallState = self.call_stack[item_index]
476        expected_call_stack_item: CallState = self.expected_call_stack[item_index]
477        if call_stack_item != expected_call_stack_item:
478            raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}')
479    
480    def check_state_range(self, item_start_index: int, item_end_index: int):
481        self.is_loaded()
482        call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index]
483        expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index]
484        if call_stack_part != expected_call_stack_part:
485            raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}')
486    
487    def check_current_state(self):
488        self.is_loaded()
489        expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)]
490        if self.call_stack != expected_call_stack_part:
491            raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}')
492
493    def check_whole_state(self):
494        self.is_loaded()
495        if self.call_stack != self.expected_call_stack:
496            raise CallStackIsNotEqualError(f'Expected call stack: {self.expected_call_stack}\nActual call stack: {self.call_stack}')
497
498    def check_all_tests_state(self):
499        self.is_loaded()
500        if self.call_stack_per_test != self.expected_call_stack_per_test:
501            raise CallStackIsNotEqualError(f'Expected call stacks: {self.expected_call_stack_per_test}\nActual call stacks: {self.call_stack_per_test}')
502
503    def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
504        class RIContextManager:
505            def __init__(self, testcasestate: 'TestCaseState') -> None:
506                self.testcasestate: 'TestCaseState' = testcasestate
507                self.current_entity: Callable = None
508                self.code_params_with_values: CodeParamsWithValues = None
509                self.result_holder: ValueHolder = ValueHolder()
510                self.exception_holder: ValueHolder = ValueHolder()
511                self.args: Tuple = tuple()
512                self.kwargs: Dict = dict()
513            
514            def __enter__(self) -> ValueHolder:
515                self.current_entity: Callable = find_current_entity(2)
516                self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
517                self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only
518                self.args = tuple((arg[1] for arg in self.args))
519                self.kwargs = dict(self.code_params_with_values.keyword_only)
520                return self.result_holder
521            
522            def __exit__(self, exc_type, exc_val, exc_tb) -> None:
523                result = None
524                if exc_type is not None:
525                    self.exception_holder.value = exc_val.with_traceback(exc_tb)
526                    if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions):
527                        result = True
528
529                current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder)
530                self.testcasestate.call_stack.append(current_call_state)
531                self.testcasestate.check_current_state_item()
532                return result
533        
534        return RIContextManager(self)
535
536        # def context_manager(self: 'TestCaseState'):
537        #     current_entity: Callable = find_current_entity(2)
538        #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
539        #     result_holder: ValueHolder = ValueHolder()
540        #     exception_holder: ValueHolder = ValueHolder()
541        #     try:
542        #         yield result_holder
543        #     except:
544        #         exception_holder.value = get_exception()
545        #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
546        #             raise
547        #     finally:
548        #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
549        #         self.call_stack.append(current_call_state)
550        #         self.check_current_state_item()
551        
552        # return context_manager(self)
553
554    # @contextmanager
555    # def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
556    #     current_entity: Callable = find_current_entity(2)
557    #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
558    #     result_holder: ValueHolder = ValueHolder()
559    #     exception_holder: ValueHolder = ValueHolder()
560    #     try:
561    #         yield result_holder
562    #     except:
563    #         exception_holder.value = get_exception()
564    #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
565    #             raise
566    #     finally:
567    #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
568    #         self.call_stack.append(current_call_state)
569    #         self.check_current_state_item()
570    
571    ri = register_intro
572    
    def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable:
        """Wrap ``func`` so that every call is recorded as a ``CallState`` and checked.

        Three shapes of ``func`` are handled:
        * an awaitable object (``is_async`` and ``inspect.isawaitable``) - a
          coroutine object created from ``awaitable_wrapper`` is returned;
        * an async callable - ``async_wrapper`` is returned;
        * anything else - ``sync_wrapper`` is returned.

        Each wrapper stores the result or the exception in a ``ValueHolder``,
        appends the ``CallState`` to ``self.call_stack`` in ``finally`` and calls
        ``self.check_current_state_item()``. When an exception is caught and not
        re-raised, the wrapper falls through and returns None.

        Args:
            func: the callable or awaitable to observe.
            raise_exceptions: True - re-raise caught exceptions; False - swallow
                them; None - fall back to ``self.raise_exceptions``.

        Returns:
            The wrapper, with metadata and signature copied from the original.
        """
        original_func: Callable = func
        if hasattr(original_func, 'cr_frame'):
            # `func` exposes a coroutine frame: resolve the entity behind it.
            original_func = find_entity(original_func.cr_frame)

        if is_async(func):
            if inspect.isawaitable(func):
                async def awaitable_wrapper() -> Any:
                    current_entity: Awaitable = func
                    if hasattr(current_entity, 'cr_frame'):
                        # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame)
                        # NOTE(review): `original_func` was already rebound above to
                        # the result of find_entity(), so `.cr_frame` is read from
                        # that result rather than from `func` - confirm intended.
                        code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict())
                        code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
                    else:
                        code_params_with_values: CodeParamsWithValues = CodeParamsWithValues()
                        result_args = tuple()
                        result_kwargs = dict()

                    result_holder: ValueHolder = ValueHolder()
                    exception_holder: ValueHolder = ValueHolder()
                    try:
                        result = await func
                        result_holder.value = result
                        return result
                    except:
                        exception_holder.value = get_exception()
                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
                            raise
                    finally:
                        # Runs on success, on re-raise and on a swallowed exception.
                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
                        self.call_stack.append(current_call_state)
                        self.check_current_state_item()

                # NOTE(review): `wrapper` is a coroutine object here, not a function;
                # verify that update_wrapper() and the `__signature__` assignment
                # below work on it.
                wrapper = awaitable_wrapper()
            else:
                async def async_wrapper(*args, **kwargs) -> Any:
                    current_entity: Callable = func
                    code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
                    code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
                    result_holder: ValueHolder = ValueHolder()
                    exception_holder: ValueHolder = ValueHolder()
                    try:
                        result = await func(*args, **kwargs)
                        result_holder.value = result
                        return result
                    except:
                        exception_holder.value = get_exception()
                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
                            raise
                    finally:
                        # Runs on success, on re-raise and on a swallowed exception.
                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
                        self.call_stack.append(current_call_state)
                        self.check_current_state_item()

                wrapper = async_wrapper
        else:
            def sync_wrapper(*args, **kwargs) -> Any:
                current_entity: Callable = func
                code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
                code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
                result_holder: ValueHolder = ValueHolder()
                exception_holder: ValueHolder = ValueHolder()
                try:
                    result = func(*args, **kwargs)
                    result_holder.value = result
                    return result
                except:
                    exception_holder.value = get_exception()
                    if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
                        raise
                finally:
                    # Runs on success, on re-raise and on a swallowed exception.
                    current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
                    self.call_stack.append(current_call_state)
                    self.check_current_state_item()

            wrapper = sync_wrapper

        # Make the wrapper look like the original: copy metadata and expose the
        # original signature to introspection.
        original_func_sign: inspect.Signature = inspect.signature(original_func)
        update_wrapper(wrapper, original_func)
        wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation)
        return wrapper
654    
655    ro = register_outro
656    
657    def register_last_result(self, result: Any) -> None:
658        last_call_state: CallState = self.call_stack[-1]
659        result_holder: ValueHolder = last_call_state.result_holder
660        if result_holder:
661            raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}')
662        
663        result_holder.value = result
664
665    rls = register_last_result
666    
667    def register_last_exception(self, result: Any) -> None:
668        last_call_state: CallState = self.call_stack[-1]
669        exception_holder: ValueHolder = last_call_state.result_holder
670        if exception_holder:
671            raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}')
672        
673        exception_holder.value = result
674
675    rle = register_last_exception
676
677    def try_to_load_expected_call_stack(self) -> None:
678        """
679        Will try to load a pickle file with a refference results (an expected_results)
680        """
681        if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name):
682            with open(self.content_full_file_name, 'rb') as file:
683                self.expected_call_stack_per_test = pickle.load(file)
684                self.loaded = True
685    
686    def try_to_save_expected_call_stack(self) -> None:
687        """
688        Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 
689        pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call.
690        """
691        if self.loaded:
692            return
693        
694        if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)):
695            with open(self.content_full_file_name, 'wb') as file:
696                pickle.dump(self.call_stack_per_test, file)
697        
698        if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)):
699            with open(self.readable_content_full_file_name, 'wb') as file:
700                file.write(self.prepare_readable_content().encode('utf-8'))
701    
702    def prepare_readable_content(self) -> str:
703        content: str = str()
704        content += f'# Test Case: {self.name}\n'
705        for test_id, call_stack in self.call_stack_per_test.items():
706            content += f'\n\n## Test ID: {test_id}\n'
707            for index, call_state in enumerate(call_stack):
708                content += f'\n\t{index}: {call_state.entity.__name__}\n'
709                content += f'\t\t{call_state.params_with_values}\n'
710                content += f'\t\targs: {call_state.args}\n'
711                content += f'\t\tkwargs: {call_state.kwargs}\n'
712                if call_state.result_holder:
713                    content += f'\t\tresult: {call_state.result_holder.value}\n'
714                if call_state.exception_holder:
715                    content += f'\t\texception: {call_state.exception_holder.value}\n'
716        
717        return content
718    
719    def register(self):
720        """
721        Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value
722        """
723        self.try_to_load_expected_call_stack()
724
725        global TEST_CASE_STATE
726        self.old_global_fake_result = TEST_CASE_STATE
727        TEST_CASE_STATE = self
728    
729    def __enter__(self):
730        self.register()
731        return self
732
733    def unregister(self, should_be_saved: bool = True):
734        """
735        Will restore a previous value of the global 'TEST_CASE_STATE' variable
736        """
737        global TEST_CASE_STATE
738        TEST_CASE_STATE = self.old_global_fake_result
739
740        if should_be_saved:
741            self.try_to_save_expected_call_stack()
742
743    def __exit__(self, exc_type, exc_val, exc_tb):
744        self.unregister((exc_type is None) and (exc_val is None) and (exc_tb is None))
745        return False
TestCaseState( name: Union[str, NoneType] = None, raise_exceptions: bool = True, register: bool = True, depth: Union[int, NoneType] = 1)
416    def __init__(self, name: Optional[str] = None, raise_exceptions: bool = True, register: bool = True, depth: Optional[int] = 1):
417        if name is None:
418            parent_entity = find_current_entity(depth + 1)
419            parent_class = entity_class(parent_entity)
420            parent_class_name: str = parent_class.__name__
421
422        self.name: str = name or parent_class_name
423        self.raise_exceptions: bool = raise_exceptions
424        self._current_test_id: Hashable = None
425        self.call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
426            None: list()
427        })
428        self.expected_call_stack_per_test: OrderedDict[Hashable, List[CallState]] = OrderedDict({
429            None: list()
430        })
431        self.loaded: bool = False
432        self.content_file_name: str = f'{self.name}__test_case_state.pickle'
433        self.readable_content_file_name: str = f'{self.name}__test_case_state.md'
434        self.content_dir_rel: RelativePath = relative_to_src(depth + 1)
435        self.content_full_file_name: str = self.content_dir_rel(self.content_file_name)
436        self.readable_content_full_file_name: str = self.content_dir_rel(self.readable_content_file_name)
437        self.old_global_fake_result: 'TestCaseState' = cast('TestCaseState', None)
438        if register:
439            self.register()
name: str
raise_exceptions: bool
call_stack_per_test: 'OrderedDict[(Hashable, List[CallState])]'
expected_call_stack_per_test: 'OrderedDict[(Hashable, List[CallState])]'
loaded: bool
content_file_name: str
readable_content_file_name: str
content_dir_rel: cengal.file_system.path_manager.versions.v_0.path_manager.RelativePath
content_full_file_name: str
readable_content_full_file_name: str
old_global_fake_result: TestCaseState
current_test_id: Hashable
441    @property
442    def current_test_id(self) -> Hashable:
443        return self._current_test_id
call_stack: List[CallState]
451    @property
452    def call_stack(self) -> List[CallState]:
453        return self.call_stack_per_test[self.current_test_id]
expected_call_stack: List[CallState]
455    @property
456    def expected_call_stack(self) -> List[CallState]:
457        try:
458            return self.expected_call_stack_per_test[self.current_test_id]
459        except KeyError:
460            return list()
def is_loaded(self) -> bool:
462    def is_loaded(self) -> bool:
463        if not self.loaded:
464            raise ExpectedTestCaseStateIsNotLoadedError(f'Expected test case state is not loaded: {self.name}')
def check_current_state_item(self):
466    def check_current_state_item(self):
467        self.is_loaded()
468        call_stack_item: CallState = self.call_stack[-1]
469        expected_call_stack_item: CallState = self.expected_call_stack[len(self.call_stack) - 1]
470        if call_stack_item != expected_call_stack_item:
471            raise CallStackIsNotEqualError(f'Expected call stack item: {expected_call_stack_item}\nCurrent call stack item: {call_stack_item}')
def check_state_item(self, item_index: int):
473    def check_state_item(self, item_index: int):
474        self.is_loaded()
475        call_stack_item: CallState = self.call_stack[item_index]
476        expected_call_stack_item: CallState = self.expected_call_stack[item_index]
477        if call_stack_item != expected_call_stack_item:
478            raise CallStackIsNotEqualError(f'Index: {item_index}\nExpected call stack item: {expected_call_stack_item}\nActual call stack item: {call_stack_item}')
def check_state_range(self, item_start_index: int, item_end_index: int):
480    def check_state_range(self, item_start_index: int, item_end_index: int):
481        self.is_loaded()
482        call_stack_part: List[CallState] = self.call_stack[item_start_index: item_end_index]
483        expected_call_stack_part: List[CallState] = self.expected_call_stack[item_start_index: item_end_index]
484        if call_stack_part != expected_call_stack_part:
485            raise CallStackIsNotEqualError(f'Index: [{item_start_index}:{item_end_index}]\nExpected call stack part: {expected_call_stack_part}\nActual call stack part: {call_stack_part}')
def check_current_state(self):
487    def check_current_state(self):
488        self.is_loaded()
489        expected_call_stack_part: List[CallState] = self.expected_call_stack[:len(self.call_stack)]
490        if self.call_stack != expected_call_stack_part:
491            raise CallStackIsNotEqualError(f'Expected call stack: {expected_call_stack_part}\nActual call stack: {self.call_stack}')
def check_whole_state(self):
493    def check_whole_state(self):
494        self.is_loaded()
495        if self.call_stack != self.expected_call_stack:
496            raise CallStackIsNotEqualError(f'Expected call stack: {self.expected_call_stack}\nActual call stack: {self.call_stack}')
def check_all_tests_state(self):
498    def check_all_tests_state(self):
499        self.is_loaded()
500        if self.call_stack_per_test != self.expected_call_stack_per_test:
501            raise CallStackIsNotEqualError(f'Expected call stacks: {self.expected_call_stack_per_test}\nActual call stacks: {self.call_stack_per_test}')
def register_intro( self, raise_exceptions: Union[bool, NoneType] = None) -> AbstractContextManager[cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueHolder]:
503    def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
504        class RIContextManager:
505            def __init__(self, testcasestate: 'TestCaseState') -> None:
506                self.testcasestate: 'TestCaseState' = testcasestate
507                self.current_entity: Callable = None
508                self.code_params_with_values: CodeParamsWithValues = None
509                self.result_holder: ValueHolder = ValueHolder()
510                self.exception_holder: ValueHolder = ValueHolder()
511                self.args: Tuple = tuple()
512                self.kwargs: Dict = dict()
513            
514            def __enter__(self) -> ValueHolder:
515                self.current_entity: Callable = find_current_entity(2)
516                self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
517                self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only
518                self.args = tuple((arg[1] for arg in self.args))
519                self.kwargs = dict(self.code_params_with_values.keyword_only)
520                return self.result_holder
521            
522            def __exit__(self, exc_type, exc_val, exc_tb) -> None:
523                result = None
524                if exc_type is not None:
525                    self.exception_holder.value = exc_val.with_traceback(exc_tb)
526                    if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions):
527                        result = True
528
529                current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder)
530                self.testcasestate.call_stack.append(current_call_state)
531                self.testcasestate.check_current_state_item()
532                return result
533        
534        return RIContextManager(self)
535
536        # def context_manager(self: 'TestCaseState'):
537        #     current_entity: Callable = find_current_entity(2)
538        #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
539        #     result_holder: ValueHolder = ValueHolder()
540        #     exception_holder: ValueHolder = ValueHolder()
541        #     try:
542        #         yield result_holder
543        #     except:
544        #         exception_holder.value = get_exception()
545        #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
546        #             raise
547        #     finally:
548        #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
549        #         self.call_stack.append(current_call_state)
550        #         self.check_current_state_item()
551        
552        # return context_manager(self)
def ri( self, raise_exceptions: Union[bool, NoneType] = None) -> AbstractContextManager[cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueHolder]:
503    def register_intro(self, raise_exceptions: Optional[bool] = None) -> ContextManager[ValueHolder]:
504        class RIContextManager:
505            def __init__(self, testcasestate: 'TestCaseState') -> None:
506                self.testcasestate: 'TestCaseState' = testcasestate
507                self.current_entity: Callable = None
508                self.code_params_with_values: CodeParamsWithValues = None
509                self.result_holder: ValueHolder = ValueHolder()
510                self.exception_holder: ValueHolder = ValueHolder()
511                self.args: Tuple = tuple()
512                self.kwargs: Dict = dict()
513            
514            def __enter__(self) -> ValueHolder:
515                self.current_entity: Callable = find_current_entity(2)
516                self.code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
517                self.args = self.code_params_with_values.positional + self.code_params_with_values.positional_only
518                self.args = tuple((arg[1] for arg in self.args))
519                self.kwargs = dict(self.code_params_with_values.keyword_only)
520                return self.result_holder
521            
522            def __exit__(self, exc_type, exc_val, exc_tb) -> None:
523                result = None
524                if exc_type is not None:
525                    self.exception_holder.value = exc_val.with_traceback(exc_tb)
526                    if raise_exceptions or ((raise_exceptions is None) and self.testcasestate.raise_exceptions):
527                        result = True
528
529                current_call_state: CallState = CallState(self.current_entity, self.code_params_with_values, self.args, self.kwargs, self.result_holder, self.exception_holder)
530                self.testcasestate.call_stack.append(current_call_state)
531                self.testcasestate.check_current_state_item()
532                return result
533        
534        return RIContextManager(self)
535
536        # def context_manager(self: 'TestCaseState'):
537        #     current_entity: Callable = find_current_entity(2)
538        #     code_params_with_values: CodeParamsWithValues = intro_func_params_with_values(2)
539        #     result_holder: ValueHolder = ValueHolder()
540        #     exception_holder: ValueHolder = ValueHolder()
541        #     try:
542        #         yield result_holder
543        #     except:
544        #         exception_holder.value = get_exception()
545        #         if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
546        #             raise
547        #     finally:
548        #         current_call_state: CallState = CallState(current_entity, code_params_with_values, result_holder, exception_holder)
549        #         self.call_stack.append(current_call_state)
550        #         self.check_current_state_item()
551        
552        # return context_manager(self)
def register_outro( self, func: Callable, raise_exceptions: Union[bool, NoneType] = None) -> Callable:
573    def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable:
574        original_func: Callable = func
575        if hasattr(original_func, 'cr_frame'):
576            original_func = find_entity(original_func.cr_frame)
577
578        if is_async(func):
579            if inspect.isawaitable(func):
580                async def awaitable_wrapper() -> Any:
581                    current_entity: Awaitable = func
582                    if hasattr(current_entity, 'cr_frame'):
583                        # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame)
584                        code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict())
585                        code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
586                    else:
587                        code_params_with_values: CodeParamsWithValues = CodeParamsWithValues()
588                        result_args = tuple()
589                        result_kwargs = dict()
590                    
591                    result_holder: ValueHolder = ValueHolder()
592                    exception_holder: ValueHolder = ValueHolder()
593                    try:
594                        result = await func
595                        result_holder.value = result
596                        return result
597                    except:
598                        exception_holder.value = get_exception()
599                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
600                            raise
601                    finally:
602                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
603                        self.call_stack.append(current_call_state)
604                        self.check_current_state_item()
605                
606                wrapper = awaitable_wrapper()
607            else:
608                async def async_wrapper(*args, **kwargs) -> Any:
609                    current_entity: Callable = func
610                    code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
611                    code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
612                    result_holder: ValueHolder = ValueHolder()
613                    exception_holder: ValueHolder = ValueHolder()
614                    try:
615                        result = await func(*args, **kwargs)
616                        result_holder.value = result
617                        return result
618                    except:
619                        exception_holder.value = get_exception()
620                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
621                            raise
622                    finally:
623                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
624                        self.call_stack.append(current_call_state)
625                        self.check_current_state_item()
626                
627                wrapper = async_wrapper
628        else:
629            def sync_wrapper(*args, **kwargs) -> Any:
630                current_entity: Callable = func
631                code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
632                code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
633                result_holder: ValueHolder = ValueHolder()
634                exception_holder: ValueHolder = ValueHolder()
635                try:
636                    result = func(*args, **kwargs)
637                    result_holder.value = result
638                    return result
639                except:
640                    exception_holder.value = get_exception()
641                    if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
642                        raise
643                finally:
644                    current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
645                    self.call_stack.append(current_call_state)
646                    self.check_current_state_item()
647            
648            wrapper = sync_wrapper
649        
650        original_func_sign: inspect.Signature = inspect.signature(original_func)
651        update_wrapper(wrapper, original_func)
652        wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation)
653        return wrapper
def ro( self, func: Callable, raise_exceptions: Union[bool, NoneType] = None) -> Callable:
573    def register_outro(self, func: Callable, raise_exceptions: Optional[bool] = None) -> Callable:
574        original_func: Callable = func
575        if hasattr(original_func, 'cr_frame'):
576            original_func = find_entity(original_func.cr_frame)
577
578        if is_async(func):
579            if inspect.isawaitable(func):
580                async def awaitable_wrapper() -> Any:
581                    current_entity: Awaitable = func
582                    if hasattr(current_entity, 'cr_frame'):
583                        # code_params_with_values: CodeParamsWithValues = intro_frame_params_with_values(current_entity.cr_frame)
584                        code_params_with_values, result_args, result_kwargs = func_params_with_values(find_entity(original_func.cr_frame), tuple(), dict())
585                        code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
586                    else:
587                        code_params_with_values: CodeParamsWithValues = CodeParamsWithValues()
588                        result_args = tuple()
589                        result_kwargs = dict()
590                    
591                    result_holder: ValueHolder = ValueHolder()
592                    exception_holder: ValueHolder = ValueHolder()
593                    try:
594                        result = await func
595                        result_holder.value = result
596                        return result
597                    except:
598                        exception_holder.value = get_exception()
599                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
600                            raise
601                    finally:
602                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
603                        self.call_stack.append(current_call_state)
604                        self.check_current_state_item()
605                
606                wrapper = awaitable_wrapper()
607            else:
608                async def async_wrapper(*args, **kwargs) -> Any:
609                    current_entity: Callable = func
610                    code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
611                    code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
612                    result_holder: ValueHolder = ValueHolder()
613                    exception_holder: ValueHolder = ValueHolder()
614                    try:
615                        result = await func(*args, **kwargs)
616                        result_holder.value = result
617                        return result
618                    except:
619                        exception_holder.value = get_exception()
620                        if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
621                            raise
622                    finally:
623                        current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
624                        self.call_stack.append(current_call_state)
625                        self.check_current_state_item()
626                
627                wrapper = async_wrapper
628        else:
629            def sync_wrapper(*args, **kwargs) -> Any:
630                current_entity: Callable = func
631                code_params_with_values, result_args, result_kwargs = func_params_with_values(func, args, kwargs)
632                code_params_with_values: CodeParamsWithValues = cast(CodeParamsWithValues, code_params_with_values)
633                result_holder: ValueHolder = ValueHolder()
634                exception_holder: ValueHolder = ValueHolder()
635                try:
636                    result = func(*args, **kwargs)
637                    result_holder.value = result
638                    return result
639                except:
640                    exception_holder.value = get_exception()
641                    if raise_exceptions or ((raise_exceptions is None) and self.raise_exceptions):
642                        raise
643                finally:
644                    current_call_state: CallState = CallState(current_entity, code_params_with_values, result_args, result_kwargs, result_holder, exception_holder)
645                    self.call_stack.append(current_call_state)
646                    self.check_current_state_item()
647            
648            wrapper = sync_wrapper
649        
650        original_func_sign: inspect.Signature = inspect.signature(original_func)
651        update_wrapper(wrapper, original_func)
652        wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation)
653        return wrapper
def register_last_result(self, result: Any) -> None:
657    def register_last_result(self, result: Any) -> None:
658        last_call_state: CallState = self.call_stack[-1]
659        result_holder: ValueHolder = last_call_state.result_holder
660        if result_holder:
661            raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}')
662        
663        result_holder.value = result
def rls(self, result: Any) -> None:
657    def register_last_result(self, result: Any) -> None:
658        last_call_state: CallState = self.call_stack[-1]
659        result_holder: ValueHolder = last_call_state.result_holder
660        if result_holder:
661            raise ResultAlreadyRegisteredError(f'For last call state: {last_call_state}')
662        
663        result_holder.value = result
def register_last_exception(self, result: Any) -> None:
667    def register_last_exception(self, result: Any) -> None:
668        last_call_state: CallState = self.call_stack[-1]
669        exception_holder: ValueHolder = last_call_state.result_holder
670        if exception_holder:
671            raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}')
672        
673        exception_holder.value = result
def rle(self, result: Any) -> None:
667    def register_last_exception(self, result: Any) -> None:
668        last_call_state: CallState = self.call_stack[-1]
669        exception_holder: ValueHolder = last_call_state.result_holder
670        if exception_holder:
671            raise ExceptionAlreadyRegisteredError(f'For last call state: {last_call_state}')
672        
673        exception_holder.value = result
def try_to_load_expected_call_stack(self) -> None:
677    def try_to_load_expected_call_stack(self) -> None:
678        """
679        Will try to load a pickle file with a refference results (an expected_results)
680        """
681        if os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name):
682            with open(self.content_full_file_name, 'rb') as file:
683                self.expected_call_stack_per_test = pickle.load(file)
684                self.loaded = True

Will try to load a pickle file with the reference results (the expected results)

def try_to_save_expected_call_stack(self) -> None:
686    def try_to_save_expected_call_stack(self) -> None:
687        """
688        Will try to save a refference results (an expected_results) to a pickle file. Will not save them if 
689        pickle file was already successfully loaded with a 'FakeResults.try_to_load_expected_call_stack()' call.
690        """
691        if self.loaded:
692            return
693        
694        if not (os.path.exists(self.content_full_file_name) and os.path.isfile(self.content_full_file_name)):
695            with open(self.content_full_file_name, 'wb') as file:
696                pickle.dump(self.call_stack_per_test, file)
697        
698        if not (os.path.exists(self.readable_content_full_file_name) and os.path.isfile(self.readable_content_full_file_name)):
699            with open(self.readable_content_full_file_name, 'wb') as file:
700                file.write(self.prepare_readable_content().encode('utf-8'))

Will try to save the reference results (the expected results) to a pickle file. Will not save them if the pickle file was already successfully loaded with a 'TestCaseState.try_to_load_expected_call_stack()' call.

def prepare_readable_content(self) -> str:
702    def prepare_readable_content(self) -> str:
703        content: str = str()
704        content += f'# Test Case: {self.name}\n'
705        for test_id, call_stack in self.call_stack_per_test.items():
706            content += f'\n\n## Test ID: {test_id}\n'
707            for index, call_state in enumerate(call_stack):
708                content += f'\n\t{index}: {call_state.entity.__name__}\n'
709                content += f'\t\t{call_state.params_with_values}\n'
710                content += f'\t\targs: {call_state.args}\n'
711                content += f'\t\tkwargs: {call_state.kwargs}\n'
712                if call_state.result_holder:
713                    content += f'\t\tresult: {call_state.result_holder.value}\n'
714                if call_state.exception_holder:
715                    content += f'\t\texception: {call_state.exception_holder.value}\n'
716        
717        return content
def register(self):
719    def register(self):
720        """
721        Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value
722        """
723        self.try_to_load_expected_call_stack()
724
725        global TEST_CASE_STATE
726        self.old_global_fake_result = TEST_CASE_STATE
727        TEST_CASE_STATE = self

Will register current instance to a global 'TEST_CASE_STATE' variable. Will save a previous value

def unregister(self, should_be_saved: bool = True):
733    def unregister(self, should_be_saved: bool = True):
734        """
735        Will restore a previous value of the global 'TEST_CASE_STATE' variable
736        """
737        global TEST_CASE_STATE
738        TEST_CASE_STATE = self.old_global_fake_result
739
740        if should_be_saved:
741            self.try_to_save_expected_call_stack()

Will restore a previous value of the global 'TEST_CASE_STATE' variable