cengal.code_flow_control.python_bytecode_manipulator.versions.v_0.python_bytecode_manipulator
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19from collections import namedtuple 20from dis import Instruction, dis, get_instructions, _get_code_object, code_info, show_code, findlabels, _unpack_opargs 21from opcode import hasjrel, hasjabs, opname, opmap, HAVE_ARGUMENT, EXTENDED_ARG 22from cengal.system import PYTHON_VERSION_INT 23from enum import Enum 24from typing import Optional, Callable, List, Tuple, Dict, Set, Union, Any, Sequence 25from types import CodeType 26from copy import copy 27from cengal.entities.copyable import CopyableMixin 28from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariableType, FrontTriggerableVariable 29import sys 30 31 32""" 33Module Docstring 34Docstrings: http://www.python.org/dev/peps/pep-0257/ 35""" 36 37__author__ = "ButenkoMS <gtalk@butenkoms.space>" 38__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. 
Contacts: <gtalk@butenkoms.space>" 39__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 40__license__ = "Apache License, Version 2.0" 41__version__ = "4.4.1" 42__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 43__email__ = "gtalk@butenkoms.space" 44# __status__ = "Prototype" 45__status__ = "Development" 46# __status__ = "Production" 47 48 49BytecodeSequence = Union[bytes, Sequence[Union[int, bytes]]] 50 51 52def patch_function(func, co_code): 53 fn_code = func.__code__ 54 func.__code__ = CodeType( 55 fn_code.co_argcount, 56 fn_code.co_kwonlyargcount, 57 fn_code.co_nlocals, 58 fn_code.co_stacksize, 59 fn_code.co_flags, 60 co_code, 61 fn_code.co_consts, 62 fn_code.co_names, 63 fn_code.co_varnames, 64 fn_code.co_filename, 65 fn_code.co_name, 66 fn_code.co_firstlineno, 67 fn_code.co_lnotab, 68 fn_code.co_freevars, 69 fn_code.co_cellvars, 70 ) 71 72 73CodeParamNames = namedtuple("CodeParamNames", "positional positional_only keyword_only") 74 75 76def code_param_names(code) -> CodeParamNames: 77 pos_count = code.co_argcount 78 arg_names = code.co_varnames 79 positional = arg_names[:pos_count] 80 posonly_count = code.co_posonlyargcount 81 positional_only = arg_names[:posonly_count] 82 keyword_only_count = code.co_kwonlyargcount 83 keyword_only = arg_names[pos_count:pos_count + keyword_only_count] 84 return CodeParamNames(positional, positional_only, keyword_only) 85 86 87def code_name(code: CodeType) -> str: 88 return code.co_name 89 90 91if sys.version_info >= (3, 11): 92 def code_qualname(code: CodeType) -> str: 93 return code.co_qualname 94 95 96def get_code(x = None) -> CodeType: 97 if x is None: 98 return 99 100 if isinstance(x, CodeType): 101 return x 102 103 # Extract functions from methods. 104 if hasattr(x, '__func__'): 105 x = x.__func__ 106 # Extract compiled code objects from... 
107 if hasattr(x, '__code__'): # ...a function, or 108 x = x.__code__ 109 elif hasattr(x, 'gi_code'): #...a generator object, or 110 x = x.gi_code 111 elif hasattr(x, 'ag_code'): #...an asynchronous generator object, or 112 x = x.ag_code 113 elif hasattr(x, 'cr_code'): #...a coroutine, or 114 x = x.cr_code 115 elif hasattr(x, 'f_code'): #...a frame. 116 x = x.f_code 117 # else: 118 # raise TypeError(f'Expected a code object or an entity with code, but got {type(x)}') 119 120 return x 121 122 123def has_code(x = None) -> bool: 124 if x is None: 125 return False 126 127 if isinstance(x, CodeType): 128 return True 129 130 # Extract functions from methods. 131 if hasattr(x, '__func__'): 132 return True 133 # Extract compiled code objects from... 134 if hasattr(x, '__code__'): # ...a function, or 135 return True 136 elif hasattr(x, 'gi_code'): #...a generator object, or 137 return True 138 elif hasattr(x, 'ag_code'): #...an asynchronous generator object, or 139 return True 140 elif hasattr(x, 'cr_code'): #...a coroutine, or 141 return True 142 elif hasattr(x, 'f_code'): #...a frame. 143 return True 144 # else: 145 # raise TypeError(f'Expected a code object or an entity with code, but got {type(x)}') 146 147 return False 148 149 150class CodeTypeEnum(Enum): 151 class_or_module = 0 152 code_object = 1 153 raw_bytecode = 2 154 source_code = 3 155 unknown = 4 156 157 158def code_type(x=None) -> Optional[CodeTypeEnum]: 159 """_summary_ 160 161 Args: 162 x (_type_, optional): result of get_code() function. Defaults to None. 163 164 Returns: 165 Optional[CodeTypeEnum]: _description_ 166 """ 167 if x is None: 168 return 169 170 # Perform the disassembly. 
171 if hasattr(x, '__dict__'): # Class or module 172 return CodeTypeEnum.class_or_module 173 elif hasattr(x, 'co_code'): # Code object 174 return CodeTypeEnum.code_object 175 elif isinstance(x, (bytes, bytearray)): # Raw bytecode 176 return CodeTypeEnum.raw_bytecode 177 elif isinstance(x, str): # Source code 178 return CodeTypeEnum.source_code 179 else: 180 return CodeTypeEnum.unknown 181 182 183def set_code(x, code: CodeType): 184 # Extract functions from methods. 185 if hasattr(x, '__func__'): 186 x.__func__ = code 187 # Extract compiled code objects from... 188 if hasattr(x, '__code__'): # ...a function, or 189 x.__code__ = code 190 elif hasattr(x, 'gi_code'): #...a generator object, or 191 x.gi_code = code 192 elif hasattr(x, 'ag_code'): #...an asynchronous generator object, or 193 x.ag_code = code 194 elif hasattr(x, 'cr_code'): #...a coroutine. 195 x.cr_code = code 196 197 return x 198 199 200if sys.version_info < (3, 11): 201# if (3, 11) > PYTHON_VERSION_INT[:1]: 202 def modify_code(original_code: CodeType, co_code, co_consts, co_names, co_varnames): 203 co_nlocals = len(co_varnames) 204 return CodeType( 205 original_code.co_argcount, 206 original_code.co_posonlyargcount, 207 original_code.co_kwonlyargcount, 208 co_nlocals, 209 original_code.co_stacksize, 210 original_code.co_flags, 211 co_code, 212 tuple(co_consts), 213 tuple(co_names), 214 tuple(co_varnames), 215 original_code.co_filename, 216 original_code.co_name, 217 original_code.co_firstlineno, 218 original_code.co_lnotab, 219 original_code.co_freevars, 220 original_code.co_cellvars, 221 ) 222 223 224 # def unpack_opargs_original(code, denormalize_values: bool = False): 225 # ftv: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG) 226 # extended_arg = 0 227 # real_op_index: Optional[int] = None 228 # real_byte_index: Optional[int] = None 229 # need_to_clear_real_data: bool = False 230 231 # op_index = -1 232 # for i in range(0, len(code), 2): 233 # 
op_index += 1 234 # op = code[i] 235 # if op >= HAVE_ARGUMENT: 236 # ftv_result: Optional[bool] = ftv(op) 237 # if ftv_result is True: 238 # real_op_index = op_index 239 # real_byte_index = i 240 # elif ftv_result is False: 241 # need_to_clear_real_data = True 242 243 # arg = code[i+1] | extended_arg 244 # if denormalize_values: 245 # extended_arg = (arg << 8) if op == EXTENDED_ARG else 0 246 # else: 247 # arg = None 248 249 # yield (op, arg, op_index, i, real_op_index, real_byte_index) 250 # if need_to_clear_real_data: 251 # real_op_index = None 252 # real_byte_index = None 253 # need_to_clear_real_data = False 254 255 def unpack_opargs(code: BytecodeSequence, denormalize_values: bool = False): 256 ftv: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG) 257 extended_arg = 0 258 real_op_index: Optional[int] = None 259 real_offset: Optional[int] = None 260 need_to_clear_real_data: bool = False 261 262 op = None 263 arg = None 264 offset = -1 265 op_index = -1 266 i = -1 267 for item in code: 268 if isinstance(item, bytes): 269 item = int.from_bytes(item, byteorder='little') 270 271 i += 1 272 if not i % 2: 273 op = item 274 op_index += 1 275 offset = i 276 continue 277 278 if op >= HAVE_ARGUMENT: 279 ftv_result: Optional[bool] = ftv(op) 280 if ftv_result is True: 281 real_op_index = op_index 282 real_offset = offset 283 elif ftv_result is False: 284 need_to_clear_real_data = True 285 286 arg = item | extended_arg 287 if denormalize_values: 288 extended_arg = (arg << 8) if op == EXTENDED_ARG else 0 289 else: 290 arg = None 291 292 yield (op, arg, op_index, offset, real_op_index, real_offset) 293 if need_to_clear_real_data: 294 real_op_index = None 295 real_offset = None 296 need_to_clear_real_data = False 297 298 def find_ops_with_labels(code: BytecodeSequence) -> Tuple[List[int], Dict[int, List[Tuple[int, int, int, int]]]]: 299 labels: List[int] = list() 300 op_by_label: Dict[int, List[Tuple[int, int, int, int]]] = 
dict() 301 for op, arg, op_index, offset, real_op_index, real_offset in unpack_opargs(code, True): 302 if arg is not None: 303 if op in hasjrel: 304 label = offset + 2 + arg 305 elif op in hasjabs: 306 label = arg 307 else: 308 continue 309 310 if label not in op_by_label: 311 labels.append(label) 312 op_by_label[label] = list() 313 314 op_by_label[label].append((op, arg, op_index, offset, real_op_index, real_offset)) 315 316 return labels, op_by_label 317 318 319 def op_index_to_arg(op_index: int) -> int: 320 return op_index * 2 321 322 323 def arg_to_op_index(arg: int) -> int: 324 return arg // 2 325 326elif sys.version_info == (3, 11): 327 from dis import _is_backward_jump 328 329 def find_ops_with_labels(code): 330 labels = list() 331 op_by_label = dict() 332 index = -1 333 for offset, op, arg in _unpack_opargs(code): 334 index += 1 335 if arg is not None: 336 if op in hasjrel: 337 if _is_backward_jump(op): # inefficient implementation of the _is_backward_jump() function. Need to use set or list instead of string manipulations 338 arg = -arg 339 340 label = offset + 2 + arg*2 # In 3.10: this is an instruction index - not a byte index. 
Need to be fixed and tested 341 elif op in hasjabs: 342 label = arg*2 343 else: 344 continue 345 346 if label not in op_by_label: 347 labels.append(label) 348 op_by_label[label] = list() 349 350 op_by_label[label].append((index, offset, op, arg)) 351 352 return labels, op_by_label 353else: 354 import warnings 355 warnings.warn(f'Unsupported Python Version: {PYTHON_VERSION_INT}') 356 # raise RuntimeError(f'Unsupported Python Version: {PYTHON_VERSION_INT}') 357 358 359class OpSequenceOffsetMap(CopyableMixin): 360 def __init__(self, op_num: int = 0): 361 self.op_num: int = op_num 362 self.range: Optional[slice] = slice(0, op_num) if op_num else None 363 self.new_by_original: Dict[int, int] = dict() 364 self.original_by_new: Dict[int, int] = dict() 365 if op_num < 0: 366 op_num = 0 367 368 for index in range(op_num): 369 # bindex = index * 2 370 bindex = index 371 self.new_by_original[bindex] = bindex 372 self.original_by_new[bindex] = bindex 373 374 def mapped_range(self) -> slice: 375 return slice(min(self.original_by_new), max(self.original_by_new)) 376 377 378 def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0): 379 if not op_num: 380 return 381 382 offset_change_start = op_index 383 del_start = op_index 384 del_stop = op_index + op_num 385 386 new_by_original_buff = self.new_by_original 387 self.new_by_original = type(self.new_by_original)() 388 to_be_deleted: List[Tuple[int, int]] = list() 389 for original, new in new_by_original_buff.items(): 390 if del_start <= new < del_stop: 391 to_be_deleted.append((original, new)) 392 393 to_be_deleted.sort(key=lambda x: x[1]) 394 if preserver_index_for_first_x_op: 395 to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:] 396 397 for original, new in to_be_deleted: 398 del self.new_by_original[original] 399 400 original_by_new_buff = self.original_by_new 401 self.original_by_new = type(self.original_by_new)() 402 to_be_deleted = list() 403 for new, original in 
original_by_new_buff.items(): 404 if del_start <= new < del_stop: 405 to_be_deleted.append((new, original)) 406 407 to_be_deleted.sort(key=lambda x: x[0]) 408 if preserver_index_for_first_x_op: 409 ignored = to_be_deleted[:preserver_index_for_first_x_op] 410 if ignored: 411 last_ignored = ignored[-1] 412 last_ignored_new = last_ignored[0] 413 offset_change_start = last_ignored_new + 1 414 415 to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:] 416 417 for new, original in to_be_deleted: 418 del self.original_by_new[new] 419 420 self.add_offset(-op_num, offset_change_start) 421 self.op_num -= op_num 422 self.range = slice(self.range.start, self.range.stop - op_num) 423 424 def insert_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0): 425 if not op_num: 426 return 427 428 self.add_offset(op_num, op_index + preserver_index_for_first_x_op) 429 self.op_num += op_num 430 self.range = slice(self.range.start, self.range.stop + op_num) 431 432 def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0): 433 op_num: int = op_sequence_offset_map.op_num 434 if not op_num: 435 return 436 437 sequence_range: slice = op_sequence_offset_map.range 438 offset = op_index - sequence_range.start 439 self.insert_slice(op_index, op_num, preserver_index_for_first_x_op) 440 for new, original in op_sequence_offset_map.original_by_new.items(): 441 self.original_by_new[new + offset] = (insertion_id, new) 442 443 def add_offset(self, op_offset: int, op_start: int = 0): 444 if not op_offset: 445 return 446 447 new_by_original_buff = self.new_by_original 448 self.new_by_original = type(self.new_by_original)() 449 for original, new in new_by_original_buff.items(): 450 if new >= op_start: 451 new += op_offset 452 453 self.new_by_original[original] = new 454 455 original_by_new_buff = self.original_by_new 456 self.original_by_new = type(self.original_by_new)() 
457 for new, original in original_by_new_buff.items(): 458 if new >= op_start: 459 new += op_offset 460 461 self.original_by_new[new] = original 462 463 def shift(self, op_offset: int): 464 self.add_offset(op_offset) 465 self.range = slice(self.range.start + op_offset, self.range.stop + op_offset) 466 467 def shift_to_absolute(self, op_index: int): 468 op_offset: int = op_index - self.range.start 469 self.add_offset(op_offset) 470 self.range = slice(self.range.start + op_offset, self.range.stop + op_offset) 471 472 def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0): 473 if op_offset: 474 op_sequence_offset_map = op_sequence_offset_map.copy() 475 op_sequence_offset_map.add_offset(op_offset) 476 477 self.new_by_original.update(op_sequence_offset_map.new_by_original) 478 self.original_by_new.update(op_sequence_offset_map.original_by_new) 479 480 def __getitem__(self, index: Union[int, slice]) -> 'OpSequenceOffsetMap': 481 if isinstance(index, int): 482 index = slice(index, index + 1) 483 484 op_num = index.stop - index.start 485 486 new_by_original_buff = self.new_by_original 487 new_by_original = type(new_by_original_buff)() 488 for original, new in new_by_original_buff.items(): 489 if index.start <= new < index.stop: 490 new_by_original[original] = new 491 492 original_by_new_buff = self.original_by_new 493 original_by_new = type(original_by_new_buff)() 494 for new, original in original_by_new_buff.items(): 495 if index.start <= new < index.stop: 496 original_by_new[new] = original 497 498 result = OpSequenceOffsetMap() 499 result.op_num = op_num 500 result.range = index 501 result.new_by_original = new_by_original 502 result.original_by_new = original_by_new 503 return result 504 505 def copy(self): 506 cls = self.__class__ 507 result = cls.__new__(cls) 508 result.__dict__['new_by_original'] = copy(self.new_by_original) 509 result.__dict__['original_by_new'] = copy(self.original_by_new) 510 result.__dict__['op_num'] = self.op_num 
511 result.__dict__['range'] = copy(self.range) 512 return result 513 514 515def opcode(name: str) -> int: 516 name = name.upper() 517 return opmap[name] 518 519 520def opcode_name(opcode: int) -> str: 521 return opname[opcode] 522 523 524class OpSequence: 525 extended_arg_opcode_int = opcode('EXTENDED_ARG') 526 extended_arg_opcode_byte = extended_arg_opcode_int.to_bytes(1, byteorder='little') 527 528 def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None): 529 self.op_sequence: List[Tuple[int, int, int, int, int, int]] = list() if op_sequence is None else op_sequence 530 self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence)) 531 532 def __len__(self): 533 return len(self.op_sequence) 534 535 def read_slice(self, place: slice) -> 'OpSequence': 536 result = OpSequence() 537 result.op_sequence = self.op_sequence[place] 538 result.op_sequence_offset_map = self.op_sequence_offset_map[place] 539 result.op_sequence_offset_map.shift_to_absolute(0) 540 return result 541 542 def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0): 543 self.op_sequence[place] = [] 544 self.op_sequence_offset_map.remove_slice(place.start, place.stop - place.start, preserver_index_for_first_x_op) 545 546 def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0): 547 self.op_sequence[index: index] = op_sequence.op_sequence 548 op_sequence.op_sequence_offset_map.shift_to_absolute(index) 549 self.op_sequence_offset_map.insert_op_sequence_offset(index, op_sequence.op_sequence_offset_map, op_sequence, preserver_index_for_first_x_op) 550 return op_sequence 551 552 def normalize_instructions_arg(self): 553 index = -1 554 while True: 555 index += 1 556 if index >= len(self.op_sequence): 557 break 558 559 op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index] 560 if arg is not None: 561 if arg > 255: 562 arg = arg_to_bytes(arg) 563 
if real_op_index is not None: 564 real_op_index_delta = real_op_index - op_index 565 real_op_index = index - real_op_index_delta 566 slice_to_delete = slice(real_op_index, index) 567 self.remove_slice(slice_to_delete, 1) 568 index = real_op_index 569 570 extended_arg: bytes = arg[1:] 571 extended_arg_len = len(extended_arg) 572 arg = arg[0] 573 self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2) 574 op_sub_sequence_instructions = list() 575 for extended_arg_int in reversed(extended_arg): 576 op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int)) 577 578 sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions) 579 self.insert_op_sequence(index, sub_op_sequence, 1) 580 index += len(sub_op_sequence.op_sequence) 581 582 def denormalize_instructions_arg(self): 583 raise NotImplementedError 584 585 def find_op(self, op_index: int) -> Optional[slice]: 586 start_index = None 587 end_index = None 588 589 index = op_index - 1 590 while True: 591 index += 1 592 if index >= len(self.op_sequence): 593 break 594 595 current_op, current_arg, _, _, _, _ = self.op_sequence[index] 596 if current_op == OpSequence.extended_arg_opcode_int: 597 continue 598 else: 599 end_index = index + 1 600 break 601 602 previous_op_is_extended_arg = None 603 index = op_index 604 while True: 605 index -= 1 606 if index < 0: 607 if previous_op_is_extended_arg: 608 start_index = 0 609 610 break 611 612 current_op, current_arg, _, _, _, _ = self.op_sequence[index] 613 if current_op == OpSequence.extended_arg_opcode_int: 614 previous_op_is_extended_arg = True 615 continue 616 else: 617 previous_op_is_extended_arg = False 618 start_index = index + 1 619 break 620 621 if (start_index is None) or (end_index is None): 622 return None 623 624 return slice(start_index, end_index) 625 626 def get_arg(self, op_index: Union[int, slice]) -> bytes: 627 if isinstance(op_index, int): 628 
op_index = self.find_op(op_index) 629 630 if (op_index is None) or ((op_index.stop - op_index.start) < 1): 631 raise RuntimeError('OP not found by index') 632 633 op_slice = self.op_sequence[op_index] 634 if (op_index.stop - op_index.start) > 1: 635 arg_list = list() 636 for op, arg, _, _, _, _ in reversed(op_slice): 637 arg_list.append(arg) 638 639 return arg_to_int(bytes(arg_list)) 640 else: 641 return op_slice[0][1] 642 643 def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]): 644 if isinstance(op_index, int): 645 op_index = self.find_op(op_index) 646 647 op_index_len = op_index.stop - op_index.start 648 if (op_index is None) or (op_index_len < 1): 649 raise RuntimeError('OP not found by index') 650 651 new_arg = arg_to_int(new_arg) 652 current_op, current_arg, current_op_index, current_offset, current_real_op_index, current_real_offset = self.op_sequence[op_index.stop - 1] 653 sub_op_sequence_bytes = [current_op, new_arg] 654 sub_op_sequence: OpSequence = OpSequence(list(unpack_opargs(sub_op_sequence_bytes))) 655 sub_op_sequence.normalize_instructions_arg() 656 real_arg = sub_op_sequence.op_sequence[-1][1] 657 sub_op_sequence_len = len(sub_op_sequence.op_sequence) 658 extended_args_len = sub_op_sequence_len - 1 659 if extended_args_len: 660 current_real_op_index = op_index.start 661 current_real_offset = current_real_op_index * 2 662 current_op_index = current_real_op_index + extended_args_len 663 current_offset = current_op_index * 2 664 else: 665 current_op_index = op_index.start 666 current_offset = current_op_index * 2 667 current_real_op_index = None 668 current_real_offset = None 669 670 self.op_sequence[op_index.stop - 1] = (current_op, real_arg, current_op_index, current_offset, current_real_op_index, current_real_offset) 671 672 if sub_op_sequence_len > 1: 673 sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len)) 674 if op_index_len > 1: 675 self.remove_slice(slice(op_index.start, op_index.stop - 
1), 1) 676 677 self.insert_op_sequence(op_index.start, sub_op_sequence, 1) 678 679 def fix_labels(self, op_by_label: Dict[int, List[int]]): 680 labels = list(op_by_label.keys()) 681 labels.sort() 682 for label, data in op_by_label.items(): 683 data.sort() 684 685 for label in labels: 686 labeled_op_index_new = self.op_sequence_offset_map.new_by_original[label] 687 label_data = op_by_label[label] 688 while True: 689 for op_index in label_data: 690 op_index_new = self.op_sequence_offset_map.new_by_original[op_index] 691 op = self.op_sequence[op_index_new][0] 692 new_arg = op_index_to_arg(labeled_op_index_new) 693 if op in hasjrel: 694 new_arg = new_arg - (op_index_to_arg(op_index_new) + 2) 695 696 self.set_arg(op_index_new, new_arg) 697 698 labeled_op_index_new_after_value_change = self.op_sequence_offset_map.new_by_original[label] 699 if labeled_op_index_new_after_value_change == labeled_op_index_new: 700 break 701 else: 702 labeled_op_index_new = labeled_op_index_new_after_value_change 703 704 def to_sequence_of_ints(self): 705 for op, arg, _, _, _, _ in self.op_sequence: 706 yield op 707 if arg is None: 708 yield 0 709 else: 710 yield arg 711 712 def to_bytes(self) -> bytes: 713 return bytes(self.to_sequence_of_ints()) 714 715 @staticmethod 716 def from_bytecode_sequence(code: BytecodeSequence): 717 labels, op_by_label = find_ops_with_labels(code) 718 return OpSequence(list(unpack_opargs(code))), labels, op_by_label 719 720 @staticmethod 721 def from_entity(entity): 722 code_bytes: BytecodeSequence = _get_code_object(entity).co_code 723 return OpSequence.from_bytecode_sequence(code_bytes) 724 725 @staticmethod 726 def from_instructions(instructions: Sequence[Instruction]): 727 op_sequence: List = list() 728 for instruction in instructions: 729 arg = instruction.arg 730 if arg is None: 731 arg = 0 732 733 op_sequence.extend((instruction.opcode, arg)) 734 735 labels, op_by_label = find_ops_with_labels(op_sequence) 736 result = 
OpSequence(list(unpack_opargs(op_sequence))) 737 result.normalize_instructions_arg() 738 return result, labels, op_by_label 739 740 @staticmethod 741 def from_instructions_fast(instructions: Sequence[Instruction]): 742 return OpSequence(list(unpack_opargs(instructions_to_sequence_of_ints(normalize_instructions_arg(instructions))))) 743 744 745def arg_to_bytes(arg: Union[int, bytes]) -> bytes: 746 if isinstance(arg, int): 747 arg = arg.to_bytes(4, byteorder='little') 748 if len(arg) > 1: 749 arg = arg.rstrip(b'\x00') 750 751 if not arg: 752 arg = b'\x00' 753 754 return arg 755 756 757def arg_to_int(arg: Union[int, bytes]) -> int: 758 if isinstance(arg, bytes): 759 arg = int.from_bytes(arg, byteorder='little') 760 761 return arg 762 763 764def get_raw_instructions(x, *, first_line=None): 765 readable_instructions = get_instructions(x, first_line) 766 for instruction in readable_instructions: 767 arg = instruction.arg 768 if arg is not None: 769 if isinstance(arg, int): 770 arg = arg.to_bytes(4, byteorder='little') 771 772 arg = arg[0] 773 774 yield Instruction(instruction.opname, instruction.opcode, arg, instruction.argval, instruction.argrepr, instruction.offset, instruction.starts_line, instruction.is_jump_target) 775 776 777def normalize_instructions_arg(instructions: Sequence[Instruction]): 778 for instruction in instructions: 779 arg = instruction.arg 780 if arg is not None: 781 arg = arg_to_bytes(arg) 782 extended_arg = arg[1:] 783 arg = arg[0] 784 for extended_arg_int in reversed(extended_arg): 785 yield make_instruction('EXTENDED_ARG', extended_arg_int) 786 787 yield Instruction(instruction.opname, instruction.opcode, arg, instruction.argval, instruction.argrepr, instruction.offset, instruction.starts_line, instruction.is_jump_target) 788 789 790def instructions_to_sequence_of_ints(instructions: Sequence[Instruction]): 791 for instruction in instructions: 792 op = instruction.opcode 793 yield op 794 arg = instruction.arg 795 yield 0 if arg is None else arg 
796 797 798def instructions_to_bytes(instructions: Sequence[Instruction]) -> bytes: 799 return bytes(instructions_to_sequence_of_ints(instructions)) 800 801 802def make_instruction(name: str, arg: Union[None, int, bytes] = None) -> Instruction: 803 name = name.upper() 804 op = opmap[name] 805 arg = arg_to_int(arg) 806 return Instruction(name, op, arg, None, None, None, None, None) 807 808 809mi = make_instruction 810 811 812def fix_labels(op_sequence: OpSequence, op_by_label: Dict[int, List[Tuple[int, int, int, int, int, int]]]): 813 op_sequence.fix_labels({arg_to_op_index(label): [op_info[2] for op_info in data] for label, data in op_by_label.items()}) 814 815 816def code_info(x): 817 """Formatted details of methods, functions, or code.""" 818 return _format_code_info(_get_code_object(x)) 819 820 821def _pr(name, data): 822 print(f'<<{name}>> type: {type(data)}; value: {data}') 823 824 825def _format_code_info(co): 826 _pr('Name', co.co_name) 827 _pr('Filename', co.co_filename) 828 _pr('Argument count', co.co_argcount) 829 _pr('Positional-only arguments', co.co_posonlyargcount) 830 _pr('Kw-only arguments', co.co_kwonlyargcount) 831 _pr('Number of locals', co.co_nlocals) 832 _pr('Stack size', co.co_stacksize) 833 _pr('Flags', co.co_flags) 834 _pr('Constants', co.co_consts) 835 _pr('Names', co.co_names) 836 _pr('Variable names', co.co_varnames) 837 _pr('Free variables', co.co_freevars) 838 _pr('Cell variables', co.co_cellvars)
BytecodeSequence =
typing.Union[bytes, typing.Sequence[typing.Union[int, bytes]]]
def
patch_function(func, co_code):
def patch_function(func, co_code):
    """Replace the bytecode of ``func`` in place.

    A new code object is built that is identical to ``func.__code__`` except
    for its ``co_code`` field, and it is assigned back to ``func.__code__``.

    Args:
        func: Function object whose ``__code__`` attribute is rewritten.
        co_code: Raw bytecode (``bytes``) to store in the new code object.

    Returns:
        None. ``func`` is modified in place.
    """
    # CodeType.replace() (Python 3.8+) copies every field that is not
    # overridden. Calling CodeType(...) positionally is fragile: its parameter
    # list differs between interpreter versions (co_posonlyargcount was added
    # in 3.8, further fields in 3.11), so a hand-written argument list raises
    # TypeError on any version it was not written for.
    func.__code__ = func.__code__.replace(co_code=co_code)
class
CodeParamNames(builtins.tuple):
CodeParamNames(positional, positional_only, keyword_only)
CodeParamNames(positional, positional_only, keyword_only)
Create new instance of CodeParamNames(positional, positional_only, keyword_only)
Inherited Members
- builtins.tuple
- index
- count
def code_param_names(code) -> CodeParamNames:
    """Group the parameter names of a code object by parameter kind.

    Args:
        code: Object exposing ``co_varnames``, ``co_argcount``,
            ``co_posonlyargcount`` and ``co_kwonlyargcount``.

    Returns:
        CodeParamNames: ``positional`` holds the first ``co_argcount`` names
        (this includes the positional-only ones), ``positional_only`` holds
        the first ``co_posonlyargcount`` names, and ``keyword_only`` holds the
        ``co_kwonlyargcount`` names that follow the positional ones.
    """
    names = code.co_varnames
    positional_end = code.co_argcount
    keyword_only_end = positional_end + code.co_kwonlyargcount
    return CodeParamNames(
        positional=names[:positional_end],
        positional_only=names[:code.co_posonlyargcount],
        keyword_only=names[positional_end:keyword_only_end],
    )
def
code_name(code: code) -> str:
def
get_code(x=None) -> code:
def get_code(x = None) -> CodeType:
    """Return the code object carried by ``x``.

    Args:
        x: A code object, a method, a function, a generator, an asynchronous
            generator, a coroutine or a frame. Defaults to None.

    Returns:
        ``None`` when ``x`` is ``None``; ``x`` itself when it already is a
        code object; otherwise the value of the first attribute found among
        ``__code__``, ``gi_code``, ``ag_code``, ``cr_code`` and ``f_code``.
        When none of them is present the object is returned unchanged (after
        unwrapping ``__func__`` if it had one) - no exception is raised.
    """
    if x is None or isinstance(x, CodeType):
        return x

    # Methods expose the wrapped function through __func__.
    holder = getattr(x, '__func__', x)

    # Probe order matters only for objects exposing several of these names.
    for attribute in ('__code__', 'gi_code', 'ag_code', 'cr_code', 'f_code'):
        if hasattr(holder, attribute):
            return getattr(holder, attribute)

    return holder
def
has_code(x=None) -> bool:
def has_code(x = None) -> bool:
    """Tell whether ``x`` is, or exposes, a code object.

    Args:
        x: Any object. Defaults to None.

    Returns:
        bool: ``False`` for ``None``; ``True`` for a code object or for any
        object that has at least one of the attributes ``__func__``,
        ``__code__``, ``gi_code``, ``ag_code``, ``cr_code`` or ``f_code``;
        ``False`` otherwise.
    """
    if x is None:
        return False

    if isinstance(x, CodeType):
        return True

    # Method, function, generator, async generator, coroutine, frame.
    carrier_attributes = ('__func__', '__code__', 'gi_code', 'ag_code', 'cr_code', 'f_code')
    return any(hasattr(x, attribute) for attribute in carrier_attributes)
class
CodeTypeEnum(enum.Enum):
class CodeTypeEnum(Enum):
    """Categories that code_type() assigns to the object it inspects."""

    # Object that exposes a ``__dict__`` attribute.
    class_or_module = 0
    # Object that exposes a ``co_code`` attribute.
    code_object = 1
    # ``bytes`` or ``bytearray`` instance.
    raw_bytecode = 2
    # ``str`` instance.
    source_code = 3
    # Anything that matched none of the categories above.
    unknown = 4
An enumeration.
class_or_module =
<CodeTypeEnum.class_or_module: 0>
code_object =
<CodeTypeEnum.code_object: 1>
raw_bytecode =
<CodeTypeEnum.raw_bytecode: 2>
source_code =
<CodeTypeEnum.source_code: 3>
unknown =
<CodeTypeEnum.unknown: 4>
Inherited Members
- enum.Enum
- name
- value
def code_type(x=None) -> Optional[CodeTypeEnum]:
    """Classify ``x`` by the kind of code-carrying input it is.

    The checks run in a fixed order and the first match wins, so an object
    that exposes ``__dict__`` is always reported as ``class_or_module`` even
    if it would also satisfy a later check.

    Args:
        x: Result of the get_code() function. Defaults to None.

    Returns:
        Optional[CodeTypeEnum]: ``None`` when ``x`` is ``None``; otherwise
        ``class_or_module`` for an object with ``__dict__``, ``code_object``
        for an object with ``co_code``, ``raw_bytecode`` for ``bytes`` or
        ``bytearray``, ``source_code`` for ``str`` and ``unknown`` for
        everything else.
    """
    if x is None:
        return None

    if hasattr(x, '__dict__'):
        return CodeTypeEnum.class_or_module

    if hasattr(x, 'co_code'):
        return CodeTypeEnum.code_object

    if isinstance(x, (bytes, bytearray)):
        return CodeTypeEnum.raw_bytecode

    if isinstance(x, str):
        return CodeTypeEnum.source_code

    return CodeTypeEnum.unknown
Classify an object by the kind of code-carrying input it is.
Args: x (Any, optional): the result of the get_code() function. Defaults to None.
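Returns: Optional[CodeTypeEnum]: None when x is None; otherwise class_or_module for an object with __dict__, code_object for an object with co_code, raw_bytecode for bytes or bytearray, source_code for str, and unknown for anything else.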
def
set_code(x, code: code):
def set_code(x, code: CodeType):
    """Install ``code`` as the code object of ``x``.

    Args:
        x: A function, a method (any object exposing ``__func__``), a
            generator, an asynchronous generator or a coroutine.
        code: Code object to install.

    Returns:
        ``x`` itself. An object exposing none of the recognised attributes is
        returned untouched.

    Raises:
        AttributeError: If the attribute selected for ``x`` cannot be
            assigned.
    """
    # Methods do not own a code object: it lives on the wrapped function.
    # Writing to the method wrapper fails (``__func__`` is read-only on bound
    # methods, and a code object is not a function anyway), so unwrap first
    # and patch the function the method delegates to.
    target = x.__func__ if hasattr(x, '__func__') else x

    if hasattr(target, '__code__'):    # ...a function, or
        target.__code__ = code
    # NOTE(review): CPython documents gi_code / ag_code / cr_code as
    # read-only, so the three branches below presumably raise AttributeError
    # there - confirm before relying on them.
    elif hasattr(target, 'gi_code'):   # ...a generator object, or
        target.gi_code = code
    elif hasattr(target, 'ag_code'):   # ...an asynchronous generator object, or
        target.ag_code = code
    elif hasattr(target, 'cr_code'):   # ...a coroutine.
        target.cr_code = code

    return x
class
OpSequenceOffsetMap(cengal.entities.copyable.versions.v_0.copyable.CopyableMixin):
class OpSequenceOffsetMap(CopyableMixin):
    """Two-way bookkeeping between original and current ("new") operation indexes.

    ``new_by_original`` maps an original index to its current index and
    ``original_by_new`` maps a current index back. ``op_num`` is the number of
    tracked slots and ``range`` the slice of current indexes the map covers
    (None for a map created with a falsy ``op_num``).
    """

    def __init__(self, op_num: int = 0):
        """Create an identity map over ``op_num`` slots.

        Args:
            op_num: number of slots. Stored as given; a negative value yields
                empty mappings.
        """
        self.op_num: int = op_num
        self.range: Optional[slice] = slice(0, op_num) if op_num else None
        self.new_by_original: Dict[int, int] = dict()
        self.original_by_new: Dict[int, int] = dict()
        if op_num < 0:
            op_num = 0

        for index in range(op_num):
            # bindex = index * 2
            bindex = index
            self.new_by_original[bindex] = bindex
            self.original_by_new[bindex] = bindex

    def mapped_range(self) -> slice:
        """Return ``slice(lowest, highest)`` over the keys of ``original_by_new``.

        The stop value is the highest key itself, not the key plus one.
        Raises ValueError when the map is empty.
        """
        return slice(min(self.original_by_new), max(self.original_by_new))

    def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Forget ``op_num`` slots starting at current index ``op_index``.

        Args:
            op_index: first current index of the removed range.
            op_num: number of removed slots; 0 is a no-op.
            preserver_index_for_first_x_op: number of entries at the start of
                the removed range (ordered by current index) whose mapping
                entries are kept instead of being deleted.
        """
        if not op_num:
            return

        offset_change_start = op_index
        del_start = op_index
        del_stop = op_index + op_num

        new_by_original_buff = self.new_by_original
        # Work on a copy of the mapping. Starting from an empty dict (as the
        # previous code did) made every `del` below raise KeyError and dropped
        # all entries that were supposed to survive.
        self.new_by_original = type(new_by_original_buff)(new_by_original_buff)
        to_be_deleted: List[Tuple[int, int]] = list()
        for original, new in new_by_original_buff.items():
            if del_start <= new < del_stop:
                to_be_deleted.append((original, new))

        to_be_deleted.sort(key=lambda x: x[1])
        if preserver_index_for_first_x_op:
            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]

        for original, new in to_be_deleted:
            del self.new_by_original[original]

        original_by_new_buff = self.original_by_new
        self.original_by_new = type(original_by_new_buff)(original_by_new_buff)
        to_be_deleted = list()
        for new, original in original_by_new_buff.items():
            if del_start <= new < del_stop:
                to_be_deleted.append((new, original))

        to_be_deleted.sort(key=lambda x: x[0])
        if preserver_index_for_first_x_op:
            ignored = to_be_deleted[:preserver_index_for_first_x_op]
            if ignored:
                # Preserved entries keep their index; only what follows them is shifted.
                last_ignored = ignored[-1]
                last_ignored_new = last_ignored[0]
                offset_change_start = last_ignored_new + 1

            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]

        for new, original in to_be_deleted:
            del self.original_by_new[new]

        self.add_offset(-op_num, offset_change_start)
        self.op_num -= op_num
        self.range = slice(self.range.start, self.range.stop - op_num)

    def insert_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Open a gap of ``op_num`` slots at current index ``op_index``.

        Entries whose current index is at least
        ``op_index + preserver_index_for_first_x_op`` are shifted up by ``op_num``.
        """
        if not op_num:
            return

        self.add_offset(op_num, op_index + preserver_index_for_first_x_op)
        self.op_num += op_num
        self.range = slice(self.range.start, self.range.stop + op_num)

    def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0):
        """Insert the slots of another map at ``op_index``.

        The inserted slots are recorded in ``original_by_new`` only, as
        ``(insertion_id, index_in_inserted_map)`` tuples.
        """
        op_num: int = op_sequence_offset_map.op_num
        if not op_num:
            return

        sequence_range: slice = op_sequence_offset_map.range
        offset = op_index - sequence_range.start
        self.insert_slice(op_index, op_num, preserver_index_for_first_x_op)
        for new, original in op_sequence_offset_map.original_by_new.items():
            self.original_by_new[new + offset] = (insertion_id, new)

    def add_offset(self, op_offset: int, op_start: int = 0):
        """Add ``op_offset`` to every current index that is >= ``op_start``.

        Both mappings are rebuilt into fresh containers of their own type.
        """
        if not op_offset:
            return

        new_by_original_buff = self.new_by_original
        self.new_by_original = type(self.new_by_original)()
        for original, new in new_by_original_buff.items():
            if new >= op_start:
                new += op_offset

            self.new_by_original[original] = new

        original_by_new_buff = self.original_by_new
        self.original_by_new = type(self.original_by_new)()
        for new, original in original_by_new_buff.items():
            if new >= op_start:
                new += op_offset

            self.original_by_new[new] = original

    def shift(self, op_offset: int):
        """Move all current indexes and ``range`` by ``op_offset``."""
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)

    def shift_to_absolute(self, op_index: int):
        """Move the map so that ``range`` starts at ``op_index``."""
        op_offset: int = op_index - self.range.start
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)

    def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0):
        """Merge the entries of another map into this one.

        When ``op_offset`` is non-zero a shifted copy of the other map is
        merged, so the argument itself is left unchanged.
        """
        if op_offset:
            op_sequence_offset_map = op_sequence_offset_map.copy()
            op_sequence_offset_map.add_offset(op_offset)

        self.new_by_original.update(op_sequence_offset_map.new_by_original)
        self.original_by_new.update(op_sequence_offset_map.original_by_new)

    def __getitem__(self, index: Union[int, slice]) -> 'OpSequenceOffsetMap':
        """Return a new map holding only entries whose current index is in ``index``.

        An int selects the one-slot slice ``[index, index + 1)``. A slice must
        have explicit integer ``start`` and ``stop``.
        """
        if isinstance(index, int):
            index = slice(index, index + 1)

        op_num = index.stop - index.start

        new_by_original_buff = self.new_by_original
        new_by_original = type(new_by_original_buff)()
        for original, new in new_by_original_buff.items():
            if index.start <= new < index.stop:
                new_by_original[original] = new

        original_by_new_buff = self.original_by_new
        original_by_new = type(original_by_new_buff)()
        for new, original in original_by_new_buff.items():
            if index.start <= new < index.stop:
                original_by_new[new] = original

        result = OpSequenceOffsetMap()
        result.op_num = op_num
        result.range = index
        result.new_by_original = new_by_original
        result.original_by_new = original_by_new
        return result

    def copy(self):
        """Return a new instance with shallow copies of both mappings and of ``range``."""
        cls = self.__class__
        # Bypass __init__: the four known fields are filled in directly.
        result = cls.__new__(cls)
        result.__dict__['new_by_original'] = copy(self.new_by_original)
        result.__dict__['original_by_new'] = copy(self.original_by_new)
        result.__dict__['op_num'] = self.op_num
        result.__dict__['range'] = copy(self.range)
        return result
OpSequenceOffsetMap(op_num: int = 0)
361 def __init__(self, op_num: int = 0): 362 self.op_num: int = op_num 363 self.range: Optional[slice] = slice(0, op_num) if op_num else None 364 self.new_by_original: Dict[int, int] = dict() 365 self.original_by_new: Dict[int, int] = dict() 366 if op_num < 0: 367 op_num = 0 368 369 for index in range(op_num): 370 # bindex = index * 2 371 bindex = index 372 self.new_by_original[bindex] = bindex 373 self.original_by_new[bindex] = bindex
def
remove_slice( self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
379 def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0): 380 if not op_num: 381 return 382 383 offset_change_start = op_index 384 del_start = op_index 385 del_stop = op_index + op_num 386 387 new_by_original_buff = self.new_by_original 388 self.new_by_original = type(self.new_by_original)() 389 to_be_deleted: List[Tuple[int, int]] = list() 390 for original, new in new_by_original_buff.items(): 391 if del_start <= new < del_stop: 392 to_be_deleted.append((original, new)) 393 394 to_be_deleted.sort(key=lambda x: x[1]) 395 if preserver_index_for_first_x_op: 396 to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:] 397 398 for original, new in to_be_deleted: 399 del self.new_by_original[original] 400 401 original_by_new_buff = self.original_by_new 402 self.original_by_new = type(self.original_by_new)() 403 to_be_deleted = list() 404 for new, original in original_by_new_buff.items(): 405 if del_start <= new < del_stop: 406 to_be_deleted.append((new, original)) 407 408 to_be_deleted.sort(key=lambda x: x[0]) 409 if preserver_index_for_first_x_op: 410 ignored = to_be_deleted[:preserver_index_for_first_x_op] 411 if ignored: 412 last_ignored = ignored[-1] 413 last_ignored_new = last_ignored[0] 414 offset_change_start = last_ignored_new + 1 415 416 to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:] 417 418 for new, original in to_be_deleted: 419 del self.original_by_new[new] 420 421 self.add_offset(-op_num, offset_change_start) 422 self.op_num -= op_num 423 self.range = slice(self.range.start, self.range.stop - op_num)
def
insert_op_sequence_offset( self, op_index: int, op_sequence_offset_map: OpSequenceOffsetMap, insertion_id: Any, preserver_index_for_first_x_op: int = 0):
433 def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0): 434 op_num: int = op_sequence_offset_map.op_num 435 if not op_num: 436 return 437 438 sequence_range: slice = op_sequence_offset_map.range 439 offset = op_index - sequence_range.start 440 self.insert_slice(op_index, op_num, preserver_index_for_first_x_op) 441 for new, original in op_sequence_offset_map.original_by_new.items(): 442 self.original_by_new[new + offset] = (insertion_id, new)
def
add_offset(self, op_offset: int, op_start: int = 0):
444 def add_offset(self, op_offset: int, op_start: int = 0): 445 if not op_offset: 446 return 447 448 new_by_original_buff = self.new_by_original 449 self.new_by_original = type(self.new_by_original)() 450 for original, new in new_by_original_buff.items(): 451 if new >= op_start: 452 new += op_offset 453 454 self.new_by_original[original] = new 455 456 original_by_new_buff = self.original_by_new 457 self.original_by_new = type(self.original_by_new)() 458 for new, original in original_by_new_buff.items(): 459 if new >= op_start: 460 new += op_offset 461 462 self.original_by_new[new] = original
473 def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0): 474 if op_offset: 475 op_sequence_offset_map = op_sequence_offset_map.copy() 476 op_sequence_offset_map.add_offset(op_offset) 477 478 self.new_by_original.update(op_sequence_offset_map.new_by_original) 479 self.original_by_new.update(op_sequence_offset_map.original_by_new)
def
copy(self):
def copy(self):
    """Return a new instance of the same class with copied state.

    Both mappings and ``range`` go through the module-level ``copy()``
    function; ``op_num`` is assigned as is.
    """
    cls = self.__class__
    # Bypass __init__: the four known fields are filled in directly.
    result = cls.__new__(cls)
    result.__dict__['new_by_original'] = copy(self.new_by_original)
    result.__dict__['original_by_new'] = copy(self.original_by_new)
    result.__dict__['op_num'] = self.op_num
    result.__dict__['range'] = copy(self.range)
    return result
Should make a relevant copy of an object (not as general or as deep as deepcopy()); it should copy only the known object fields. Example: def copy(self): cls = self.__class__ result = cls.__new__(cls) result.__dict__['dimension'] = self.dimension result.__dict__['_point'] = self._point.copy() return result
Raises: NotImplementedError: _description_
Inherited Members
- cengal.entities.copyable.versions.v_0.copyable.CopyMethodsMixin
- shallow_copy
- updated_copy
def
opcode(name: str) -> int:
def
opcode_name(opcode: int) -> str:
class
OpSequence:
class OpSequence:
    """Editable list of bytecode operations plus an index map that tracks edits.

    Each entry of ``op_sequence`` is a tuple
    ``(op, arg, op_index, offset, real_op_index, real_offset)`` as produced by
    ``unpack_opargs``. ``op_sequence_offset_map`` records how operation
    indexes move while the sequence is modified.
    """

    extended_arg_opcode_int = opcode('EXTENDED_ARG')
    extended_arg_opcode_byte = extended_arg_opcode_int.to_bytes(1, byteorder='little')

    def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None):
        """Wrap ``op_sequence`` (used as is, not copied) and build an identity offset map for it."""
        self.op_sequence: List[Tuple[int, int, int, int, int, int]] = list() if op_sequence is None else op_sequence
        self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence))

    def __len__(self):
        """Return the number of entries in ``op_sequence``."""
        return len(self.op_sequence)

    def read_slice(self, place: slice) -> 'OpSequence':
        """Return a new OpSequence holding the entries in ``place``, re-based to index 0."""
        result = OpSequence()
        result.op_sequence = self.op_sequence[place]
        result.op_sequence_offset_map = self.op_sequence_offset_map[place]
        result.op_sequence_offset_map.shift_to_absolute(0)
        return result

    def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0):
        """Delete the entries in ``place`` and update the offset map accordingly."""
        self.op_sequence[place] = []
        self.op_sequence_offset_map.remove_slice(place.start, place.stop - place.start, preserver_index_for_first_x_op)

    def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0):
        """Insert the entries of ``op_sequence`` before ``index`` and return ``op_sequence``.

        The offset map of the inserted sequence is shifted in place so that
        it starts at ``index``.
        """
        self.op_sequence[index: index] = op_sequence.op_sequence
        op_sequence.op_sequence_offset_map.shift_to_absolute(index)
        self.op_sequence_offset_map.insert_op_sequence_offset(index, op_sequence.op_sequence_offset_map, op_sequence, preserver_index_for_first_x_op)
        return op_sequence

    def normalize_instructions_arg(self):
        """Split every argument larger than 255 into EXTENDED_ARG prefixes, in place."""
        index = -1
        while True:
            index += 1
            # The sequence changes length while being walked, so re-check every time.
            if index >= len(self.op_sequence):
                break

            op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index]
            if arg is not None:
                if arg > 255:
                    arg = arg_to_bytes(arg)
                    if real_op_index is not None:
                        # NOTE(review): unpack_opargs appears to set real_op_index
                        # to an index <= op_index; if so this delta is <= 0 and
                        # the slice below starts after `index` - confirm intent.
                        real_op_index_delta = real_op_index - op_index
                        real_op_index = index - real_op_index_delta
                        slice_to_delete = slice(real_op_index, index)
                        self.remove_slice(slice_to_delete, 1)
                        index = real_op_index

                    extended_arg: bytes = arg[1:]
                    extended_arg_len = len(extended_arg)
                    arg = arg[0]
                    self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2)
                    op_sub_sequence_instructions = list()
                    for extended_arg_int in reversed(extended_arg):
                        op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int))

                    sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions)
                    self.insert_op_sequence(index, sub_op_sequence, 1)
                    # Step over the prefixes that were just inserted.
                    index += len(sub_op_sequence.op_sequence)

    def denormalize_instructions_arg(self):
        """Not implemented."""
        raise NotImplementedError

    def find_op(self, op_index: int) -> Optional[slice]:
        """Locate the operation at ``op_index`` together with its EXTENDED_ARG prefixes.

        Args:
            op_index: index of the operation itself or of one of its
                EXTENDED_ARG prefixes.

        Returns:
            Optional[slice]: ``slice(first_prefix_or_op, op + 1)``, or None
            when ``op_index`` is negative or no non-EXTENDED_ARG operation
            exists at or after it.
        """
        if op_index < 0:
            return None

        extended_arg_opcode = OpSequence.extended_arg_opcode_int

        # Forward: skip prefixes until the operation that owns them.
        end_index = None
        for index in range(op_index, len(self.op_sequence)):
            if self.op_sequence[index][0] != extended_arg_opcode:
                end_index = index + 1
                break

        if end_index is None:
            return None

        # Backward: the span starts right after the closest preceding
        # non-prefix operation, or at 0 when there is none. (Previously an
        # operation at index 0 without prefixes was reported as not found.)
        start_index = 0
        for index in range(op_index - 1, -1, -1):
            if self.op_sequence[index][0] != extended_arg_opcode:
                start_index = index + 1
                break

        return slice(start_index, end_index)

    def get_arg(self, op_index: Union[int, slice]) -> Optional[int]:
        """Return the argument of an operation, combining its EXTENDED_ARG prefixes.

        Raises:
            RuntimeError: when the operation cannot be located or the slice is empty.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)

        if (op_index is None) or ((op_index.stop - op_index.start) < 1):
            raise RuntimeError('OP not found by index')

        op_slice = self.op_sequence[op_index]
        if (op_index.stop - op_index.start) > 1:
            # The operation's own byte comes first, then the prefixes from last to first.
            arg_list = [arg for op, arg, _, _, _, _ in reversed(op_slice)]
            return arg_to_int(bytes(arg_list))
        else:
            return op_slice[0][1]

    def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]):
        """Replace the argument of an operation, adding EXTENDED_ARG prefixes when needed.

        Raises:
            RuntimeError: when the operation cannot be located or the slice is empty.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)

        # Validate before reading .start/.stop, so a missing operation raises
        # the documented RuntimeError rather than AttributeError.
        if op_index is None:
            raise RuntimeError('OP not found by index')

        op_index_len = op_index.stop - op_index.start
        if op_index_len < 1:
            raise RuntimeError('OP not found by index')

        new_arg = arg_to_int(new_arg)
        current_op, current_arg, current_op_index, current_offset, current_real_op_index, current_real_offset = self.op_sequence[op_index.stop - 1]
        # Encode (op, new_arg) on its own to learn how many prefixes it needs.
        sub_op_sequence_bytes = [current_op, new_arg]
        sub_op_sequence: OpSequence = OpSequence(list(unpack_opargs(sub_op_sequence_bytes)))
        sub_op_sequence.normalize_instructions_arg()
        real_arg = sub_op_sequence.op_sequence[-1][1]
        sub_op_sequence_len = len(sub_op_sequence.op_sequence)
        extended_args_len = sub_op_sequence_len - 1
        if extended_args_len:
            current_real_op_index = op_index.start
            current_real_offset = current_real_op_index * 2
            current_op_index = current_real_op_index + extended_args_len
            current_offset = current_op_index * 2
        else:
            current_op_index = op_index.start
            current_offset = current_op_index * 2
            current_real_op_index = None
            current_real_offset = None

        self.op_sequence[op_index.stop - 1] = (current_op, real_arg, current_op_index, current_offset, current_real_op_index, current_real_offset)

        if sub_op_sequence_len > 1:
            # Keep only the prefixes; the operation itself is already in place.
            sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len))
            if op_index_len > 1:
                self.remove_slice(slice(op_index.start, op_index.stop - 1), 1)

            self.insert_op_sequence(op_index.start, sub_op_sequence, 1)

    def fix_labels(self, op_by_label: Dict[int, List[int]]):
        """Rewrite the arguments of jump operations so they point at their labels again.

        The lists in ``op_by_label`` are sorted in place.
        """
        labels = list(op_by_label.keys())
        labels.sort()
        for label, data in op_by_label.items():
            data.sort()

        for label in labels:
            labeled_op_index_new = self.op_sequence_offset_map.new_by_original[label]
            label_data = op_by_label[label]
            # set_arg may insert or remove prefixes and thereby move the label,
            # so repeat until the label's position is stable.
            while True:
                for op_index in label_data:
                    op_index_new = self.op_sequence_offset_map.new_by_original[op_index]
                    op = self.op_sequence[op_index_new][0]
                    new_arg = op_index_to_arg(labeled_op_index_new)
                    if op in hasjrel:
                        new_arg = new_arg - (op_index_to_arg(op_index_new) + 2)

                    self.set_arg(op_index_new, new_arg)

                labeled_op_index_new_after_value_change = self.op_sequence_offset_map.new_by_original[label]
                if labeled_op_index_new_after_value_change == labeled_op_index_new:
                    break
                else:
                    labeled_op_index_new = labeled_op_index_new_after_value_change

    def to_sequence_of_ints(self):
        """Yield ``op, arg`` pairs as a flat stream of ints (0 stands in for a None arg)."""
        for op, arg, _, _, _, _ in self.op_sequence:
            yield op
            if arg is None:
                yield 0
            else:
                yield arg

    def to_bytes(self) -> bytes:
        """Return the sequence encoded as bytes."""
        return bytes(self.to_sequence_of_ints())

    @staticmethod
    def from_bytecode_sequence(code: BytecodeSequence):
        """Return ``(OpSequence, labels, op_by_label)`` built from raw bytecode."""
        labels, op_by_label = find_ops_with_labels(code)
        return OpSequence(list(unpack_opargs(code))), labels, op_by_label

    @staticmethod
    def from_entity(entity):
        """Return ``(OpSequence, labels, op_by_label)`` for the code object of ``entity``."""
        code_bytes: BytecodeSequence = _get_code_object(entity).co_code
        return OpSequence.from_bytecode_sequence(code_bytes)

    @staticmethod
    def from_instructions(instructions: Sequence[Instruction]):
        """Return ``(OpSequence, labels, op_by_label)`` built from ``dis`` instructions.

        A None argument is encoded as 0; the result is normalized before returning.
        """
        op_sequence: List = list()
        for instruction in instructions:
            arg = instruction.arg
            if arg is None:
                arg = 0

            op_sequence.extend((instruction.opcode, arg))

        labels, op_by_label = find_ops_with_labels(op_sequence)
        result = OpSequence(list(unpack_opargs(op_sequence)))
        result.normalize_instructions_arg()
        return result, labels, op_by_label

    @staticmethod
    def from_instructions_fast(instructions: Sequence[Instruction]):
        """Return an OpSequence built from instructions, without collecting labels."""
        return OpSequence(list(unpack_opargs(instructions_to_sequence_of_ints(normalize_instructions_arg(instructions)))))
OpSequence( op_sequence: Union[List[Tuple[int, int, int, int, int, int]], NoneType] = None)
def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None):
    """Wrap a list of operation tuples and build a matching offset map.

    Args:
        op_sequence: list of operation tuples; it is used as is (not copied).
            None means an empty sequence.
    """
    if op_sequence is None:
        op_sequence = list()

    self.op_sequence: List[Tuple[int, int, int, int, int, int]] = op_sequence
    # One identity-mapped slot per operation.
    self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence))
op_sequence_offset_map: OpSequenceOffsetMap
def
insert_op_sequence( self, index: int, op_sequence: OpSequence, preserver_index_for_first_x_op: int = 0):
def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0):
    """Insert the operations of ``op_sequence`` before position ``index``.

    The offset map of ``op_sequence`` is shifted in place so that it starts
    at ``index`` and is then merged into this sequence's offset map, with
    ``op_sequence`` itself used as the insertion id.

    Returns:
        The ``op_sequence`` argument.
    """
    self.op_sequence[index: index] = op_sequence.op_sequence

    inserted_map = op_sequence.op_sequence_offset_map
    inserted_map.shift_to_absolute(index)
    self.op_sequence_offset_map.insert_op_sequence_offset(
        index,
        inserted_map,
        op_sequence,
        preserver_index_for_first_x_op,
    )
    return op_sequence
def
normalize_instructions_arg(self):
def normalize_instructions_arg(self):
    """Split every argument larger than 255 into EXTENDED_ARG prefixes, in place.

    Walks ``self.op_sequence``. For an entry whose ``arg`` exceeds 255 the
    entry keeps only the first byte returned by ``arg_to_bytes`` and the
    remaining bytes are inserted in front of it as ``EXTENDED_ARG``
    operations, in reversed byte order.
    """
    index = -1
    while True:
        index += 1
        # The sequence changes length while being walked, so re-check every time.
        if index >= len(self.op_sequence):
            break

        op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index]
        if arg is not None:
            if arg > 255:
                arg = arg_to_bytes(arg)
                if real_op_index is not None:
                    # The entry records a separate "real" start index; the span
                    # between it and the operation is removed before re-encoding
                    # (presumably stale EXTENDED_ARG prefixes - TODO confirm).
                    # NOTE(review): unpack_opargs appears to set real_op_index
                    # to an index <= op_index; if so this delta is <= 0 and the
                    # slice below starts after `index` - confirm intent.
                    real_op_index_delta = real_op_index - op_index
                    real_op_index = index - real_op_index_delta
                    slice_to_delete = slice(real_op_index, index)
                    self.remove_slice(slice_to_delete, 1)
                    index = real_op_index

                extended_arg: bytes = arg[1:]
                extended_arg_len = len(extended_arg)
                arg = arg[0]
                # Rewrite the entry with the low byte and with indexes/offsets
                # that account for the prefixes about to be inserted.
                self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2)
                op_sub_sequence_instructions = list()
                for extended_arg_int in reversed(extended_arg):
                    op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int))

                sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions)
                self.insert_op_sequence(index, sub_op_sequence, 1)
                # Step over the prefixes that were just inserted.
                index += len(sub_op_sequence.op_sequence)
def
find_op(self, op_index: int) -> Union[slice, NoneType]:
586 def find_op(self, op_index: int) -> Optional[slice]: 587 start_index = None 588 end_index = None 589 590 index = op_index - 1 591 while True: 592 index += 1 593 if index >= len(self.op_sequence): 594 break 595 596 current_op, current_arg, _, _, _, _ = self.op_sequence[index] 597 if current_op == OpSequence.extended_arg_opcode_int: 598 continue 599 else: 600 end_index = index + 1 601 break 602 603 previous_op_is_extended_arg = None 604 index = op_index 605 while True: 606 index -= 1 607 if index < 0: 608 if previous_op_is_extended_arg: 609 start_index = 0 610 611 break 612 613 current_op, current_arg, _, _, _, _ = self.op_sequence[index] 614 if current_op == OpSequence.extended_arg_opcode_int: 615 previous_op_is_extended_arg = True 616 continue 617 else: 618 previous_op_is_extended_arg = False 619 start_index = index + 1 620 break 621 622 if (start_index is None) or (end_index is None): 623 return None 624 625 return slice(start_index, end_index)
def
get_arg(self, op_index: Union[int, slice]) -> bytes:
627 def get_arg(self, op_index: Union[int, slice]) -> bytes: 628 if isinstance(op_index, int): 629 op_index = self.find_op(op_index) 630 631 if (op_index is None) or ((op_index.stop - op_index.start) < 1): 632 raise RuntimeError('OP not found by index') 633 634 op_slice = self.op_sequence[op_index] 635 if (op_index.stop - op_index.start) > 1: 636 arg_list = list() 637 for op, arg, _, _, _, _ in reversed(op_slice): 638 arg_list.append(arg) 639 640 return arg_to_int(bytes(arg_list)) 641 else: 642 return op_slice[0][1]
def
set_arg( self, op_index: Union[int, slice], new_arg: Union[NoneType, int, bytes]):
644 def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]): 645 if isinstance(op_index, int): 646 op_index = self.find_op(op_index) 647 648 op_index_len = op_index.stop - op_index.start 649 if (op_index is None) or (op_index_len < 1): 650 raise RuntimeError('OP not found by index') 651 652 new_arg = arg_to_int(new_arg) 653 current_op, current_arg, current_op_index, current_offset, current_real_op_index, current_real_offset = self.op_sequence[op_index.stop - 1] 654 sub_op_sequence_bytes = [current_op, new_arg] 655 sub_op_sequence: OpSequence = OpSequence(list(unpack_opargs(sub_op_sequence_bytes))) 656 sub_op_sequence.normalize_instructions_arg() 657 real_arg = sub_op_sequence.op_sequence[-1][1] 658 sub_op_sequence_len = len(sub_op_sequence.op_sequence) 659 extended_args_len = sub_op_sequence_len - 1 660 if extended_args_len: 661 current_real_op_index = op_index.start 662 current_real_offset = current_real_op_index * 2 663 current_op_index = current_real_op_index + extended_args_len 664 current_offset = current_op_index * 2 665 else: 666 current_op_index = op_index.start 667 current_offset = current_op_index * 2 668 current_real_op_index = None 669 current_real_offset = None 670 671 self.op_sequence[op_index.stop - 1] = (current_op, real_arg, current_op_index, current_offset, current_real_op_index, current_real_offset) 672 673 if sub_op_sequence_len > 1: 674 sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len)) 675 if op_index_len > 1: 676 self.remove_slice(slice(op_index.start, op_index.stop - 1), 1) 677 678 self.insert_op_sequence(op_index.start, sub_op_sequence, 1)
def
fix_labels(self, op_by_label: Dict[int, List[int]]):
def fix_labels(self, op_by_label: Dict[int, List[int]]):
    """Rewrite the arguments of jump operations so they point at their labels again.

    Args:
        op_by_label: for every label key, the keys of the operations that
            jump to it. Both are looked up in
            ``op_sequence_offset_map.new_by_original``. The lists are sorted
            in place.
    """
    labels = list(op_by_label.keys())
    labels.sort()
    for label, data in op_by_label.items():
        data.sort()

    for label in labels:
        labeled_op_index_new = self.op_sequence_offset_map.new_by_original[label]
        label_data = op_by_label[label]
        # set_arg may insert or remove EXTENDED_ARG prefixes and thereby move
        # the label, so repeat until the label's position is stable.
        while True:
            for op_index in label_data:
                op_index_new = self.op_sequence_offset_map.new_by_original[op_index]
                op = self.op_sequence[op_index_new][0]
                new_arg = op_index_to_arg(labeled_op_index_new)
                if op in hasjrel:
                    # Relative jumps: subtract the converted position of the
                    # jump itself plus 2.
                    new_arg = new_arg - (op_index_to_arg(op_index_new) + 2)

                self.set_arg(op_index_new, new_arg)

            labeled_op_index_new_after_value_change = self.op_sequence_offset_map.new_by_original[label]
            if labeled_op_index_new_after_value_change == labeled_op_index_new:
                break
            else:
                labeled_op_index_new = labeled_op_index_new_after_value_change
@staticmethod
def
from_instructions(instructions: Sequence[dis.Instruction]):
@staticmethod
def from_instructions(instructions: Sequence[Instruction]):
    """Build an OpSequence from ``dis`` instructions.

    Each instruction contributes its opcode followed by its argument, with
    0 standing in for a None argument.

    Returns:
        tuple: ``(op_sequence, labels, op_by_label)`` where the last two come
        from ``find_ops_with_labels`` and the sequence has already been
        normalized with ``normalize_instructions_arg``.
    """
    flat_code: List = list()
    for instruction in instructions:
        flat_code.append(instruction.opcode)
        flat_code.append(0 if instruction.arg is None else instruction.arg)

    labels, op_by_label = find_ops_with_labels(flat_code)
    result = OpSequence(list(unpack_opargs(flat_code)))
    result.normalize_instructions_arg()
    return result, labels, op_by_label
def
arg_to_bytes(arg: Union[int, bytes]) -> bytes:
def
arg_to_int(arg: Union[int, bytes]) -> int:
def
get_raw_instructions(x, *, first_line=None):
def get_raw_instructions(x, *, first_line=None):
    """Yield the instructions of ``x`` with every argument cut down to its low byte.

    Args:
        x: anything accepted by ``dis.get_instructions``.
        first_line: forwarded to ``dis.get_instructions``.

    Yields:
        Instruction: a copy of each instruction whose ``arg`` is None or the
        first byte of the little-endian encoding of the original argument.
        All other fields are kept.
    """
    # `first_line` is keyword-only in dis.get_instructions; passing it
    # positionally raises TypeError.
    for instruction in get_instructions(x, first_line=first_line):
        arg = instruction.arg
        if arg is not None:
            if isinstance(arg, int):
                arg = arg.to_bytes(4, byteorder='little')

            arg = arg[0]

        # _replace keeps every other field whatever the Python version's
        # Instruction layout is.
        yield instruction._replace(arg=arg)
def
normalize_instructions_arg(instructions: Sequence[dis.Instruction]):
def normalize_instructions_arg(instructions: Sequence[Instruction]):
    """Yield instructions with arguments split into EXTENDED_ARG prefixes.

    For an instruction with an argument, ``arg_to_bytes`` provides the byte
    string; the bytes after the first one are emitted as ``EXTENDED_ARG``
    instructions (in reversed order) before the instruction itself, which
    keeps only the first byte. Instructions without an argument pass through
    with ``arg`` None.

    Args:
        instructions: the instructions to normalize.

    Yields:
        Instruction: prefixes followed by the reduced instruction.
    """
    for instruction in instructions:
        arg = instruction.arg
        if arg is not None:
            arg = arg_to_bytes(arg)
            extended_arg = arg[1:]
            arg = arg[0]
            for extended_arg_int in reversed(extended_arg):
                yield make_instruction('EXTENDED_ARG', extended_arg_int)

        # _replace changes `arg` only; rebuilding the tuple positionally would
        # drop or misplace fields on Python versions whose Instruction has
        # more than eight fields.
        yield instruction._replace(arg=arg)
def
instructions_to_sequence_of_ints(instructions: Sequence[dis.Instruction]):
def
instructions_to_bytes(instructions: Sequence[dis.Instruction]) -> bytes:
def
make_instruction(name: str, arg: Union[NoneType, int, bytes] = None) -> dis.Instruction:
def
mi(name: str, arg: Union[NoneType, int, bytes] = None) -> dis.Instruction:
def
fix_labels( op_sequence: OpSequence, op_by_label: Dict[int, List[Tuple[int, int, int, int, int, int]]]):
def
code_info(x):
def code_info(x):
    """Formatted details of methods, functions, or code."""
    # Resolve the code object first, then render its description.
    code_object = _get_code_object(x)
    return _format_code_info(code_object)
Formatted details of methods, functions, or code.
def
modify_code(original_code: code, co_code, co_consts, co_names, co_varnames):
def modify_code(original_code: CodeType, co_code, co_consts, co_names, co_varnames):
    """Return a copy of ``original_code`` with new bytecode and name/constant tables.

    Args:
        original_code: the code object to derive from; it is not modified.
        co_code: the new bytecode.
        co_consts: iterable of constants, stored as a tuple.
        co_names: iterable of names, stored as a tuple.
        co_varnames: iterable of local variable names, stored as a tuple;
            ``co_nlocals`` is set to its length.

    Returns:
        CodeType: the new code object; every other field is taken from
        ``original_code``.
    """
    varnames = tuple(co_varnames)
    # CodeType.replace copies all remaining fields, so this does not depend on
    # the positional constructor signature, which differs between Python
    # versions (line table argument, qualname, exception table).
    return original_code.replace(
        co_code=co_code,
        co_consts=tuple(co_consts),
        co_names=tuple(co_names),
        co_varnames=varnames,
        co_nlocals=len(varnames),
    )
def
unpack_opargs( code: Union[bytes, Sequence[Union[int, bytes]]], denormalize_values: bool = False):
def unpack_opargs(code: BytecodeSequence, denormalize_values: bool = False):
    """Iterate over a flat ``op, arg, op, arg, ...`` sequence as operation tuples.

    Args:
        code: bytecode as ``bytes`` or as a sequence whose items are ints or
            ``bytes`` objects (decoded as little-endian ints).
        denormalize_values: when True, the value of an ``EXTENDED_ARG``
            operation is shifted left by 8 bits and OR-ed into the argument
            of the next operation; when False every argument is yielded as
            stored.

    Yields:
        tuple: ``(op, arg, op_index, offset, real_op_index, real_offset)``.
        ``arg`` is None for opcodes below ``HAVE_ARGUMENT``. ``op_index``
        counts operations and ``offset`` is the position of the opcode item
        in ``code``.
    """
    # NOTE(review): FrontTriggerableVariable is defined outside this block;
    # presumably it returns True when `op` starts being EXTENDED_ARG, False
    # when it stops, and None otherwise - confirm.
    ftv: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG)
    extended_arg = 0
    real_op_index: Optional[int] = None
    real_offset: Optional[int] = None
    need_to_clear_real_data: bool = False

    op = None
    arg = None
    offset = -1
    op_index = -1
    i = -1
    for item in code:
        if isinstance(item, bytes):
            item = int.from_bytes(item, byteorder='little')

        i += 1
        # Even positions hold the opcode, odd positions its argument.
        if not i % 2:
            op = item
            op_index += 1
            offset = i
            continue

        if op >= HAVE_ARGUMENT:
            ftv_result: Optional[bool] = ftv(op)
            if ftv_result is True:
                # Remember where the current operation "really" starts.
                real_op_index = op_index
                real_offset = offset
            elif ftv_result is False:
                # Reset the remembered start only after this tuple was yielded.
                need_to_clear_real_data = True

            arg = item | extended_arg
            if denormalize_values:
                extended_arg = (arg << 8) if op == EXTENDED_ARG else 0
        else:
            arg = None

        yield (op, arg, op_index, offset, real_op_index, real_offset)
        if need_to_clear_real_data:
            real_op_index = None
            real_offset = None
            need_to_clear_real_data = False
def
find_ops_with_labels( code: Union[bytes, Sequence[Union[int, bytes]]]) -> Tuple[List[int], Dict[int, List[Tuple[int, int, int, int]]]]:
def find_ops_with_labels(code: BytecodeSequence) -> Tuple[List[int], Dict[int, List[Tuple[int, int, int, int]]]]:
    """Collect jump targets and the operations that jump to them.

    The bytecode is read with ``unpack_opargs(code, True)``. The target of a
    relative jump is ``offset + 2 + arg``; the target of an absolute jump is
    ``arg``.

    Args:
        code: the bytecode to scan.

    Returns:
        tuple: ``(labels, op_by_label)``. ``labels`` lists the targets in the
        order they were first seen; ``op_by_label`` maps each target to the
        six-field operation tuples that jump to it.
    """
    labels: List[int] = list()
    op_by_label: Dict[int, List[Tuple[int, int, int, int]]] = dict()
    for op, arg, op_index, offset, real_op_index, real_offset in unpack_opargs(code, True):
        if arg is None:
            continue

        if op in hasjrel:
            target = offset + 2 + arg
        elif op in hasjabs:
            target = arg
        else:
            # Not a jump.
            continue

        jumpers = op_by_label.get(target)
        if jumpers is None:
            labels.append(target)
            jumpers = op_by_label[target] = list()

        jumpers.append((op, arg, op_index, offset, real_op_index, real_offset))

    return labels, op_by_label
def
op_index_to_arg(op_index: int) -> int:
def
arg_to_op_index(arg: int) -> int: