cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.bytecode_patcher

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37from typing import Callable, List
 38from types import CodeType
 39from cengal.code_flow_control.python_bytecode_manipulator import *
 40from cengal.introspection.inspect import is_async, is_callable
 41
 42
 43def gly_patch(entity: Callable) -> CodeType:
 44    # TODO: try to complise strings using _get_code_object() instead of get_code()
 45    entity_code_type: CodeTypeEnum = code_type(entity)
 46    if entity_code_type is None:
 47        raise RuntimeError('Entity {entity} cann not be patched')
 48    
 49    fn_code = get_code(entity)
 50    op_sequence, labels, op_by_label = OpSequence.from_bytecode_sequence(fn_code.co_code)
 51    op_sequence: OpSequence = op_sequence
 52    co_consts = list(fn_code.co_consts)
 53    first_new_const_index: int = len(co_consts)
 54    co_names = list(fn_code.co_names)
 55    first_new_name_index: int = len(co_names)
 56    co_varnames = list(fn_code.co_varnames)
 57    first_new_varname_index: int = len(co_varnames)
 58    co_nlocals = fn_code.co_nlocals
 59
 60
 61    co_consts.extend((0, ('gly',)))
 62    co_names.extend(('cengal.parallel_execution.coroutines.coro_standard_services.loop_yield', 'gly'))
 63    co_varnames.extend(('gly', 'ly'))
 64    co_nlocals = len(co_varnames)
 65
 66    initialization: List[Instruction] = [
 67        mi('LOAD_CONST', first_new_const_index + 0),
 68        mi('LOAD_CONST', first_new_const_index + 1),
 69        mi('IMPORT_NAME', first_new_name_index + 0),
 70        mi('IMPORT_FROM', first_new_name_index + 1),
 71        mi('STORE_FAST', first_new_varname_index + 0),
 72        mi('POP_TOP'),
 73        mi('LOAD_FAST', first_new_varname_index + 0),
 74        mi('CALL_FUNCTION', 0),
 75        mi('STORE_FAST', first_new_varname_index + 1),
 76    ]
 77    initialization_op_sequence, _, _ = OpSequence.from_instructions(initialization)
 78    initialization_op_sequence: OpSequence = initialization_op_sequence
 79
 80    ly_call: List[Instruction] = [
 81        mi('LOAD_FAST', first_new_varname_index + 1),
 82        mi('CALL_FUNCTION', 0),
 83        mi('POP_TOP'),
 84    ]
 85    ly_call_op_sequence, _, _ = OpSequence.from_instructions(ly_call)
 86    ly_call_op_sequence: OpSequence = ly_call_op_sequence
 87
 88    op_sequence.insert_op_sequence(0, initialization_op_sequence)
 89
 90    # for
 91    FOR_ITER = opcode('FOR_ITER')
 92
 93    index = len(initialization) - 1
 94    while True:
 95        index += 1
 96        if index >= len(op_sequence.op_sequence):
 97            break
 98
 99        op, _, _, _, _, _ = op_sequence.op_sequence[index]
100        if FOR_ITER == op:
101            place_to_insert = index + 2
102            op_sequence.insert_op_sequence(place_to_insert, ly_call_op_sequence)
103
104    JUMP_ABSOLUTE = opcode('JUMP_ABSOLUTE')
105
106    index = len(initialization) - 1
107    while True:
108        index += 1
109        if index >= len(op_sequence.op_sequence):
110            break
111
112        op, _, _, _, _, _ = op_sequence.op_sequence[index]
113        arg = op_sequence.get_arg(index)
114        if JUMP_ABSOLUTE == op:
115            place_to_insert = op_sequence.op_sequence_offset_map.new_by_original[arg_to_op_index(arg)]
116            place_to_insert_op, place_to_insert_arg, _, _, _, _ = op_sequence.op_sequence[place_to_insert]
117            if FOR_ITER == place_to_insert_op:
118                continue
119
120            op_sequence.insert_op_sequence(index, ly_call_op_sequence)
121            index += len(ly_call_op_sequence)
122    
123    fix_labels(op_sequence, op_by_label)
124
125    new_code_type = CodeType(
126        fn_code.co_argcount,
127        fn_code.co_posonlyargcount,
128        fn_code.co_kwonlyargcount,
129        co_nlocals,
130        fn_code.co_stacksize,
131        fn_code.co_flags,
132        op_sequence.to_bytes(),
133        tuple(co_consts),
134        tuple(co_names),
135        tuple(co_varnames),
136        fn_code.co_filename,
137        fn_code.co_name,
138        fn_code.co_firstlineno,
139        fn_code.co_lnotab,
140        fn_code.co_freevars,
141        fn_code.co_cellvars,
142    )
143    set_code(entity, new_code_type)
144    return fn_code
145
146
147def agly_patch(entity: Callable) -> Callable:
148    # TODO: try to complise strings using _get_code_object() instead of get_code()
149    entity_code_type: CodeTypeEnum = code_type(entity)
150    if entity_code_type is None:
151        raise RuntimeError( 'Entity {entity} cann not be patched')
152    
153    fn_code = get_code(entity)
154    return fn_code
155
156
157def gly_patched(func: Callable) -> Callable:
158    """Example:
159        @gly_patched
160        def func():
161            for i in range(200):
162                print(i)
163                for j in range(100):
164                    print(j)
165            
166            i = 100
167            while i > 0:
168                print(i)
169                i -= 1
170
171    Is equivalent to:
172        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
173        def func():
174            ly = gly()
175            for i in range(200):
176                print(i)
177                for j in range(100):
178                    print(j)
179                    ly()
180                
181                ly()
182            
183            i = 100
184            while i > 0:
185                print(i)
186                i -= 1
187                ly()
188
189    Args:
190        func (Callable): _description_
191
192    Returns:
193        Callable: _description_
194    """    
195    gly_patch(func)
196    return func
197
198
199glyp = gly_patched
200gp = gly_patched
201
202
203def agly_patched(func: Callable) -> Callable:
204    agly_patch(func)
205    return func
206
207
208aglyp = agly_patched
209agp = agly_patched
210
211
212class GlyPatchManagerError(Exception):
213    pass
214
215
216class GlyPatchManager:
217    patched_functions = dict()
218    call_tree = dict()
219
220    def __call__(self, func: Callable, tree: bool = True) -> Callable:
221        if func not in self.patched_functions:
222            if tree:
223                if is_async(func):
224                    agly_patched_tree(func, self)
225                elif is_callable(func):
226                    gly_patched_tree(func, self)
227                else:
228                    raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
229            else:
230                if is_async(func):
231                    fn_code = agly_patch(func)
232                elif is_callable(func):
233                    fn_code = gly_patch(func)
234                else:
235                    raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
236            
237                self.patched_functions[func] = fn_code
238        
239        return func
240
241    def restore(self, func: Callable, tree: bool = True) -> Callable:
242        if func in self.patched_functions:
243            if tree:
244                self._unpatch_tree(func)
245            else:
246                set_code(func, self.patched_functions[func])
247
248        return func
249    
250    def _unpatch_tree(self, func: Callable) -> Callable:
251        raise NotImplementedError
252
253
254def gly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
255    raise NotImplementedError
256
257
258def agly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
259    raise NotImplementedError
def gly_patch(entity: Callable) -> code:
 44def gly_patch(entity: Callable) -> CodeType:
 45    # TODO: try to complise strings using _get_code_object() instead of get_code()
 46    entity_code_type: CodeTypeEnum = code_type(entity)
 47    if entity_code_type is None:
 48        raise RuntimeError('Entity {entity} cann not be patched')
 49    
 50    fn_code = get_code(entity)
 51    op_sequence, labels, op_by_label = OpSequence.from_bytecode_sequence(fn_code.co_code)
 52    op_sequence: OpSequence = op_sequence
 53    co_consts = list(fn_code.co_consts)
 54    first_new_const_index: int = len(co_consts)
 55    co_names = list(fn_code.co_names)
 56    first_new_name_index: int = len(co_names)
 57    co_varnames = list(fn_code.co_varnames)
 58    first_new_varname_index: int = len(co_varnames)
 59    co_nlocals = fn_code.co_nlocals
 60
 61
 62    co_consts.extend((0, ('gly',)))
 63    co_names.extend(('cengal.parallel_execution.coroutines.coro_standard_services.loop_yield', 'gly'))
 64    co_varnames.extend(('gly', 'ly'))
 65    co_nlocals = len(co_varnames)
 66
 67    initialization: List[Instruction] = [
 68        mi('LOAD_CONST', first_new_const_index + 0),
 69        mi('LOAD_CONST', first_new_const_index + 1),
 70        mi('IMPORT_NAME', first_new_name_index + 0),
 71        mi('IMPORT_FROM', first_new_name_index + 1),
 72        mi('STORE_FAST', first_new_varname_index + 0),
 73        mi('POP_TOP'),
 74        mi('LOAD_FAST', first_new_varname_index + 0),
 75        mi('CALL_FUNCTION', 0),
 76        mi('STORE_FAST', first_new_varname_index + 1),
 77    ]
 78    initialization_op_sequence, _, _ = OpSequence.from_instructions(initialization)
 79    initialization_op_sequence: OpSequence = initialization_op_sequence
 80
 81    ly_call: List[Instruction] = [
 82        mi('LOAD_FAST', first_new_varname_index + 1),
 83        mi('CALL_FUNCTION', 0),
 84        mi('POP_TOP'),
 85    ]
 86    ly_call_op_sequence, _, _ = OpSequence.from_instructions(ly_call)
 87    ly_call_op_sequence: OpSequence = ly_call_op_sequence
 88
 89    op_sequence.insert_op_sequence(0, initialization_op_sequence)
 90
 91    # for
 92    FOR_ITER = opcode('FOR_ITER')
 93
 94    index = len(initialization) - 1
 95    while True:
 96        index += 1
 97        if index >= len(op_sequence.op_sequence):
 98            break
 99
100        op, _, _, _, _, _ = op_sequence.op_sequence[index]
101        if FOR_ITER == op:
102            place_to_insert = index + 2
103            op_sequence.insert_op_sequence(place_to_insert, ly_call_op_sequence)
104
105    JUMP_ABSOLUTE = opcode('JUMP_ABSOLUTE')
106
107    index = len(initialization) - 1
108    while True:
109        index += 1
110        if index >= len(op_sequence.op_sequence):
111            break
112
113        op, _, _, _, _, _ = op_sequence.op_sequence[index]
114        arg = op_sequence.get_arg(index)
115        if JUMP_ABSOLUTE == op:
116            place_to_insert = op_sequence.op_sequence_offset_map.new_by_original[arg_to_op_index(arg)]
117            place_to_insert_op, place_to_insert_arg, _, _, _, _ = op_sequence.op_sequence[place_to_insert]
118            if FOR_ITER == place_to_insert_op:
119                continue
120
121            op_sequence.insert_op_sequence(index, ly_call_op_sequence)
122            index += len(ly_call_op_sequence)
123    
124    fix_labels(op_sequence, op_by_label)
125
126    new_code_type = CodeType(
127        fn_code.co_argcount,
128        fn_code.co_posonlyargcount,
129        fn_code.co_kwonlyargcount,
130        co_nlocals,
131        fn_code.co_stacksize,
132        fn_code.co_flags,
133        op_sequence.to_bytes(),
134        tuple(co_consts),
135        tuple(co_names),
136        tuple(co_varnames),
137        fn_code.co_filename,
138        fn_code.co_name,
139        fn_code.co_firstlineno,
140        fn_code.co_lnotab,
141        fn_code.co_freevars,
142        fn_code.co_cellvars,
143    )
144    set_code(entity, new_code_type)
145    return fn_code
def agly_patch(entity: Callable) -> Callable:
148def agly_patch(entity: Callable) -> Callable:
149    # TODO: try to complise strings using _get_code_object() instead of get_code()
150    entity_code_type: CodeTypeEnum = code_type(entity)
151    if entity_code_type is None:
152        raise RuntimeError( 'Entity {entity} cann not be patched')
153    
154    fn_code = get_code(entity)
155    return fn_code
def gly_patched(func: Callable) -> Callable:
158def gly_patched(func: Callable) -> Callable:
159    """Example:
160        @gly_patched
161        def func():
162            for i in range(200):
163                print(i)
164                for j in range(100):
165                    print(j)
166            
167            i = 100
168            while i > 0:
169                print(i)
170                i -= 1
171
172    Is equivalent to:
173        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
174        def func():
175            ly = gly()
176            for i in range(200):
177                print(i)
178                for j in range(100):
179                    print(j)
180                    ly()
181                
182                ly()
183            
184            i = 100
185            while i > 0:
186                print(i)
187                i -= 1
188                ly()
189
190    Args:
191        func (Callable): _description_
192
193    Returns:
194        Callable: _description_
195    """    
196    gly_patch(func)
197    return func

Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)

    i = 100
    while i > 0:
        print(i)
        i -= 1

Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()

        ly()

    i = 100
    while i > 0:
        print(i)
        i -= 1
        ly()

Args: func (Callable): _description_

Returns: Callable: _description_

def glyp(func: Callable) -> Callable:
158def gly_patched(func: Callable) -> Callable:
159    """Example:
160        @gly_patched
161        def func():
162            for i in range(200):
163                print(i)
164                for j in range(100):
165                    print(j)
166            
167            i = 100
168            while i > 0:
169                print(i)
170                i -= 1
171
172    Is equivalent to:
173        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
174        def func():
175            ly = gly()
176            for i in range(200):
177                print(i)
178                for j in range(100):
179                    print(j)
180                    ly()
181                
182                ly()
183            
184            i = 100
185            while i > 0:
186                print(i)
187                i -= 1
188                ly()
189
190    Args:
191        func (Callable): _description_
192
193    Returns:
194        Callable: _description_
195    """    
196    gly_patch(func)
197    return func

Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)

    i = 100
    while i > 0:
        print(i)
        i -= 1

Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()

        ly()

    i = 100
    while i > 0:
        print(i)
        i -= 1
        ly()

Args: func (Callable): _description_

Returns: Callable: _description_

def gp(func: Callable) -> Callable:
158def gly_patched(func: Callable) -> Callable:
159    """Example:
160        @gly_patched
161        def func():
162            for i in range(200):
163                print(i)
164                for j in range(100):
165                    print(j)
166            
167            i = 100
168            while i > 0:
169                print(i)
170                i -= 1
171
172    Is equivalent to:
173        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
174        def func():
175            ly = gly()
176            for i in range(200):
177                print(i)
178                for j in range(100):
179                    print(j)
180                    ly()
181                
182                ly()
183            
184            i = 100
185            while i > 0:
186                print(i)
187                i -= 1
188                ly()
189
190    Args:
191        func (Callable): _description_
192
193    Returns:
194        Callable: _description_
195    """    
196    gly_patch(func)
197    return func

Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)

    i = 100
    while i > 0:
        print(i)
        i -= 1

Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()

        ly()

    i = 100
    while i > 0:
        print(i)
        i -= 1
        ly()

Args: func (Callable): _description_

Returns: Callable: _description_

def agly_patched(func: Callable) -> Callable:
204def agly_patched(func: Callable) -> Callable:
205    agly_patch(func)
206    return func
def aglyp(func: Callable) -> Callable:
204def agly_patched(func: Callable) -> Callable:
205    agly_patch(func)
206    return func
def agp(func: Callable) -> Callable:
204def agly_patched(func: Callable) -> Callable:
205    agly_patch(func)
206    return func
class GlyPatchManagerError(builtins.Exception):
213class GlyPatchManagerError(Exception):
214    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class GlyPatchManager:
217class GlyPatchManager:
218    patched_functions = dict()
219    call_tree = dict()
220
221    def __call__(self, func: Callable, tree: bool = True) -> Callable:
222        if func not in self.patched_functions:
223            if tree:
224                if is_async(func):
225                    agly_patched_tree(func, self)
226                elif is_callable(func):
227                    gly_patched_tree(func, self)
228                else:
229                    raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
230            else:
231                if is_async(func):
232                    fn_code = agly_patch(func)
233                elif is_callable(func):
234                    fn_code = gly_patch(func)
235                else:
236                    raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
237            
238                self.patched_functions[func] = fn_code
239        
240        return func
241
242    def restore(self, func: Callable, tree: bool = True) -> Callable:
243        if func in self.patched_functions:
244            if tree:
245                self._unpatch_tree(func)
246            else:
247                set_code(func, self.patched_functions[func])
248
249        return func
250    
251    def _unpatch_tree(self, func: Callable) -> Callable:
252        raise NotImplementedError
patched_functions = {}
call_tree = {}
def restore(self, func: Callable, tree: bool = True) -> Callable:
242    def restore(self, func: Callable, tree: bool = True) -> Callable:
243        if func in self.patched_functions:
244            if tree:
245                self._unpatch_tree(func)
246            else:
247                set_code(func, self.patched_functions[func])
248
249        return func
def gly_patched_tree( entity: Callable, gly_patch_manager: Union[GlyPatchManager, NoneType] = None) -> Callable:
255def gly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
256    raise NotImplementedError
def agly_patched_tree( entity: Callable, gly_patch_manager: Union[GlyPatchManager, NoneType] = None) -> Callable:
259def agly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
260    raise NotImplementedError