cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.bytecode_patcher
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Bytecode patcher for the ``loop_yield`` service.

Rewrites the bytecode of a function so that its loops call the ``ly`` helper
obtained from ``gly()``
(``cengal.parallel_execution.coroutines.coro_standard_services.loop_yield``).

Docstrings: http://www.python.org/dev/peps/pep-0257/
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


from typing import Callable, List, Optional
from types import CodeType
from cengal.code_flow_control.python_bytecode_manipulator import *
from cengal.introspection.inspect import is_async, is_callable


def gly_patch(entity: Callable) -> CodeType:
    """Patch ``entity`` in place so that its loops call a loop-yield helper.

    The bytecode of ``entity`` is rewritten as follows:

    * a prologue is inserted at the very beginning that imports ``gly`` from
      ``cengal.parallel_execution.coroutines.coro_standard_services.loop_yield``
      and stores the result of ``gly()`` in a new local variable ``ly``;
    * ``ly()`` is called two instructions after every ``FOR_ITER`` (i.e. right
      after the instruction that follows it - presumably the loop-variable
      store);
    * ``ly()`` is called right before every ``JUMP_ABSOLUTE`` whose target is
      not a ``FOR_ITER``.

    NOTE(review): the opcode names used here (``CALL_FUNCTION``,
    ``JUMP_ABSOLUTE``) presumably restrict this patcher to CPython versions
    that still have them - confirm the supported interpreter range.

    Args:
        entity: Callable whose code object is replaced through ``set_code``.

    Returns:
        The original, unpatched code object of ``entity``.

    Raises:
        RuntimeError: If ``code_type(entity)`` returns ``None``.
    """
    # TODO: try to compile strings using _get_code_object() instead of get_code()
    entity_code_type: CodeTypeEnum = code_type(entity)
    if entity_code_type is None:
        raise RuntimeError(f'Entity {entity} can not be patched')

    fn_code = get_code(entity)
    op_sequence: OpSequence
    op_sequence, _, op_by_label = OpSequence.from_bytecode_sequence(fn_code.co_code)

    # New constants / names / locals are appended, so existing indexes used by
    # the original bytecode stay valid.
    co_consts = list(fn_code.co_consts)
    first_new_const_index: int = len(co_consts)
    co_names = list(fn_code.co_names)
    first_new_name_index: int = len(co_names)
    co_varnames = list(fn_code.co_varnames)
    first_new_varname_index: int = len(co_varnames)

    co_consts.extend((0, ('gly',)))
    co_names.extend(('cengal.parallel_execution.coroutines.coro_standard_services.loop_yield', 'gly'))
    co_varnames.extend(('gly', 'ly'))

    # Bytecode for:
    #     from cengal...loop_yield import gly
    #     ly = gly()
    initialization: List[Instruction] = [
        mi('LOAD_CONST', first_new_const_index + 0),
        mi('LOAD_CONST', first_new_const_index + 1),
        mi('IMPORT_NAME', first_new_name_index + 0),
        mi('IMPORT_FROM', first_new_name_index + 1),
        mi('STORE_FAST', first_new_varname_index + 0),
        mi('POP_TOP'),
        mi('LOAD_FAST', first_new_varname_index + 0),
        mi('CALL_FUNCTION', 0),
        mi('STORE_FAST', first_new_varname_index + 1),
    ]
    initialization_op_sequence: OpSequence
    initialization_op_sequence, _, _ = OpSequence.from_instructions(initialization)

    # Bytecode for the statement ``ly()``.
    ly_call: List[Instruction] = [
        mi('LOAD_FAST', first_new_varname_index + 1),
        mi('CALL_FUNCTION', 0),
        mi('POP_TOP'),
    ]
    ly_call_op_sequence: OpSequence
    ly_call_op_sequence, _, _ = OpSequence.from_instructions(ly_call)

    op_sequence.insert_op_sequence(0, initialization_op_sequence)

    # for
    FOR_ITER = opcode('FOR_ITER')

    # Start scanning right after the injected prologue.
    index = len(initialization) - 1
    while True:
        index += 1
        if index >= len(op_sequence.op_sequence):
            break

        op, _, _, _, _, _ = op_sequence.op_sequence[index]
        if FOR_ITER == op:
            place_to_insert = index + 2
            op_sequence.insert_op_sequence(place_to_insert, ly_call_op_sequence)

    JUMP_ABSOLUTE = opcode('JUMP_ABSOLUTE')

    index = len(initialization) - 1
    while True:
        index += 1
        if index >= len(op_sequence.op_sequence):
            break

        op, _, _, _, _, _ = op_sequence.op_sequence[index]
        arg = op_sequence.get_arg(index)
        if JUMP_ABSOLUTE == op:
            jump_target_index = op_sequence.op_sequence_offset_map.new_by_original[arg_to_op_index(arg)]
            jump_target_op, _, _, _, _, _ = op_sequence.op_sequence[jump_target_index]
            if FOR_ITER == jump_target_op:
                # Jumps back to a ``for`` header are already covered by the
                # FOR_ITER pass above.
                continue

            op_sequence.insert_op_sequence(index, ly_call_op_sequence)
            # Skip over the instructions that were just inserted.
            index += len(ly_call_op_sequence)

    fix_labels(op_sequence, op_by_label)

    # The injected sequences need operand-stack room of their own: the import
    # prologue holds two values at once, and every ``ly()`` call pushes one
    # value on top of whatever is already on the stack at the insertion point.
    co_stacksize = max(fn_code.co_stacksize + 1, 2)

    # ``replace()`` copies every field that is not overridden, so the new code
    # object does not depend on the positional layout of ``CodeType(...)``.
    new_code_type = fn_code.replace(
        co_nlocals=len(co_varnames),
        co_stacksize=co_stacksize,
        co_code=op_sequence.to_bytes(),
        co_consts=tuple(co_consts),
        co_names=tuple(co_names),
        co_varnames=tuple(co_varnames),
    )
    set_code(entity, new_code_type)
    return fn_code


def agly_patch(entity: Callable) -> CodeType:
    """Validate ``entity`` and return its current code object.

    Unlike ``gly_patch`` this function does not modify the bytecode yet: it
    only checks that ``entity`` has a recognised code type and returns the
    code object obtained from ``get_code``.

    Args:
        entity: Callable to inspect.

    Returns:
        The code object of ``entity``.

    Raises:
        RuntimeError: If ``code_type(entity)`` returns ``None``.
    """
    # TODO: try to compile strings using _get_code_object() instead of get_code()
    entity_code_type: CodeTypeEnum = code_type(entity)
    if entity_code_type is None:
        raise RuntimeError(f'Entity {entity} can not be patched')

    fn_code = get_code(entity)
    return fn_code


def gly_patched(func: Callable) -> Callable:
    """Decorator: patch ``func`` with ``gly_patch`` and return it.

    Example:
        @gly_patched
        def func():
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)

            i = 100
            while i > 0:
                print(i)
                i -= 1

    Is conceptually equivalent to:
        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
        def func():
            ly = gly()
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)
                    ly()

                ly()

            i = 100
            while i > 0:
                print(i)
                i -= 1
                ly()

    NOTE(review): for ``for`` loops ``gly_patch`` inserts the ``ly()`` call
    near the top of each iteration (two instructions after ``FOR_ITER``), not
    at the end of the body as sketched above.

    Args:
        func (Callable): Function to patch; its code object is replaced in place.

    Returns:
        Callable: The same ``func`` object.
    """
    gly_patch(func)
    return func


glyp = gly_patched
gp = gly_patched


def agly_patched(func: Callable) -> Callable:
    """Decorator counterpart of ``gly_patched`` that delegates to ``agly_patch``.

    Args:
        func (Callable): Function passed to ``agly_patch``.

    Returns:
        Callable: The same ``func`` object.
    """
    agly_patch(func)
    return func


aglyp = agly_patched
agp = agly_patched


class GlyPatchManagerError(Exception):
    """Raised by ``GlyPatchManager`` when an entity can not be patched."""
    pass


class GlyPatchManager:
    """Patches callables with loop-yield calls and remembers how to undo it."""

    # Class-level, i.e. shared by every instance: maps a patched callable to
    # the original code object returned by gly_patch() / agly_patch().
    patched_functions = dict()
    # Not read or written by the methods of this class.
    call_tree = dict()

    def __call__(self, func: Callable, tree: bool = True) -> Callable:
        """Patch ``func`` unless it is already registered as patched.

        Args:
            func: Callable to patch.
            tree: When true, delegate to ``agly_patched_tree`` /
                ``gly_patched_tree`` (passing this manager); otherwise patch
                only ``func`` itself and remember its original code object.

        Returns:
            ``func`` itself.

        Raises:
            GlyPatchManagerError: If ``func`` is neither async nor callable.
        """
        if func in self.patched_functions:
            return func

        if tree:
            # The tree helpers receive the manager itself; there is no code
            # object to record at this level.
            if is_async(func):
                agly_patched_tree(func, self)
            elif is_callable(func):
                gly_patched_tree(func, self)
            else:
                raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
        else:
            if is_async(func):
                fn_code = agly_patch(func)
            elif is_callable(func):
                fn_code = gly_patch(func)
            else:
                raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')

            self.patched_functions[func] = fn_code

        return func

    def restore(self, func: Callable, tree: bool = True) -> Callable:
        """Undo the patch applied to ``func``, if it is registered.

        Args:
            func: Callable to restore.
            tree: When true, delegate to ``_unpatch_tree``; otherwise put the
                remembered original code object back with ``set_code``.

        Returns:
            ``func`` itself.
        """
        if func in self.patched_functions:
            if tree:
                self._unpatch_tree(func)
            else:
                set_code(func, self.patched_functions[func])
                # Forget the entry so that the function can be patched again.
                del self.patched_functions[func]

        return func

    def _unpatch_tree(self, func: Callable) -> Callable:
        """Restore a whole call tree. Not implemented yet."""
        raise NotImplementedError


def gly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
    """Patch ``entity`` together with its call tree. Not implemented yet."""
    raise NotImplementedError


def agly_patched_tree(entity: Callable, gly_patch_manager: Optional[GlyPatchManager] = None) -> Callable:
    """Async counterpart of ``gly_patched_tree``. Not implemented yet."""
    raise NotImplementedError
def gly_patch(entity: Callable) -> CodeType:
    """Patch ``entity`` in place so that its loops call a loop-yield helper.

    The bytecode of ``entity`` is rewritten as follows:

    * a prologue is inserted at the very beginning that imports ``gly`` from
      ``cengal.parallel_execution.coroutines.coro_standard_services.loop_yield``
      and stores the result of ``gly()`` in a new local variable ``ly``;
    * ``ly()`` is called two instructions after every ``FOR_ITER`` (i.e. right
      after the instruction that follows it - presumably the loop-variable
      store);
    * ``ly()`` is called right before every ``JUMP_ABSOLUTE`` whose target is
      not a ``FOR_ITER``.

    NOTE(review): the opcode names used here (``CALL_FUNCTION``,
    ``JUMP_ABSOLUTE``) presumably restrict this patcher to CPython versions
    that still have them - confirm the supported interpreter range.

    Args:
        entity: Callable whose code object is replaced through ``set_code``.

    Returns:
        The original, unpatched code object of ``entity``.

    Raises:
        RuntimeError: If ``code_type(entity)`` returns ``None``.
    """
    # TODO: try to compile strings using _get_code_object() instead of get_code()
    entity_code_type: CodeTypeEnum = code_type(entity)
    if entity_code_type is None:
        raise RuntimeError(f'Entity {entity} can not be patched')

    fn_code = get_code(entity)
    op_sequence: OpSequence
    op_sequence, _, op_by_label = OpSequence.from_bytecode_sequence(fn_code.co_code)

    # New constants / names / locals are appended, so existing indexes used by
    # the original bytecode stay valid.
    co_consts = list(fn_code.co_consts)
    first_new_const_index: int = len(co_consts)
    co_names = list(fn_code.co_names)
    first_new_name_index: int = len(co_names)
    co_varnames = list(fn_code.co_varnames)
    first_new_varname_index: int = len(co_varnames)

    co_consts.extend((0, ('gly',)))
    co_names.extend(('cengal.parallel_execution.coroutines.coro_standard_services.loop_yield', 'gly'))
    co_varnames.extend(('gly', 'ly'))

    # Bytecode for:
    #     from cengal...loop_yield import gly
    #     ly = gly()
    initialization: List[Instruction] = [
        mi('LOAD_CONST', first_new_const_index + 0),
        mi('LOAD_CONST', first_new_const_index + 1),
        mi('IMPORT_NAME', first_new_name_index + 0),
        mi('IMPORT_FROM', first_new_name_index + 1),
        mi('STORE_FAST', first_new_varname_index + 0),
        mi('POP_TOP'),
        mi('LOAD_FAST', first_new_varname_index + 0),
        mi('CALL_FUNCTION', 0),
        mi('STORE_FAST', first_new_varname_index + 1),
    ]
    initialization_op_sequence: OpSequence
    initialization_op_sequence, _, _ = OpSequence.from_instructions(initialization)

    # Bytecode for the statement ``ly()``.
    ly_call: List[Instruction] = [
        mi('LOAD_FAST', first_new_varname_index + 1),
        mi('CALL_FUNCTION', 0),
        mi('POP_TOP'),
    ]
    ly_call_op_sequence: OpSequence
    ly_call_op_sequence, _, _ = OpSequence.from_instructions(ly_call)

    op_sequence.insert_op_sequence(0, initialization_op_sequence)

    # for
    FOR_ITER = opcode('FOR_ITER')

    # Start scanning right after the injected prologue.
    index = len(initialization) - 1
    while True:
        index += 1
        if index >= len(op_sequence.op_sequence):
            break

        op, _, _, _, _, _ = op_sequence.op_sequence[index]
        if FOR_ITER == op:
            place_to_insert = index + 2
            op_sequence.insert_op_sequence(place_to_insert, ly_call_op_sequence)

    JUMP_ABSOLUTE = opcode('JUMP_ABSOLUTE')

    index = len(initialization) - 1
    while True:
        index += 1
        if index >= len(op_sequence.op_sequence):
            break

        op, _, _, _, _, _ = op_sequence.op_sequence[index]
        arg = op_sequence.get_arg(index)
        if JUMP_ABSOLUTE == op:
            jump_target_index = op_sequence.op_sequence_offset_map.new_by_original[arg_to_op_index(arg)]
            jump_target_op, _, _, _, _, _ = op_sequence.op_sequence[jump_target_index]
            if FOR_ITER == jump_target_op:
                # Jumps back to a ``for`` header are already covered by the
                # FOR_ITER pass above.
                continue

            op_sequence.insert_op_sequence(index, ly_call_op_sequence)
            # Skip over the instructions that were just inserted.
            index += len(ly_call_op_sequence)

    fix_labels(op_sequence, op_by_label)

    # The injected sequences need operand-stack room of their own: the import
    # prologue holds two values at once, and every ``ly()`` call pushes one
    # value on top of whatever is already on the stack at the insertion point.
    co_stacksize = max(fn_code.co_stacksize + 1, 2)

    # ``replace()`` copies every field that is not overridden, so the new code
    # object does not depend on the positional layout of ``CodeType(...)``.
    new_code_type = fn_code.replace(
        co_nlocals=len(co_varnames),
        co_stacksize=co_stacksize,
        co_code=op_sequence.to_bytes(),
        co_consts=tuple(co_consts),
        co_names=tuple(co_names),
        co_varnames=tuple(co_varnames),
    )
    set_code(entity, new_code_type)
    return fn_code
def agly_patch(entity: Callable) -> CodeType:
    """Validate ``entity`` and return its current code object.

    This function does not modify any bytecode: it only checks that
    ``entity`` has a recognised code type and returns the code object
    obtained from ``get_code``.

    Args:
        entity: Callable to inspect.

    Returns:
        The code object of ``entity``.

    Raises:
        RuntimeError: If ``code_type(entity)`` returns ``None``.
    """
    # TODO: try to compile strings using _get_code_object() instead of get_code()
    entity_code_type: CodeTypeEnum = code_type(entity)
    if entity_code_type is None:
        # f-string so that the offending entity actually shows up in the message.
        raise RuntimeError(f'Entity {entity} can not be patched')

    fn_code = get_code(entity)
    return fn_code
def gly_patched(func: Callable) -> Callable:
    """Decorator: run ``gly_patch`` on ``func`` and hand the same object back.

    Example:
        @gly_patched
        def func():
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)

            i = 100
            while i > 0:
                print(i)
                i -= 1

    Is conceptually equivalent to:
        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
        def func():
            ly = gly()
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)
                    ly()

                ly()

            i = 100
            while i > 0:
                print(i)
                i -= 1
                ly()

    Args:
        func (Callable): Function handed to ``gly_patch``.

    Returns:
        Callable: The very same ``func`` object.
    """
    # The value returned by gly_patch() is intentionally not kept here.
    gly_patch(func)
    return func
Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)
i = 100
while i > 0:
print(i)
i -= 1
Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()
ly()
i = 100
while i > 0:
print(i)
i -= 1
ly()
Args: func (Callable): The function to patch; it is passed to gly_patch().
Returns: Callable: The same function object that was passed in.
def gly_patched(func: Callable) -> Callable:
    """Decorator: run ``gly_patch`` on ``func`` and hand the same object back.

    Example:
        @gly_patched
        def func():
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)

            i = 100
            while i > 0:
                print(i)
                i -= 1

    Is conceptually equivalent to:
        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
        def func():
            ly = gly()
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)
                    ly()

                ly()

            i = 100
            while i > 0:
                print(i)
                i -= 1
                ly()

    Args:
        func (Callable): Function handed to ``gly_patch``.

    Returns:
        Callable: The very same ``func`` object.
    """
    # The value returned by gly_patch() is intentionally not kept here.
    gly_patch(func)
    return func
Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)
i = 100
while i > 0:
print(i)
i -= 1
Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()
ly()
i = 100
while i > 0:
print(i)
i -= 1
ly()
Args: func (Callable): The function to patch; it is passed to gly_patch().
Returns: Callable: The same function object that was passed in.
def gly_patched(func: Callable) -> Callable:
    """Decorator: run ``gly_patch`` on ``func`` and hand the same object back.

    Example:
        @gly_patched
        def func():
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)

            i = 100
            while i > 0:
                print(i)
                i -= 1

    Is conceptually equivalent to:
        from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly
        def func():
            ly = gly()
            for i in range(200):
                print(i)
                for j in range(100):
                    print(j)
                    ly()

                ly()

            i = 100
            while i > 0:
                print(i)
                i -= 1
                ly()

    Args:
        func (Callable): Function handed to ``gly_patch``.

    Returns:
        Callable: The very same ``func`` object.
    """
    # The value returned by gly_patch() is intentionally not kept here.
    gly_patch(func)
    return func
Example: @gly_patched def func(): for i in range(200): print(i) for j in range(100): print(j)
i = 100
while i > 0:
print(i)
i -= 1
Is equivalent to: from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly def func(): ly = gly() for i in range(200): print(i) for j in range(100): print(j) ly()
ly()
i = 100
while i > 0:
print(i)
i -= 1
ly()
Args: func (Callable): The function to patch; it is passed to gly_patch().
Returns: Callable: The same function object that was passed in.
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class GlyPatchManager:
    """Patches callables with loop-yield calls and remembers how to undo it."""

    # Class-level, i.e. shared by every instance: maps a patched callable to
    # the original code object returned by gly_patch() / agly_patch().
    patched_functions = dict()
    # Not read or written by the methods of this class.
    call_tree = dict()

    def __call__(self, func: Callable, tree: bool = True) -> Callable:
        """Patch ``func`` unless it is already registered as patched.

        Args:
            func: Callable to patch.
            tree: When true, delegate to ``agly_patched_tree`` /
                ``gly_patched_tree`` (passing this manager); otherwise patch
                only ``func`` itself and remember its original code object.

        Returns:
            ``func`` itself.

        Raises:
            GlyPatchManagerError: If ``func`` is neither async nor callable.
        """
        if func in self.patched_functions:
            return func

        if tree:
            # The tree helpers receive the manager itself; there is no code
            # object to record at this level (recording one here used to read
            # an unbound local).
            if is_async(func):
                agly_patched_tree(func, self)
            elif is_callable(func):
                gly_patched_tree(func, self)
            else:
                raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')
        else:
            if is_async(func):
                fn_code = agly_patch(func)
            elif is_callable(func):
                fn_code = gly_patch(func)
            else:
                raise GlyPatchManagerError(f'Entity {func} can not be patched: it must be a callable (either async or not)')

            self.patched_functions[func] = fn_code

        return func

    def restore(self, func: Callable, tree: bool = True) -> Callable:
        """Undo the patch applied to ``func``, if it is registered.

        Args:
            func: Callable to restore.
            tree: When true, delegate to ``_unpatch_tree``; otherwise put the
                remembered original code object back with ``set_code``.

        Returns:
            ``func`` itself.
        """
        if func in self.patched_functions:
            if tree:
                self._unpatch_tree(func)
            else:
                set_code(func, self.patched_functions[func])
                # Forget the entry so that the function can be patched again.
                del self.patched_functions[func]

        return func

    def _unpatch_tree(self, func: Callable) -> Callable:
        """Restore a whole call tree. Not implemented yet."""
        raise NotImplementedError