cengal.code_flow_control.python_bytecode_manipulator.versions.v_0.python_bytecode_manipulator

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19from collections import namedtuple
 20from dis import Instruction, dis, get_instructions, _get_code_object, code_info, show_code, findlabels, _unpack_opargs
 21from opcode import hasjrel, hasjabs, opname, opmap, HAVE_ARGUMENT, EXTENDED_ARG
 22from cengal.system import PYTHON_VERSION_INT
 23from enum import Enum
 24from typing import Optional, Callable, List, Tuple, Dict, Set, Union, Any, Sequence
 25from types import CodeType
 26from copy import copy
 27from cengal.entities.copyable import CopyableMixin
 28from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariableType, FrontTriggerableVariable
 29import sys
 30
 31
 32"""
 33Module Docstring
 34Docstrings: http://www.python.org/dev/peps/pep-0257/
 35"""
 36
 37__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 38__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 39__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 40__license__ = "Apache License, Version 2.0"
 41__version__ = "4.4.1"
 42__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 43__email__ = "gtalk@butenkoms.space"
 44# __status__ = "Prototype"
 45__status__ = "Development"
 46# __status__ = "Production"
 47
 48
# Type alias for raw bytecode input: either a ``bytes`` object or a sequence
# whose items are ints or ``bytes`` values (``unpack_opargs`` converts
# ``bytes`` items with ``int.from_bytes(..., byteorder='little')``).
BytecodeSequence = Union[bytes, Sequence[Union[int, bytes]]]
 50
 51
 52def patch_function(func, co_code):
 53    fn_code = func.__code__
 54    func.__code__ = CodeType(
 55        fn_code.co_argcount,
 56        fn_code.co_kwonlyargcount,
 57        fn_code.co_nlocals,
 58        fn_code.co_stacksize,
 59        fn_code.co_flags,
 60        co_code,
 61        fn_code.co_consts,
 62        fn_code.co_names,
 63        fn_code.co_varnames,
 64        fn_code.co_filename,
 65        fn_code.co_name,
 66        fn_code.co_firstlineno,
 67        fn_code.co_lnotab,
 68        fn_code.co_freevars,
 69        fn_code.co_cellvars,
 70    )
 71
 72
 73CodeParamNames = namedtuple("CodeParamNames", "positional positional_only keyword_only")
 74
 75
 76def code_param_names(code) -> CodeParamNames:
 77    pos_count = code.co_argcount
 78    arg_names = code.co_varnames
 79    positional = arg_names[:pos_count]
 80    posonly_count = code.co_posonlyargcount
 81    positional_only = arg_names[:posonly_count]
 82    keyword_only_count = code.co_kwonlyargcount
 83    keyword_only = arg_names[pos_count:pos_count + keyword_only_count]
 84    return CodeParamNames(positional, positional_only, keyword_only)
 85
 86
 87def code_name(code: CodeType) -> str:
 88    return code.co_name
 89
 90
 91if sys.version_info >= (3, 11):
 92    def code_qualname(code: CodeType) -> str:
 93        return code.co_qualname
 94
 95
 96def get_code(x = None) -> CodeType:
 97    if x is None:
 98        return
 99    
100    if isinstance(x, CodeType):
101        return x
102    
103    # Extract functions from methods.
104    if hasattr(x, '__func__'):
105        x = x.__func__
106    # Extract compiled code objects from...
107    if hasattr(x, '__code__'):  # ...a function, or
108        x = x.__code__
109    elif hasattr(x, 'gi_code'):  #...a generator object, or
110        x = x.gi_code
111    elif hasattr(x, 'ag_code'):  #...an asynchronous generator object, or
112        x = x.ag_code
113    elif hasattr(x, 'cr_code'):  #...a coroutine, or
114        x = x.cr_code
115    elif hasattr(x, 'f_code'):  #...a frame.
116        x = x.f_code
117    # else:
118    #     raise TypeError(f'Expected a code object or an entity with code, but got {type(x)}')
119    
120    return x
121
122
def has_code(x = None) -> bool:
    """Tell whether ``x`` is a code object or exposes one.

    Args:
        x: Object to inspect.

    Returns:
        ``False`` for ``None``; ``True`` for a code object or for any object
        that has one of ``__func__``, ``__code__``, ``gi_code``, ``ag_code``,
        ``cr_code`` or ``f_code``; ``False`` otherwise.
    """
    if x is None:
        return False

    if isinstance(x, CodeType):
        return True

    code_bearing_attributes = ('__func__', '__code__', 'gi_code', 'ag_code', 'cr_code', 'f_code')
    return any(hasattr(x, attribute) for attribute in code_bearing_attributes)
148
149
class CodeTypeEnum(Enum):
    """Categories reported by ``code_type()``."""

    class_or_module = 0  # object has a ``__dict__``
    code_object = 1      # object has ``co_code``
    raw_bytecode = 2     # ``bytes`` or ``bytearray``
    source_code = 3      # ``str``
    unknown = 4          # anything else
156
157
def code_type(x=None) -> Optional[CodeTypeEnum]:
    """Classify ``x`` by the way it carries code.

    Args:
        x: Object to classify, normally the result of ``get_code()``.
            Defaults to None.

    Returns:
        Optional[CodeTypeEnum]: ``None`` when ``x`` is ``None``. Otherwise the
        first matching category, checked in this order: has ``__dict__``
        (class or module), has ``co_code`` (code object), is ``bytes`` or
        ``bytearray`` (raw bytecode), is ``str`` (source code); ``unknown``
        when nothing matches.
    """
    if x is None:
        return None

    # The order of the checks is significant: the first hit wins.
    if hasattr(x, '__dict__'):
        return CodeTypeEnum.class_or_module

    if hasattr(x, 'co_code'):
        return CodeTypeEnum.code_object

    if isinstance(x, (bytes, bytearray)):
        return CodeTypeEnum.raw_bytecode

    if isinstance(x, str):
        return CodeTypeEnum.source_code

    return CodeTypeEnum.unknown
181
182
def set_code(x, code: CodeType):
    """Install ``code`` on the entity ``x`` and return ``x``.

    Objects exposing ``__func__`` are unwrapped first, mirroring
    ``get_code()``. The previous implementation instead assigned the code
    object to ``x.__func__`` itself, which raises ``AttributeError`` on bound
    methods (the attribute is read-only) and would otherwise have replaced the
    wrapped function with a code object.

    Args:
        x: Function, method, generator, asynchronous generator or coroutine.
        code: Code object to install.

    Returns:
        The ``x`` that was passed in.

    Raises:
        AttributeError: If the matching attribute of the target object does
            not accept assignment.
    """
    # Extract the function from a method; fall back to the object itself.
    target = getattr(x, '__func__', x)

    if hasattr(target, '__code__'):  # a function, or
        target.__code__ = code
    elif hasattr(target, 'gi_code'):  # a generator object, or
        target.gi_code = code
    elif hasattr(target, 'ag_code'):  # an asynchronous generator object, or
        target.ag_code = code
    elif hasattr(target, 'cr_code'):  # a coroutine.
        target.cr_code = code

    return x
198
199
200if sys.version_info < (3, 11):
201# if (3, 11) > PYTHON_VERSION_INT[:1]:
202    def modify_code(original_code: CodeType, co_code, co_consts, co_names, co_varnames):
203        co_nlocals = len(co_varnames)
204        return CodeType(
205            original_code.co_argcount,
206            original_code.co_posonlyargcount,
207            original_code.co_kwonlyargcount,
208            co_nlocals,
209            original_code.co_stacksize,
210            original_code.co_flags,
211            co_code,
212            tuple(co_consts),
213            tuple(co_names),
214            tuple(co_varnames),
215            original_code.co_filename,
216            original_code.co_name,
217            original_code.co_firstlineno,
218            original_code.co_lnotab,
219            original_code.co_freevars,
220            original_code.co_cellvars,
221        )
222
223
224    # def unpack_opargs_original(code, denormalize_values: bool = False):
225    #     ftv: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG)
226    #     extended_arg = 0
227    #     real_op_index: Optional[int] = None
228    #     real_byte_index: Optional[int] = None
229    #     need_to_clear_real_data: bool = False
230
231    #     op_index = -1
232    #     for i in range(0, len(code), 2):
233    #         op_index += 1
234    #         op = code[i]
235    #         if op >= HAVE_ARGUMENT:
236    #             ftv_result: Optional[bool] = ftv(op)
237    #             if ftv_result is True:
238    #                 real_op_index = op_index
239    #                 real_byte_index = i
240    #             elif ftv_result is False:
241    #                 need_to_clear_real_data = True
242
243    #             arg = code[i+1] | extended_arg
244    #             if denormalize_values:
245    #                 extended_arg = (arg << 8) if op == EXTENDED_ARG else 0
246    #         else:
247    #             arg = None
248            
249    #         yield (op, arg, op_index, i, real_op_index, real_byte_index)
250    #         if need_to_clear_real_data:
251    #             real_op_index = None
252    #             real_byte_index = None
253    #             need_to_clear_real_data = False
254
    def unpack_opargs(code: BytecodeSequence, denormalize_values: bool = False):
        """Iterate over ``code`` as (opcode, argument) pairs.

        Items are consumed two at a time: an item at an even position is taken
        as the opcode, the following item as its argument byte. ``bytes`` items
        are converted to ints (little-endian) first.

        Args:
            code: Bytecode as ``bytes`` or as a sequence of ints / ``bytes``.
            denormalize_values: When True, the argument of an ``EXTENDED_ARG``
                op is shifted left by 8 bits and OR-ed into the argument of
                the next op. When False every argument is yielded as found.

        Yields:
            Tuples ``(op, arg, op_index, offset, real_op_index, real_offset)``.
            ``arg`` is None for opcodes below ``HAVE_ARGUMENT``; ``offset`` is
            the position of the opcode item inside ``code``.
            ``real_op_index`` / ``real_offset`` are None unless set by the
            trigger described below.
        """
        # NOTE(review): presumably ``ftv(op)`` returns True on the first
        # EXTENDED_ARG of a run, False on the first op after the run and None
        # otherwise - confirm against FrontTriggerableVariable.
        ftv: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG)
        extended_arg = 0
        real_op_index: Optional[int] = None
        real_offset: Optional[int] = None
        need_to_clear_real_data: bool = False

        op = None
        arg = None
        offset = -1
        op_index = -1
        i = -1
        for item in code:
            if isinstance(item, bytes):
                item = int.from_bytes(item, byteorder='little')
            
            i += 1
            if not i % 2:
                # Even position: remember the opcode and wait for its argument item.
                op = item
                op_index += 1
                offset = i
                continue
            
            if op >= HAVE_ARGUMENT:
                # The trigger is only consulted for opcodes that take an argument.
                ftv_result: Optional[bool] = ftv(op)
                if ftv_result is True:
                    real_op_index = op_index
                    real_offset = offset
                elif ftv_result is False:
                    # Keep the recorded start for this yield, drop it right after.
                    need_to_clear_real_data = True

                arg = item | extended_arg
                if denormalize_values:
                    extended_arg = (arg << 8) if op == EXTENDED_ARG else 0
            else:
                arg = None
            
            yield (op, arg, op_index, offset, real_op_index, real_offset)
            if need_to_clear_real_data:
                real_op_index = None
                real_offset = None
                need_to_clear_real_data = False
297
    def find_ops_with_labels(code: BytecodeSequence) -> Tuple[List[int], Dict[int, List[Tuple[int, int, int, int, Optional[int], Optional[int]]]]]:
        """Collect the jump targets of ``code`` and the ops that jump to them.

        Args:
            code: Bytecode accepted by ``unpack_opargs``.

        Returns:
            A pair ``(labels, op_by_label)``: ``labels`` lists every jump target
            in order of first appearance; ``op_by_label`` maps a target to the
            6-tuples yielded by ``unpack_opargs`` for each op jumping to it.
        """
        # NOTE(review): targets are computed in item/byte units (``offset + 2 + arg``
        # for relative jumps, ``arg`` for absolute ones). A comment in the 3.11
        # branch of this file says 3.10 jump arguments are instruction indexes -
        # confirm whether this branch needs the same scaling on 3.10.
        labels: List[int] = list()
        op_by_label: Dict[int, List[Tuple[int, int, int, int, Optional[int], Optional[int]]]] = dict()
        for op, arg, op_index, offset, real_op_index, real_offset in unpack_opargs(code, True):
            if arg is not None:
                if op in hasjrel:
                    label = offset + 2 + arg
                elif op in hasjabs:
                    label = arg
                else:
                    # Not a jump: nothing to record.
                    continue

                if label not in op_by_label:
                    labels.append(label)
                    op_by_label[label] = list()
                
                op_by_label[label].append((op, arg, op_index, offset, real_op_index, real_offset))

        return labels, op_by_label
317    
318    
319    def op_index_to_arg(op_index: int) -> int:
320        return op_index * 2
321    
322
323    def arg_to_op_index(arg: int) -> int:
324        return arg // 2
325
326elif sys.version_info == (3, 11):
327    from dis import _is_backward_jump
328
329    def find_ops_with_labels(code):
330        labels = list()
331        op_by_label = dict()
332        index = -1
333        for offset, op, arg in _unpack_opargs(code):
334            index += 1
335            if arg is not None:
336                if op in hasjrel:
337                    if _is_backward_jump(op):  # inefficient implementation of the _is_backward_jump() function. Need to use set or list instead of string manipulations
338                        arg = -arg
339
340                    label = offset + 2 + arg*2  # In 3.10: this is an instruction index - not a byte index. Need to be fixed and tested
341                elif op in hasjabs:
342                    label = arg*2
343                else:
344                    continue
345
346                if label not in op_by_label:
347                    labels.append(label)
348                    op_by_label[label] = list()
349                
350                op_by_label[label].append((index, offset, op, arg))
351
352        return labels, op_by_label
353else:
354    import warnings
355    warnings.warn(f'Unsupported Python Version: {PYTHON_VERSION_INT}')
356    # raise RuntimeError(f'Unsupported Python Version: {PYTHON_VERSION_INT}')
357
358
class OpSequenceOffsetMap(CopyableMixin):
    """Two-way mapping between original and current op indexes.

    Attributes (all plain instance attributes):
        op_num: Number of ops covered by the map.
        range: ``slice(start, stop)`` of the covered indexes, or None when the
            map was created with ``op_num == 0``.
        new_by_original: Current index keyed by original index.
        original_by_new: Original index keyed by current index. Entries added
            by ``insert_op_sequence_offset`` hold ``(insertion_id, index)``
            tuples instead of plain ints.
    """

    def __init__(self, op_num: int = 0):
        """Create an identity map for ``op_num`` ops (indexes ``0..op_num-1``)."""
        self.op_num: int = op_num
        self.range: Optional[slice] = slice(0, op_num) if op_num else None
        self.new_by_original: Dict[int, int] = dict()
        self.original_by_new: Dict[int, int] = dict()
        if op_num < 0:
            op_num = 0
        
        for index in range(op_num):
            self.new_by_original[index] = index
            self.original_by_new[index] = index
    
    def mapped_range(self) -> slice:
        """Return ``slice(smallest, largest)`` of the current indexes in the map.

        The stop value is the largest mapped index itself.
        """
        return slice(min(self.original_by_new), max(self.original_by_new))

    
    def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Drop ``op_num`` ops starting at current index ``op_index``.

        Entries whose current index falls into the removed range are deleted
        from both dictionaries, except for the first
        ``preserver_index_for_first_x_op`` of them (in index order), which are
        kept. Every later index is shifted down by ``op_num``.

        The previous implementation rebound both dictionaries to empty ones
        before deleting from them, which discarded every mapping (or raised
        ``KeyError`` on the first deletion). Entries are now deleted from the
        live dictionaries.

        Args:
            op_index: First current index to remove.
            op_num: Number of ops to remove; nothing happens for 0.
            preserver_index_for_first_x_op: Number of leading entries of the
                removed range whose mapping is kept.
        """
        if not op_num:
            return
        
        offset_change_start = op_index
        del_start = op_index
        del_stop = op_index + op_num

        to_be_deleted: List[Tuple[int, int]] = [
            (original, new)
            for original, new in self.new_by_original.items()
            if del_start <= new < del_stop
        ]
        to_be_deleted.sort(key=lambda x: x[1])
        if preserver_index_for_first_x_op:
            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]
        
        for original, new in to_be_deleted:
            del self.new_by_original[original]
        
        to_be_deleted = [
            (new, original)
            for new, original in self.original_by_new.items()
            if del_start <= new < del_stop
        ]
        to_be_deleted.sort(key=lambda x: x[0])
        if preserver_index_for_first_x_op:
            ignored = to_be_deleted[:preserver_index_for_first_x_op]
            if ignored:
                # Shifting starts right after the last preserved entry so the
                # preserved entries keep their current index.
                last_ignored = ignored[-1]
                last_ignored_new = last_ignored[0]
                offset_change_start = last_ignored_new + 1
            
            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]
        
        for new, original in to_be_deleted:
            del self.original_by_new[new]
        
        self.add_offset(-op_num, offset_change_start)
        self.op_num -= op_num
        self.range = slice(self.range.start, self.range.stop - op_num)
    
    def insert_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Make room for ``op_num`` ops at current index ``op_index``.

        Indexes at or after ``op_index + preserver_index_for_first_x_op`` are
        shifted up by ``op_num``. Nothing happens for ``op_num == 0``.
        """
        if not op_num:
            return
        
        self.add_offset(op_num, op_index + preserver_index_for_first_x_op)
        self.op_num += op_num
        self.range = slice(self.range.start, self.range.stop + op_num)
    
    def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0):
        """Insert the ops described by another map at ``op_index``.

        After room is made with ``insert_slice``, every current index of the
        inserted map is registered in ``original_by_new`` with the value
        ``(insertion_id, index)``. ``new_by_original`` gets no new entries.
        Nothing happens when the inserted map covers no ops.
        """
        op_num: int = op_sequence_offset_map.op_num
        if not op_num:
            return
        
        sequence_range: slice = op_sequence_offset_map.range
        offset = op_index - sequence_range.start
        self.insert_slice(op_index, op_num, preserver_index_for_first_x_op)
        for new, original in op_sequence_offset_map.original_by_new.items():
            self.original_by_new[new + offset] = (insertion_id, new)
    
    def add_offset(self, op_offset: int, op_start: int = 0):
        """Add ``op_offset`` to every current index that is >= ``op_start``.

        Both dictionaries are rebuilt; ``op_num`` and ``range`` are untouched.
        Nothing happens for ``op_offset == 0``.
        """
        if not op_offset:
            return
        
        new_by_original_buff = self.new_by_original
        self.new_by_original = type(self.new_by_original)()
        for original, new in new_by_original_buff.items():
            if new >= op_start:
                new += op_offset
            
            self.new_by_original[original] = new
        
        original_by_new_buff = self.original_by_new
        self.original_by_new = type(self.original_by_new)()
        for new, original in original_by_new_buff.items():
            if new >= op_start:
                new += op_offset
            
            self.original_by_new[new] = original
    
    def shift(self, op_offset: int):
        """Move the whole map, including ``range``, by ``op_offset``."""
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)
    
    def shift_to_absolute(self, op_index: int):
        """Move the whole map so that ``range`` starts at ``op_index``."""
        op_offset: int = op_index - self.range.start
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)
    
    def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0):
        """Merge the entries of another map into this one.

        When ``op_offset`` is non-zero a shifted copy of the other map is
        merged, leaving the argument untouched.
        """
        if op_offset:
            op_sequence_offset_map = op_sequence_offset_map.copy()
            op_sequence_offset_map.add_offset(op_offset)
        
        self.new_by_original.update(op_sequence_offset_map.new_by_original)
        self.original_by_new.update(op_sequence_offset_map.original_by_new)
    
    def __getitem__(self, index: Union[int, slice]) -> 'OpSequenceOffsetMap':
        """Return a new map restricted to the current indexes in ``index``.

        An int selects the one-element slice ``index:index + 1``. The slice's
        ``start`` and ``stop`` are used directly, so both must be ints.
        """
        if isinstance(index, int):
            index = slice(index, index + 1)
        
        op_num = index.stop - index.start

        new_by_original_buff = self.new_by_original
        new_by_original = type(new_by_original_buff)()
        for original, new in new_by_original_buff.items():
            if index.start <= new < index.stop:
                new_by_original[original] = new
        
        original_by_new_buff = self.original_by_new
        original_by_new = type(original_by_new_buff)()
        for new, original in original_by_new_buff.items():
            if index.start <= new < index.stop:
                original_by_new[new] = original

        result = OpSequenceOffsetMap()
        result.op_num = op_num
        result.range = index
        result.new_by_original = new_by_original
        result.original_by_new = original_by_new
        return result

    def copy(self):
        """Return a copy with shallow-copied dictionaries and range."""
        cls = self.__class__
        result = cls.__new__(cls)
        result.__dict__['new_by_original'] = copy(self.new_by_original)
        result.__dict__['original_by_new'] = copy(self.original_by_new)
        result.__dict__['op_num'] = self.op_num
        result.__dict__['range'] = copy(self.range)
        return result
513
514
def opcode(name: str) -> int:
    """Return the numeric opcode for the (case-insensitive) op ``name``.

    Raises:
        KeyError: If the upper-cased name is not present in ``opmap``.
    """
    return opmap[name.upper()]
518
519
def opcode_name(opcode: int) -> str:
    """Return the entry of ``opname`` for the numeric ``opcode``."""
    mnemonic = opname[opcode]
    return mnemonic
522
523
class OpSequence:
    """Editable list of ops together with an index map.

    ``op_sequence`` holds tuples
    ``(op, arg, op_index, offset, real_op_index, real_offset)`` as yielded by
    ``unpack_opargs``; ``op_sequence_offset_map`` is the ``OpSequenceOffsetMap``
    that is updated by every insertion and removal made through this class.
    """

    extended_arg_opcode_int = opcode('EXTENDED_ARG')
    extended_arg_opcode_byte = extended_arg_opcode_int.to_bytes(1, byteorder='little')

    def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None):
        """Wrap ``op_sequence`` (used as is, not copied) with an identity index map."""
        self.op_sequence: List[Tuple[int, int, int, int, int, int]] = list() if op_sequence is None else op_sequence
        self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence))
    
    def __len__(self):
        """Return the number of ops."""
        return len(self.op_sequence)
    
    def read_slice(self, place: slice) -> 'OpSequence':
        """Return a new sequence holding the ops in ``place``, re-based to index 0."""
        result = OpSequence()
        result.op_sequence = self.op_sequence[place]
        result.op_sequence_offset_map = self.op_sequence_offset_map[place]
        result.op_sequence_offset_map.shift_to_absolute(0)
        return result
    
    def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0):
        """Delete the ops in ``place`` and update the index map accordingly."""
        self.op_sequence[place] = []
        self.op_sequence_offset_map.remove_slice(place.start, place.stop - place.start, preserver_index_for_first_x_op)
    
    def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0):
        """Insert the ops of ``op_sequence`` before ``index``.

        The index map of the inserted sequence is shifted in place so that it
        starts at ``index``. Returns the inserted ``op_sequence``.
        """
        self.op_sequence[index: index] = op_sequence.op_sequence
        op_sequence.op_sequence_offset_map.shift_to_absolute(index)
        self.op_sequence_offset_map.insert_op_sequence_offset(index, op_sequence.op_sequence_offset_map, op_sequence, preserver_index_for_first_x_op)
        return op_sequence
    
    def normalize_instructions_arg(self):
        """Split every argument above 255 into EXTENDED_ARG prefixes.

        For each such op, an already recorded EXTENDED_ARG run (a non-None
        ``real_op_index``) is removed first; then one EXTENDED_ARG per extra
        byte is inserted in front of the op and the op keeps the low byte.
        """
        index = -1
        while True:
            index += 1
            if index >= len(self.op_sequence):
                break

            op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index]
            if arg is not None:
                if arg > 255:
                    arg = arg_to_bytes(arg)
                    if real_op_index is not None:
                        # Translate the recorded start of the prefix run into
                        # the current position and drop the old prefixes.
                        real_op_index_delta = real_op_index - op_index
                        real_op_index = index - real_op_index_delta
                        slice_to_delete = slice(real_op_index, index)
                        self.remove_slice(slice_to_delete, 1)
                        index = real_op_index

                    extended_arg: bytes = arg[1:]
                    extended_arg_len = len(extended_arg)
                    arg = arg[0]
                    self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2)
                    op_sub_sequence_instructions = list()
                    # Most significant byte first.
                    for extended_arg_int in reversed(extended_arg):
                        op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int))
                    
                    sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions)
                    self.insert_op_sequence(index, sub_op_sequence, 1)
                    index += len(sub_op_sequence.op_sequence)
    
    def denormalize_instructions_arg(self):
        """Not implemented."""
        raise NotImplementedError
    
    def find_op(self, op_index: int) -> Optional[slice]:
        """Locate the op at ``op_index`` together with its EXTENDED_ARG prefixes.

        Returns:
            ``slice(start, stop)`` where ``stop - 1`` is the first
            non-EXTENDED_ARG op at or after ``op_index`` and ``start`` follows
            the nearest non-EXTENDED_ARG op before ``op_index``. None when
            either end cannot be determined - which, as written, includes
            ``op_index == 0`` because the backward scan then finds nothing.
        """
        start_index = None
        end_index = None

        index = op_index - 1
        while True:
            index += 1
            if index >= len(self.op_sequence):
                break

            current_op, current_arg, _, _, _, _ = self.op_sequence[index]
            if current_op == OpSequence.extended_arg_opcode_int:
                continue
            else:
                end_index = index + 1
                break
        
        previous_op_is_extended_arg = None
        index = op_index
        while True:
            index -= 1
            if index < 0:
                if previous_op_is_extended_arg:
                    start_index = 0
                
                break
            
            current_op, current_arg, _, _, _, _ = self.op_sequence[index]
            if current_op == OpSequence.extended_arg_opcode_int:
                previous_op_is_extended_arg = True
                continue
            else:
                previous_op_is_extended_arg = False
                start_index = index + 1
                break
        
        if (start_index is None) or (end_index is None):
            return None
        
        return slice(start_index, end_index)
    
    def get_arg(self, op_index: Union[int, slice]) -> Optional[int]:
        """Return the argument of an op, combining its EXTENDED_ARG prefixes.

        Args:
            op_index: Index resolved through ``find_op``, or a ready slice
                covering the prefixes and the op.

        Returns:
            The combined integer when the slice spans several ops, otherwise
            the stored argument of the single op (which may be None).

        Raises:
            RuntimeError: If the op cannot be located.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)
        
        if (op_index is None) or ((op_index.stop - op_index.start) < 1):
            raise RuntimeError('OP not found by index')
        
        op_slice = self.op_sequence[op_index]
        if (op_index.stop - op_index.start) > 1:
            # Reversed order puts the op's own (lowest) byte first.
            arg_list = list()
            for op, arg, _, _, _, _ in reversed(op_slice):
                arg_list.append(arg)
            
            return arg_to_int(bytes(arg_list))
        else:
            return op_slice[0][1]
    
    def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]):
        """Replace the argument of an op, adding EXTENDED_ARG prefixes if needed.

        Args:
            op_index: Index resolved through ``find_op``, or a ready slice
                covering the prefixes and the op.
            new_arg: New argument; ``bytes`` are converted with ``arg_to_int``.

        Raises:
            RuntimeError: If the op cannot be located. The slice length used
                to be computed before the ``None`` check, so a failed lookup
                surfaced as ``AttributeError``; the check now comes first.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)
        
        if (op_index is None) or ((op_index.stop - op_index.start) < 1):
            raise RuntimeError('OP not found by index')
        
        op_index_len = op_index.stop - op_index.start

        new_arg = arg_to_int(new_arg)
        current_op, current_arg, current_op_index, current_offset, current_real_op_index, current_real_offset = self.op_sequence[op_index.stop - 1]
        # Build a one-op sequence and normalize it to learn how many
        # EXTENDED_ARG prefixes the new argument requires.
        sub_op_sequence_bytes = [current_op, new_arg]
        sub_op_sequence: OpSequence = OpSequence(list(unpack_opargs(sub_op_sequence_bytes)))
        sub_op_sequence.normalize_instructions_arg()
        real_arg = sub_op_sequence.op_sequence[-1][1]
        sub_op_sequence_len = len(sub_op_sequence.op_sequence)
        extended_args_len = sub_op_sequence_len - 1
        if extended_args_len:
            current_real_op_index = op_index.start
            current_real_offset = current_real_op_index * 2
            current_op_index = current_real_op_index + extended_args_len
            current_offset = current_op_index * 2
        else:
            current_op_index = op_index.start
            current_offset = current_op_index * 2
            current_real_op_index = None
            current_real_offset = None

        self.op_sequence[op_index.stop - 1] = (current_op, real_arg, current_op_index, current_offset, current_real_op_index, current_real_offset)

        if sub_op_sequence_len > 1:
            # Keep only the prefixes of the helper sequence, swap out the old
            # prefixes (if any) and insert the new ones in front of the op.
            sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len))
            if op_index_len > 1:
                self.remove_slice(slice(op_index.start, op_index.stop - 1), 1)
            
            self.insert_op_sequence(op_index.start, sub_op_sequence, 1)

    def fix_labels(self, op_by_label: Dict[int, List[int]]):
        """Rewrite jump arguments so they point at the moved targets.

        Args:
            op_by_label: Original op index of each jump target mapped to the
                original op indexes of the ops jumping to it. The lists are
                sorted in place.

        For every target the jumps are rewritten repeatedly until the target's
        current index no longer changes (``set_arg`` may insert prefixes and
        thereby move the target).
        """
        labels = list(op_by_label.keys())
        labels.sort()
        for label, data in op_by_label.items():
            data.sort()
        
        for label in labels:
            labeled_op_index_new = self.op_sequence_offset_map.new_by_original[label]
            label_data = op_by_label[label]
            while True:
                for op_index in label_data:
                    op_index_new = self.op_sequence_offset_map.new_by_original[op_index]
                    op = self.op_sequence[op_index_new][0]
                    new_arg = op_index_to_arg(labeled_op_index_new)
                    if op in hasjrel:
                        # Relative jumps are measured from the op after the jump.
                        new_arg = new_arg - (op_index_to_arg(op_index_new) + 2)
                    
                    self.set_arg(op_index_new, new_arg)
                
                labeled_op_index_new_after_value_change = self.op_sequence_offset_map.new_by_original[label]
                if labeled_op_index_new_after_value_change == labeled_op_index_new:
                    break
                else:
                    labeled_op_index_new = labeled_op_index_new_after_value_change

    def to_sequence_of_ints(self):
        """Yield ``op, arg`` for every op; a None argument is emitted as 0."""
        for op, arg, _, _, _, _ in self.op_sequence:
            yield op
            if arg is None:
                yield 0
            else:
                yield arg

    def to_bytes(self) -> bytes:
        """Return the sequence as raw bytecode."""
        return bytes(self.to_sequence_of_ints())
        
    @staticmethod
    def from_bytecode_sequence(code: BytecodeSequence):
        """Build ``(OpSequence, labels, op_by_label)`` from raw bytecode."""
        labels, op_by_label = find_ops_with_labels(code)
        return OpSequence(list(unpack_opargs(code))), labels, op_by_label
    
    @staticmethod
    def from_entity(entity):
        """Build ``(OpSequence, labels, op_by_label)`` from the code of ``entity``."""
        code_bytes: BytecodeSequence = _get_code_object(entity).co_code
        return OpSequence.from_bytecode_sequence(code_bytes)
    
    @staticmethod
    def from_instructions(instructions: Sequence[Instruction]):
        """Build ``(OpSequence, labels, op_by_label)`` from instructions.

        None arguments are written as 0; the resulting sequence is normalized
        with ``normalize_instructions_arg`` after the labels are collected.
        """
        op_sequence: List = list()
        for instruction in instructions:
            arg = instruction.arg
            if arg is None:
                arg = 0

            op_sequence.extend((instruction.opcode, arg))

        labels, op_by_label = find_ops_with_labels(op_sequence)
        result = OpSequence(list(unpack_opargs(op_sequence)))
        result.normalize_instructions_arg()
        return result, labels, op_by_label
    
    @staticmethod
    def from_instructions_fast(instructions: Sequence[Instruction]):
        """Build an ``OpSequence`` from instructions without collecting labels.

        Arguments are split into EXTENDED_ARG prefixes by the module-level
        ``normalize_instructions_arg`` before the ops are unpacked.
        """
        return OpSequence(list(unpack_opargs(instructions_to_sequence_of_ints(normalize_instructions_arg(instructions)))))
741    def from_instructions_fast(instructions: Sequence[Instruction]):
742        return OpSequence(list(unpack_opargs(instructions_to_sequence_of_ints(normalize_instructions_arg(instructions)))))
743
744
def arg_to_bytes(arg: Union[int, bytes]) -> bytes:
    """Encode an int argument as little-endian bytes without trailing zeros.

    Ints are packed into four bytes, trailing zero bytes are stripped and at
    least one byte is always returned. Any non-int value is returned as is.

    Raises:
        OverflowError: If the int is negative or does not fit into four bytes.
    """
    if not isinstance(arg, int):
        return arg

    stripped = arg.to_bytes(4, byteorder='little').rstrip(b'\x00')
    return stripped or b'\x00'
755
756
def arg_to_int(arg: Union[int, bytes]) -> int:
    """Decode little-endian ``bytes`` into an int; pass anything else through."""
    return int.from_bytes(arg, byteorder='little') if isinstance(arg, bytes) else arg
762
763
def get_raw_instructions(x, *, first_line=None):
    """Yield the instructions of ``x`` with arguments reduced to their low byte.

    ``dis.get_instructions`` reports arguments with EXTENDED_ARG prefixes
    already folded in; this generator keeps only the lowest byte of each
    argument and leaves every other field of the instruction unchanged.

    ``first_line`` is keyword-only in ``dis.get_instructions``; it used to be
    passed positionally here, which raised ``TypeError`` on first iteration.
    Instructions are now derived with ``_replace`` so no assumption is made
    about the number or order of ``Instruction`` fields.

    Args:
        x: Anything accepted by ``dis.get_instructions``.
        first_line: Forwarded to ``dis.get_instructions``.

    Yields:
        ``dis.Instruction`` objects.
    """
    for instruction in get_instructions(x, first_line=first_line):
        arg = instruction.arg
        if arg is not None:
            if isinstance(arg, int):
                arg = arg.to_bytes(4, byteorder='little')
            
            # Lowest byte of the little-endian representation.
            arg = arg[0]

        yield instruction._replace(arg=arg)
775
776
def normalize_instructions_arg(instructions: Sequence[Instruction]):
    """Yield instructions with oversized arguments split into EXTENDED_ARG ops.

    For each instruction with an argument, the argument is encoded with
    ``arg_to_bytes``; one ``EXTENDED_ARG`` instruction is yielded per extra
    byte (most significant first), followed by the instruction itself carrying
    the low byte. Instructions without an argument are yielded unchanged.

    The trailing instruction is derived with ``_replace`` instead of being
    rebuilt from eight positional fields, so additional ``Instruction`` fields
    present on other interpreter versions are preserved rather than dropped
    or mis-assigned.

    Args:
        instructions: Iterable of ``dis.Instruction`` objects.

    Yields:
        ``dis.Instruction`` objects.
    """
    for instruction in instructions:
        arg = instruction.arg
        if arg is not None:
            arg = arg_to_bytes(arg)
            extended_arg = arg[1:]
            arg = arg[0]
            for extended_arg_int in reversed(extended_arg):
                yield make_instruction('EXTENDED_ARG', extended_arg_int)

        yield instruction._replace(arg=arg)
788
789
def instructions_to_sequence_of_ints(instructions: Sequence[Instruction]):
    """Yield ``opcode, arg`` for every instruction; a None argument becomes 0."""
    for instruction in instructions:
        yield instruction.opcode
        operand = instruction.arg
        if operand is None:
            yield 0
        else:
            yield operand
796
797
def instructions_to_bytes(instructions: Sequence[Instruction]) -> bytes:
    """Serialize instructions as raw bytecode: opcode byte, then argument byte.

    A None argument is written as 0.
    """
    flat = []
    for instruction in instructions:
        flat.append(instruction.opcode)
        operand = instruction.arg
        flat.append(0 if operand is None else operand)

    return bytes(flat)
800
801
def make_instruction(name: str, arg: Union[None, int, bytes] = None) -> Instruction:
    """Create a bare ``dis.Instruction`` from an op name and an argument.

    Only ``opname``, ``opcode`` and ``arg`` are filled in; every other field is
    None. The instruction is built by field name from ``Instruction._fields``
    because the number and order of fields differ between interpreter
    versions, which made the former eight-value positional call
    version-specific.

    Args:
        name: Op name, case-insensitive.
        arg: Argument; ``bytes`` are decoded as a little-endian int.

    Returns:
        Instruction: The new instruction.

    Raises:
        KeyError: If the upper-cased name is not present in ``opmap``.
    """
    name = name.upper()
    op = opmap[name]
    if isinstance(arg, bytes):
        arg = int.from_bytes(arg, byteorder='little')

    fields = dict.fromkeys(Instruction._fields)
    fields.update(opname=name, opcode=op, arg=arg)
    return Instruction(**fields)


mi = make_instruction
810
811
def fix_labels(op_sequence: OpSequence, op_by_label: Dict[int, List[Tuple[int, int, int, int, int, int]]]):
    """Re-point the jumps of ``op_sequence`` using label data in argument units.

    Each label is converted with ``arg_to_op_index`` and each op tuple is
    reduced to its third element (the op index) before the result is handed to
    ``OpSequence.fix_labels``.

    Args:
        op_sequence: Sequence whose jump arguments are rewritten.
        op_by_label: Jump target mapped to the op tuples jumping to it.
    """
    op_indexes_by_target = dict()
    for label, data in op_by_label.items():
        op_indexes_by_target[arg_to_op_index(label)] = [op_info[2] for op_info in data]

    op_sequence.fix_labels(op_indexes_by_target)
814
815
def code_info(x):
    """Formatted details of methods, functions, or code.

    Resolves ``x`` to a code object and passes it to ``_format_code_info``,
    whose result is returned.
    """
    code_object = _get_code_object(x)
    return _format_code_info(code_object)
819
820
821def _pr(name, data):
822    print(f'<<{name}>> type: {type(data)}; value: {data}')
823
824
def _format_code_info(co):
    """Print the main attributes of the code object ``co``, one per line.

    Each attribute is reported through ``_pr`` under a readable title, in the
    order listed below.
    """
    titled_attributes = (
        ('Name', 'co_name'),
        ('Filename', 'co_filename'),
        ('Argument count', 'co_argcount'),
        ('Positional-only arguments', 'co_posonlyargcount'),
        ('Kw-only arguments', 'co_kwonlyargcount'),
        ('Number of locals', 'co_nlocals'),
        ('Stack size', 'co_stacksize'),
        ('Flags', 'co_flags'),
        ('Constants', 'co_consts'),
        ('Names', 'co_names'),
        ('Variable names', 'co_varnames'),
        ('Free variables', 'co_freevars'),
        ('Cell variables', 'co_cellvars'),
    )
    for title, attribute in titled_attributes:
        _pr(title, getattr(co, attribute))
BytecodeSequence = typing.Union[bytes, typing.Sequence[typing.Union[int, bytes]]]
def patch_function(func, co_code):
def patch_function(func, co_code):
    """Replace the bytecode of ``func`` in place, keeping the rest of its code object.

    Args:
        func: object with a writable ``__code__`` attribute (a plain function).
        co_code: raw bytecode that becomes the new ``co_code``.

    The constants, names, variable names and every other field are taken from
    the current code object, so ``co_code`` must be valid against them.
    """
    fn_code = func.__code__
    if hasattr(fn_code, 'replace'):
        # CodeType.replace() (Python 3.8+) copies every field, including the
        # ones added to the constructor after 3.7 (co_posonlyargcount, ...),
        # which the positional constructor call below cannot supply.
        func.__code__ = fn_code.replace(co_code=co_code)
    else:
        # Pre-3.8 interpreters: 15-argument positional constructor.
        func.__code__ = CodeType(
            fn_code.co_argcount,
            fn_code.co_kwonlyargcount,
            fn_code.co_nlocals,
            fn_code.co_stacksize,
            fn_code.co_flags,
            co_code,
            fn_code.co_consts,
            fn_code.co_names,
            fn_code.co_varnames,
            fn_code.co_filename,
            fn_code.co_name,
            fn_code.co_firstlineno,
            fn_code.co_lnotab,
            fn_code.co_freevars,
            fn_code.co_cellvars,
        )
class CodeParamNames(builtins.tuple):

CodeParamNames(positional, positional_only, keyword_only)

CodeParamNames(positional, positional_only, keyword_only)

Create new instance of CodeParamNames(positional, positional_only, keyword_only)

positional

Alias for field number 0

positional_only

Alias for field number 1

keyword_only

Alias for field number 2

Inherited Members
builtins.tuple
index
count
def code_param_names( code) -> CodeParamNames:
def code_param_names(code) -> CodeParamNames:
    """Split the parameter names of a code object into three groups.

    Returns:
        CodeParamNames: ``positional`` holds the first ``co_argcount`` names of
        ``co_varnames`` (positional-only ones included), ``positional_only``
        the first ``co_posonlyargcount`` names, and ``keyword_only`` the
        ``co_kwonlyargcount`` names that follow the positional ones.
    """
    names = code.co_varnames
    positional_end = code.co_argcount
    keyword_only_end = positional_end + code.co_kwonlyargcount
    return CodeParamNames(
        names[:positional_end],
        names[:code.co_posonlyargcount],
        names[positional_end:keyword_only_end],
    )
def code_name(code: code) -> str:
def code_name(code: CodeType) -> str:
    """Return the name recorded in the code object (its ``co_name``)."""
    name: str = code.co_name
    return name
def get_code(x=None) -> code:
 97def get_code(x = None) -> CodeType:
 98    if x is None:
 99        return
100    
101    if isinstance(x, CodeType):
102        return x
103    
104    # Extract functions from methods.
105    if hasattr(x, '__func__'):
106        x = x.__func__
107    # Extract compiled code objects from...
108    if hasattr(x, '__code__'):  # ...a function, or
109        x = x.__code__
110    elif hasattr(x, 'gi_code'):  #...a generator object, or
111        x = x.gi_code
112    elif hasattr(x, 'ag_code'):  #...an asynchronous generator object, or
113        x = x.ag_code
114    elif hasattr(x, 'cr_code'):  #...a coroutine, or
115        x = x.cr_code
116    elif hasattr(x, 'f_code'):  #...a frame.
117        x = x.f_code
118    # else:
119    #     raise TypeError(f'Expected a code object or an entity with code, but got {type(x)}')
120    
121    return x
def has_code(x=None) -> bool:
def has_code(x = None) -> bool:
    """Tell whether ``get_code`` would find a code object on ``x``.

    True for a code object itself and for anything exposing ``__func__``,
    ``__code__``, ``gi_code``, ``ag_code``, ``cr_code`` or ``f_code``;
    False for ``None`` and for every other object.
    """
    if x is None:
        return False

    if isinstance(x, CodeType):
        return True

    code_carrying_attributes = ('__func__', '__code__', 'gi_code', 'ag_code', 'cr_code', 'f_code')
    return any(hasattr(x, attribute) for attribute in code_carrying_attributes)
class CodeTypeEnum(enum.Enum):
class CodeTypeEnum(Enum):
    """Kinds of objects distinguished by code_type()."""
    class_or_module = 0  # object exposing __dict__
    code_object = 1      # object exposing co_code
    raw_bytecode = 2     # bytes or bytearray
    source_code = 3      # str
    unknown = 4          # anything else

An enumeration.

class_or_module = <CodeTypeEnum.class_or_module: 0>
code_object = <CodeTypeEnum.code_object: 1>
raw_bytecode = <CodeTypeEnum.raw_bytecode: 2>
source_code = <CodeTypeEnum.source_code: 3>
unknown = <CodeTypeEnum.unknown: 4>
Inherited Members
enum.Enum
name
value
def code_type( x=None) -> Union[CodeTypeEnum, NoneType]:
def code_type(x=None) -> Optional[CodeTypeEnum]:
    """Classify an object by the kind of code it carries.

    Args:
        x (optional): result of get_code() function. Defaults to None.

    Returns:
        Optional[CodeTypeEnum]: None when ``x`` is None. Otherwise the first
        match, checked in this order: ``class_or_module`` (has ``__dict__``),
        ``code_object`` (has ``co_code``), ``raw_bytecode`` (bytes or
        bytearray), ``source_code`` (str), else ``unknown``.
    """
    if x is None:
        return None

    if hasattr(x, '__dict__'):
        return CodeTypeEnum.class_or_module

    if hasattr(x, 'co_code'):
        return CodeTypeEnum.code_object

    if isinstance(x, (bytes, bytearray)):
        return CodeTypeEnum.raw_bytecode

    if isinstance(x, str):
        return CodeTypeEnum.source_code

    return CodeTypeEnum.unknown

Classify an object by the kind of code it carries.

Args: x (optional): result of get_code() function. Defaults to None.

Returns: Optional[CodeTypeEnum]: the detected kind, or None when x is None.

def set_code(x, code: code):
def set_code(x, code: CodeType):
    """Install ``code`` as the code object of ``x`` and return ``x``.

    Args:
        x: function, method, generator, asynchronous generator or coroutine.
        code: code object to install.

    Returns:
        The same ``x`` that was passed in.

    Raises:
        AttributeError: when the target attribute is not writable.
            NOTE: CPython exposes ``gi_code``, ``ag_code`` and ``cr_code`` as
            read-only, so those branches are expected to raise there.
    """
    # A method only proxies *reads* of __code__ to the wrapped function and its
    # __func__ is read-only, so the code has to be set on the function itself.
    target = getattr(x, '__func__', x)
    if hasattr(target, '__code__'):  # a function (possibly unwrapped from a method), or
        target.__code__ = code
    elif hasattr(x, 'gi_code'):  # ...a generator object, or
        x.gi_code = code
    elif hasattr(x, 'ag_code'):  # ...an asynchronous generator object, or
        x.ag_code = code
    elif hasattr(x, 'cr_code'):  # ...a coroutine.
        x.cr_code = code

    return x
class OpSequenceOffsetMap(cengal.entities.copyable.versions.v_0.copyable.CopyableMixin):
class OpSequenceOffsetMap(CopyableMixin):
    """Two-way mapping between original op indexes and their current ("new") indexes.

    State kept in sync by the editing methods:
        op_num: number of ops covered by the map.
        range: half-open slice of new indexes covered, or None for an empty map.
        new_by_original: original index -> new index.
        original_by_new: new index -> original index. Entries added by
            ``insert_op_sequence_offset`` hold ``(insertion_id, index)`` tuples.
    """

    def __init__(self, op_num: int = 0):
        """Create an identity map over ``op_num`` ops (empty for ``op_num <= 0``)."""
        # Clamp first so that op_num and range never carry a negative size.
        op_num = max(op_num, 0)
        self.op_num: int = op_num
        self.range: Optional[slice] = slice(0, op_num) if op_num else None
        self.new_by_original: Dict[int, int] = dict()
        self.original_by_new: Dict[int, int] = dict()
        for index in range(op_num):
            self.new_by_original[index] = index
            self.original_by_new[index] = index

    def mapped_range(self) -> slice:
        """Return ``slice(lowest, highest)`` of the new indexes currently mapped.

        NOTE(review): ``stop`` is the highest mapped index itself, not
        ``highest + 1`` as in ``self.range`` - confirm this is what callers
        expect. Raises ``ValueError`` on an empty map.
        """
        return slice(min(self.original_by_new), max(self.original_by_new))

    def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Drop ``op_num`` ops starting at new index ``op_index`` and close the gap.

        The first ``preserver_index_for_first_x_op`` entries of the removed
        range (ordered by new index) keep their mapping. All entries at or
        after the shift start move down by ``op_num``.
        """
        if not op_num:
            return

        offset_change_start = op_index
        del_start = op_index
        del_stop = op_index + op_num

        # Delete from the live dicts: entries outside the removed range must
        # survive, only the non-preserved entries inside it are dropped.
        doomed = sorted(
            ((original, new) for original, new in self.new_by_original.items() if del_start <= new < del_stop),
            key=lambda item: item[1],
        )
        for original, _ in doomed[preserver_index_for_first_x_op:]:
            del self.new_by_original[original]

        doomed_new = sorted(new for new in self.original_by_new if del_start <= new < del_stop)
        preserved = doomed_new[:preserver_index_for_first_x_op] if preserver_index_for_first_x_op else []
        if preserved:
            # Preserved entries stay put; shifting starts right after the last one.
            offset_change_start = preserved[-1] + 1

        for new in doomed_new[preserver_index_for_first_x_op:]:
            del self.original_by_new[new]

        self.add_offset(-op_num, offset_change_start)
        self.op_num -= op_num
        if self.range is not None:
            self.range = slice(self.range.start, self.range.stop - op_num)

    def insert_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
        """Open a gap of ``op_num`` ops at ``op_index``.

        Entries at or after ``op_index + preserver_index_for_first_x_op`` move
        up by ``op_num``. Works on an empty map too (range treated as 0..0).
        """
        if not op_num:
            return

        self.add_offset(op_num, op_index + preserver_index_for_first_x_op)
        self.op_num += op_num
        current = self.range if self.range is not None else slice(0, 0)
        self.range = slice(current.start, current.stop + op_num)

    def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0):
        """Insert the ops described by another map at ``op_index``.

        A gap of the other map's size is opened, then every inserted position
        is recorded in ``original_by_new`` as ``(insertion_id, index)``;
        ``new_by_original`` receives no entries for the inserted ops.
        """
        op_num: int = op_sequence_offset_map.op_num
        if not op_num:
            return

        sequence_range: slice = op_sequence_offset_map.range
        offset = op_index - sequence_range.start
        self.insert_slice(op_index, op_num, preserver_index_for_first_x_op)
        for new, original in op_sequence_offset_map.original_by_new.items():
            self.original_by_new[new + offset] = (insertion_id, new)

    def add_offset(self, op_offset: int, op_start: int = 0):
        """Add ``op_offset`` to every new index that is ``>= op_start``.

        Both dicts are rebuilt; if two entries of ``original_by_new`` end up on
        the same new index, the one iterated later wins.
        """
        if not op_offset:
            return

        shifted_new_by_original = type(self.new_by_original)()
        for original, new in self.new_by_original.items():
            shifted_new_by_original[original] = new + op_offset if new >= op_start else new

        self.new_by_original = shifted_new_by_original

        shifted_original_by_new = type(self.original_by_new)()
        for new, original in self.original_by_new.items():
            shifted_original_by_new[new + op_offset if new >= op_start else new] = original

        self.original_by_new = shifted_original_by_new

    def shift(self, op_offset: int):
        """Move the whole map and its range by ``op_offset``. Requires a non-None range."""
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)

    def shift_to_absolute(self, op_index: int):
        """Move the whole map so that its range starts at ``op_index``. Requires a non-None range."""
        op_offset: int = op_index - self.range.start
        self.add_offset(op_offset)
        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)

    def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0):
        """Merge the entries of another map into this one.

        With a non-zero ``op_offset`` a shifted copy of the other map is
        merged; the other map itself is left untouched. ``op_num`` and
        ``range`` of this map are not changed.
        """
        if op_offset:
            op_sequence_offset_map = op_sequence_offset_map.copy()
            op_sequence_offset_map.add_offset(op_offset)

        self.new_by_original.update(op_sequence_offset_map.new_by_original)
        self.original_by_new.update(op_sequence_offset_map.original_by_new)

    def __getitem__(self, index: Union[int, slice]) -> 'OpSequenceOffsetMap':
        """Return a new map holding only the entries whose new index lies in ``index``.

        An int selects a single position. A slice must have explicit ``start``
        and ``stop``; the result keeps the original (unshifted) new indexes.
        """
        if isinstance(index, int):
            index = slice(index, index + 1)

        op_num = index.stop - index.start

        new_by_original = type(self.new_by_original)()
        for original, new in self.new_by_original.items():
            if index.start <= new < index.stop:
                new_by_original[original] = new

        original_by_new = type(self.original_by_new)()
        for new, original in self.original_by_new.items():
            if index.start <= new < index.stop:
                original_by_new[new] = original

        result = OpSequenceOffsetMap()
        result.op_num = op_num
        result.range = index
        result.new_by_original = new_by_original
        result.original_by_new = original_by_new
        return result

    def copy(self):
        """Return an independent copy: both dicts are shallow-copied, ``__init__`` is not run."""
        cls = self.__class__
        result = cls.__new__(cls)
        result.__dict__['new_by_original'] = copy(self.new_by_original)
        result.__dict__['original_by_new'] = copy(self.original_by_new)
        result.__dict__['op_num'] = self.op_num
        result.__dict__['range'] = copy(self.range)
        return result
OpSequenceOffsetMap(op_num: int = 0)
361    def __init__(self, op_num: int = 0):
362        self.op_num: int = op_num
363        self.range: Optional[slice] = slice(0, op_num) if op_num else None
364        self.new_by_original: Dict[int, int] = dict()
365        self.original_by_new: Dict[int, int] = dict()
366        if op_num < 0:
367            op_num = 0
368        
369        for index in range(op_num):
370            # bindex = index * 2
371            bindex = index
372            self.new_by_original[bindex] = bindex
373            self.original_by_new[bindex] = bindex
op_num: int
range: Union[slice, NoneType]
new_by_original: Dict[int, int]
original_by_new: Dict[int, int]
def mapped_range(self) -> slice:
375    def mapped_range(self) -> slice:
376        return slice(min(self.original_by_new), max(self.original_by_new))
def remove_slice( self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
379    def remove_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
380        if not op_num:
381            return
382        
383        offset_change_start = op_index
384        del_start = op_index
385        del_stop = op_index + op_num
386
387        new_by_original_buff = self.new_by_original
388        self.new_by_original = type(self.new_by_original)()
389        to_be_deleted: List[Tuple[int, int]] = list()
390        for original, new in new_by_original_buff.items():
391            if del_start <= new < del_stop:
392                to_be_deleted.append((original, new))
393        
394        to_be_deleted.sort(key=lambda x: x[1])
395        if preserver_index_for_first_x_op:
396            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]
397        
398        for original, new in to_be_deleted:
399            del self.new_by_original[original]
400        
401        original_by_new_buff = self.original_by_new
402        self.original_by_new = type(self.original_by_new)()
403        to_be_deleted = list()
404        for new, original in original_by_new_buff.items():
405            if del_start <= new < del_stop:
406                to_be_deleted.append((new, original))
407        
408        to_be_deleted.sort(key=lambda x: x[0])
409        if preserver_index_for_first_x_op:
410            ignored = to_be_deleted[:preserver_index_for_first_x_op]
411            if ignored:
412                last_ignored = ignored[-1]
413                last_ignored_new = last_ignored[0]
414                offset_change_start = last_ignored_new + 1
415            
416            to_be_deleted = to_be_deleted[preserver_index_for_first_x_op:]
417        
418        for new, original in to_be_deleted:
419            del self.original_by_new[new]
420        
421        self.add_offset(-op_num, offset_change_start)
422        self.op_num -= op_num
423        self.range = slice(self.range.start, self.range.stop - op_num)
def insert_slice( self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
425    def insert_slice(self, op_index: int, op_num: int, preserver_index_for_first_x_op: int = 0):
426        if not op_num:
427            return
428        
429        self.add_offset(op_num, op_index + preserver_index_for_first_x_op)
430        self.op_num += op_num
431        self.range = slice(self.range.start, self.range.stop + op_num)
def insert_op_sequence_offset( self, op_index: int, op_sequence_offset_map: OpSequenceOffsetMap, insertion_id: Any, preserver_index_for_first_x_op: int = 0):
433    def insert_op_sequence_offset(self, op_index: int, op_sequence_offset_map: 'OpSequenceOffsetMap', insertion_id: Any, preserver_index_for_first_x_op: int = 0):
434        op_num: int = op_sequence_offset_map.op_num
435        if not op_num:
436            return
437        
438        sequence_range: slice = op_sequence_offset_map.range
439        offset = op_index - sequence_range.start
440        self.insert_slice(op_index, op_num, preserver_index_for_first_x_op)
441        for new, original in op_sequence_offset_map.original_by_new.items():
442            self.original_by_new[new + offset] = (insertion_id, new)
def add_offset(self, op_offset: int, op_start: int = 0):
444    def add_offset(self, op_offset: int, op_start: int = 0):
445        if not op_offset:
446            return
447        
448        new_by_original_buff = self.new_by_original
449        self.new_by_original = type(self.new_by_original)()
450        for original, new in new_by_original_buff.items():
451            if new >= op_start:
452                new += op_offset
453            
454            self.new_by_original[original] = new
455        
456        original_by_new_buff = self.original_by_new
457        self.original_by_new = type(self.original_by_new)()
458        for new, original in original_by_new_buff.items():
459            if new >= op_start:
460                new += op_offset
461            
462            self.original_by_new[new] = original
def shift(self, op_offset: int):
464    def shift(self, op_offset: int):
465        self.add_offset(op_offset)
466        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)
def shift_to_absolute(self, op_index: int):
468    def shift_to_absolute(self, op_index: int):
469        op_offset: int = op_index - self.range.start
470        self.add_offset(op_offset)
471        self.range = slice(self.range.start + op_offset, self.range.stop + op_offset)
def update( self, op_sequence_offset_map: OpSequenceOffsetMap, op_offset: int = 0):
473    def update(self, op_sequence_offset_map: 'OpSequenceOffsetMap', op_offset: int = 0):
474        if op_offset:
475            op_sequence_offset_map = op_sequence_offset_map.copy()
476            op_sequence_offset_map.add_offset(op_offset)
477        
478        self.new_by_original.update(op_sequence_offset_map.new_by_original)
479        self.original_by_new.update(op_sequence_offset_map.original_by_new)
def copy(self):
506    def copy(self):
507        cls = self.__class__
508        result = cls.__new__(cls)
509        result.__dict__['new_by_original'] = copy(self.new_by_original)
510        result.__dict__['original_by_new'] = copy(self.original_by_new)
511        result.__dict__['op_num'] = self.op_num
512        result.__dict__['range'] = copy(self.range)
513        return result

Should make a relevant copy of an object (not as general or as deep as deepcopy()). It should copy only the known object fields. Example: def copy(self): cls = self.__class__ result = cls.__new__(cls) result.__dict__['dimension'] = self.dimension result.__dict__['_point'] = self._point.copy() return result

Raises: NotImplementedError: _description_

Inherited Members
cengal.entities.copyable.versions.v_0.copyable.CopyMethodsMixin
shallow_copy
updated_copy
def opcode(name: str) -> int:
def opcode(name: str) -> int:
    """Return the numeric opcode for an instruction name.

    The name is upper-cased before the ``opmap`` lookup; an unknown name
    raises ``KeyError``.
    """
    return opmap[name.upper()]
def opcode_name(opcode: int) -> str:
def opcode_name(opcode: int) -> str:
    """Return the instruction name stored in ``opname`` for a numeric opcode."""
    mnemonic: str = opname[opcode]
    return mnemonic
class OpSequence:
class OpSequence:
    """Editable sequence of bytecode ops with index tracking across edits.

    ``op_sequence`` is a list of tuples
    ``(op, arg, op_index, offset, real_op_index, real_offset)``. As written by
    ``normalize_instructions_arg`` and ``set_arg``, ``op_index`` is the position
    of the op itself, ``real_op_index`` the position of its first
    ``EXTENDED_ARG`` prefix (``None`` when it has none), and each offset is the
    matching index times two. ``op_sequence_offset_map`` records how original
    indexes map to current ones.
    """
    extended_arg_opcode_int = opcode('EXTENDED_ARG')
    extended_arg_opcode_byte = extended_arg_opcode_int.to_bytes(1, byteorder='little')

    def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None):
        """Wrap ``op_sequence`` (not copied) and build an identity offset map for it."""
        self.op_sequence: List[Tuple[int, int, int, int, int, int]] = list() if op_sequence is None else op_sequence
        self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence))

    def __len__(self):
        """Return the number of ops (``EXTENDED_ARG`` prefixes count as ops)."""
        return len(self.op_sequence)

    def read_slice(self, place: slice) -> 'OpSequence':
        """Return a new ``OpSequence`` for ``place`` with its offset map rebased to start at 0."""
        result = OpSequence()
        result.op_sequence = self.op_sequence[place]
        result.op_sequence_offset_map = self.op_sequence_offset_map[place]
        result.op_sequence_offset_map.shift_to_absolute(0)
        return result

    def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0):
        """Delete the ops in ``place`` (explicit ``start``/``stop`` required) and update the offset map."""
        self.op_sequence[place] = []
        self.op_sequence_offset_map.remove_slice(place.start, place.stop - place.start, preserver_index_for_first_x_op)

    def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0):
        """Insert the ops of ``op_sequence`` at ``index`` and return that same ``op_sequence``.

        The inserted sequence's own offset map is shifted to start at
        ``index`` as a side effect.
        """
        self.op_sequence[index: index] = op_sequence.op_sequence
        op_sequence.op_sequence_offset_map.shift_to_absolute(index)
        self.op_sequence_offset_map.insert_op_sequence_offset(index, op_sequence.op_sequence_offset_map, op_sequence, preserver_index_for_first_x_op)
        return op_sequence

    def normalize_instructions_arg(self):
        """Rewrite every op whose arg exceeds 255 as ``EXTENDED_ARG`` prefixes plus a one-byte arg.

        Existing prefixes of such an op are removed first and regenerated from
        the full argument value.
        """
        index = -1
        while True:
            index += 1
            if index >= len(self.op_sequence):
                break

            op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index]
            if arg is not None:
                if arg > 255:
                    # presumably little-endian bytes: arg[0] stays on the op itself - verify against arg_to_bytes
                    arg = arg_to_bytes(arg)
                    if real_op_index is not None:
                        # The op already has prefixes: translate their recorded
                        # start into a current position and drop them.
                        real_op_index_delta = real_op_index - op_index
                        real_op_index = index - real_op_index_delta
                        slice_to_delete = slice(real_op_index, index)
                        self.remove_slice(slice_to_delete, 1)
                        index = real_op_index

                    extended_arg: bytes = arg[1:]
                    extended_arg_len = len(extended_arg)
                    arg = arg[0]
                    self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2)
                    op_sub_sequence_instructions = list()
                    # Prefixes are emitted starting from the last byte of the argument.
                    for extended_arg_int in reversed(extended_arg):
                        op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int))

                    sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions)
                    self.insert_op_sequence(index, sub_op_sequence, 1)
                    # Skip over the prefixes that were just inserted.
                    index += len(sub_op_sequence.op_sequence)

    def denormalize_instructions_arg(self):
        """Not implemented."""
        raise NotImplementedError

    def find_op(self, op_index: int) -> Optional[slice]:
        """Return the slice covering the op at ``op_index`` together with its ``EXTENDED_ARG`` prefixes.

        ``op_index`` may point at the op itself or at any of its prefixes.
        Returns ``None`` when ``op_index`` is negative or when no
        non-``EXTENDED_ARG`` op exists at or after it.
        """
        if op_index < 0:
            return None

        extended_arg = OpSequence.extended_arg_opcode_int

        # Forward: the group ends right after the first op that is not a prefix.
        end_index = None
        for index in range(op_index, len(self.op_sequence)):
            if self.op_sequence[index][0] != extended_arg:
                end_index = index + 1
                break

        if end_index is None:
            return None

        # Backward: the group starts after the nearest preceding non-prefix op,
        # or at position 0 when the beginning of the sequence is reached.
        start_index = op_index
        while (start_index > 0) and (self.op_sequence[start_index - 1][0] == extended_arg):
            start_index -= 1

        return slice(start_index, end_index)

    def get_arg(self, op_index: Union[int, slice]) -> Optional[int]:
        """Return the full argument of an op, combining its ``EXTENDED_ARG`` prefixes.

        Args:
            op_index: position of the op (or of one of its prefixes), or the
                slice returned by ``find_op``.

        Raises:
            RuntimeError: when the op cannot be located.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)

        if (op_index is None) or ((op_index.stop - op_index.start) < 1):
            raise RuntimeError('OP not found by index')

        op_slice = self.op_sequence[op_index]
        if (op_index.stop - op_index.start) > 1:
            # The op's own byte first, then the prefixes from nearest to farthest.
            arg_list = list()
            for op, arg, _, _, _, _ in reversed(op_slice):
                arg_list.append(arg)

            return arg_to_int(bytes(arg_list))
        else:
            return op_slice[0][1]

    def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]):
        """Set the argument of an op, adding ``EXTENDED_ARG`` prefixes when the value needs them.

        Args:
            op_index: position of the op (or of one of its prefixes), or the
                slice returned by ``find_op``.
            new_arg: new argument value, converted with ``arg_to_int``.

        Raises:
            RuntimeError: when the op cannot be located.
        """
        if isinstance(op_index, int):
            op_index = self.find_op(op_index)

        # The None check must come before op_index is dereferenced.
        if op_index is None:
            raise RuntimeError('OP not found by index')

        op_index_len = op_index.stop - op_index.start
        if op_index_len < 1:
            raise RuntimeError('OP not found by index')

        new_arg = arg_to_int(new_arg)
        current_op, current_arg, current_op_index, current_offset, current_real_op_index, current_real_offset = self.op_sequence[op_index.stop - 1]
        # Build a throw-away one-op sequence and normalize it to learn which
        # prefixes (if any) the new value requires.
        sub_op_sequence_bytes = [current_op, new_arg]
        sub_op_sequence: OpSequence = OpSequence(list(unpack_opargs(sub_op_sequence_bytes)))
        sub_op_sequence.normalize_instructions_arg()
        real_arg = sub_op_sequence.op_sequence[-1][1]
        sub_op_sequence_len = len(sub_op_sequence.op_sequence)
        extended_args_len = sub_op_sequence_len - 1
        if extended_args_len:
            current_real_op_index = op_index.start
            current_real_offset = current_real_op_index * 2
            current_op_index = current_real_op_index + extended_args_len
            current_offset = current_op_index * 2
        else:
            current_op_index = op_index.start
            current_offset = current_op_index * 2
            current_real_op_index = None
            current_real_offset = None

        self.op_sequence[op_index.stop - 1] = (current_op, real_arg, current_op_index, current_offset, current_real_op_index, current_real_offset)

        if sub_op_sequence_len > 1:
            # Keep only the prefixes of the helper sequence, then swap them in
            # for the prefixes the op currently has.
            sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len))
            if op_index_len > 1:
                self.remove_slice(slice(op_index.start, op_index.stop - 1), 1)

            self.insert_op_sequence(op_index.start, sub_op_sequence, 1)

    def fix_labels(self, op_by_label: Dict[int, List[int]]):
        """Re-target jump ops after edits.

        Args:
            op_by_label: original index of a jump target -> original indexes of
                the ops that jump to it. The lists are sorted in place.

        Setting an argument can insert prefixes and move the target, so each
        label is reprocessed until its new position stops changing.
        """
        labels = list(op_by_label.keys())
        labels.sort()
        for label, data in op_by_label.items():
            data.sort()

        for label in labels:
            labeled_op_index_new = self.op_sequence_offset_map.new_by_original[label]
            label_data = op_by_label[label]
            while True:
                for op_index in label_data:
                    op_index_new = self.op_sequence_offset_map.new_by_original[op_index]
                    op = self.op_sequence[op_index_new][0]
                    new_arg = op_index_to_arg(labeled_op_index_new)
                    if op in hasjrel:
                        # Relative jump: distance from the position just past the jumping op.
                        new_arg = new_arg - (op_index_to_arg(op_index_new) + 2)

                    self.set_arg(op_index_new, new_arg)

                labeled_op_index_new_after_value_change = self.op_sequence_offset_map.new_by_original[label]
                if labeled_op_index_new_after_value_change == labeled_op_index_new:
                    break
                else:
                    labeled_op_index_new = labeled_op_index_new_after_value_change

    def to_sequence_of_ints(self):
        """Yield ``op`` then ``arg`` for every op; a ``None`` arg is yielded as ``0``."""
        for op, arg, _, _, _, _ in self.op_sequence:
            yield op
            if arg is None:
                yield 0
            else:
                yield arg

    def to_bytes(self) -> bytes:
        """Return the sequence as raw bytes built from ``to_sequence_of_ints``."""
        return bytes(self.to_sequence_of_ints())

    @staticmethod
    def from_bytecode_sequence(code: BytecodeSequence):
        """Build from raw bytecode; returns ``(OpSequence, labels, op_by_label)``."""
        labels, op_by_label = find_ops_with_labels(code)
        return OpSequence(list(unpack_opargs(code))), labels, op_by_label

    @staticmethod
    def from_entity(entity):
        """Build from anything ``dis._get_code_object`` accepts; returns ``(OpSequence, labels, op_by_label)``."""
        code_bytes: BytecodeSequence = _get_code_object(entity).co_code
        return OpSequence.from_bytecode_sequence(code_bytes)

    @staticmethod
    def from_instructions(instructions: Sequence[Instruction]):
        """Build from ``Instruction`` objects and normalize args above 255.

        Returns ``(OpSequence, labels, op_by_label)``; the labels are computed
        before normalization.
        """
        op_sequence: List = list()
        for instruction in instructions:
            arg = instruction.arg
            if arg is None:
                arg = 0

            op_sequence.extend((instruction.opcode, arg))

        labels, op_by_label = find_ops_with_labels(op_sequence)
        result = OpSequence(list(unpack_opargs(op_sequence)))
        result.normalize_instructions_arg()
        return result, labels, op_by_label

    @staticmethod
    def from_instructions_fast(instructions: Sequence[Instruction]):
        """Build from ``Instruction`` objects without computing labels; returns only the ``OpSequence``."""
        return OpSequence(list(unpack_opargs(instructions_to_sequence_of_ints(normalize_instructions_arg(instructions)))))
OpSequence( op_sequence: Union[List[Tuple[int, int, int, int, int, int]], NoneType] = None)
529    def __init__(self, op_sequence: Optional[List[Tuple[int, int, int, int, int, int]]] = None):
530        self.op_sequence: List[Tuple[int, int, int, int, int, int]] = list() if op_sequence is None else op_sequence
531        self.op_sequence_offset_map: OpSequenceOffsetMap = OpSequenceOffsetMap(len(self.op_sequence))
extended_arg_opcode_int = 144
extended_arg_opcode_byte = b'\x90'
op_sequence: List[Tuple[int, int, int, int, int, int]]
op_sequence_offset_map: OpSequenceOffsetMap
def read_slice( self, place: slice) -> OpSequence:
536    def read_slice(self, place: slice) -> 'OpSequence':
537        result = OpSequence()
538        result.op_sequence = self.op_sequence[place]
539        result.op_sequence_offset_map = self.op_sequence_offset_map[place]
540        result.op_sequence_offset_map.shift_to_absolute(0)
541        return result
def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0):
543    def remove_slice(self, place: slice, preserver_index_for_first_x_op: int = 0):
544        self.op_sequence[place] = []
545        self.op_sequence_offset_map.remove_slice(place.start, place.stop - place.start, preserver_index_for_first_x_op)
def insert_op_sequence( self, index: int, op_sequence: OpSequence, preserver_index_for_first_x_op: int = 0):
547    def insert_op_sequence(self, index: int, op_sequence: 'OpSequence', preserver_index_for_first_x_op: int = 0):
548        self.op_sequence[index: index] = op_sequence.op_sequence
549        op_sequence.op_sequence_offset_map.shift_to_absolute(index)
550        self.op_sequence_offset_map.insert_op_sequence_offset(index, op_sequence.op_sequence_offset_map, op_sequence, preserver_index_for_first_x_op)
551        return op_sequence
def normalize_instructions_arg(self):
553    def normalize_instructions_arg(self):
554        index = -1
555        while True:
556            index += 1
557            if index >= len(self.op_sequence):
558                break
559
560            op, arg, op_index, offset, real_op_index, real_offset = self.op_sequence[index]
561            if arg is not None:
562                if arg > 255:
563                    arg = arg_to_bytes(arg)
564                    if real_op_index is not None:
565                        real_op_index_delta = real_op_index - op_index
566                        real_op_index = index - real_op_index_delta
567                        slice_to_delete = slice(real_op_index, index)
568                        self.remove_slice(slice_to_delete, 1)
569                        index = real_op_index
570
571                    extended_arg: bytes = arg[1:]
572                    extended_arg_len = len(extended_arg)
573                    arg = arg[0]
574                    self.op_sequence[index] = (op, arg, index + extended_arg_len, (index + extended_arg_len) * 2, index, index * 2)
575                    op_sub_sequence_instructions = list()
576                    for extended_arg_int in reversed(extended_arg):
577                        op_sub_sequence_instructions.append(make_instruction('EXTENDED_ARG', extended_arg_int))
578                    
579                    sub_op_sequence: 'OpSequence' = OpSequence.from_instructions_fast(op_sub_sequence_instructions)
580                    self.insert_op_sequence(index, sub_op_sequence, 1)
581                    index += len(sub_op_sequence.op_sequence)
def denormalize_instructions_arg(self):
    """Placeholder: this operation is not implemented.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def find_op(self, op_index: int) -> Optional[slice]:
    """Locate an operation together with its EXTENDED_ARG prefix.

    Starting at ``op_index``, scans forward to the first entry that is not an
    ``EXTENDED_ARG`` (the operation itself) and backward over any
    ``EXTENDED_ARG`` entries directly in front of ``op_index``.

    Args:
        op_index: position inside ``self.op_sequence``; it may point at the
            operation or at one of its ``EXTENDED_ARG`` prefix entries.

    Returns:
        ``slice(start, stop)`` covering the prefix entries and the operation
        (the operation is at ``stop - 1``), or None when ``op_index`` is
        negative or no operation exists at or after it.
    """
    if op_index < 0:
        return None

    ops = self.op_sequence
    extended_arg_op = OpSequence.extended_arg_opcode_int

    # Forward: the first non-EXTENDED_ARG entry is the operation itself.
    end_index = None
    for index in range(op_index, len(ops)):
        if ops[index][0] != extended_arg_op:
            end_index = index + 1
            break

    if end_index is None:
        return None

    # Backward: skip the EXTENDED_ARG entries in front of op_index. Walking off
    # the beginning means the group starts at 0 -- this also covers
    # op_index == 0, which previously was reported as "not found".
    start_index = 0
    for index in range(op_index - 1, -1, -1):
        if ops[index][0] != extended_arg_op:
            start_index = index + 1
            break

    return slice(start_index, end_index)
def get_arg(self, op_index: Union[int, slice]) -> Optional[int]:
    """Return the full argument of an operation, EXTENDED_ARG prefix included.

    Args:
        op_index: either a position (resolved through ``self.find_op``) or a
            slice covering the prefix entries and the operation.

    Returns:
        For a slice longer than one entry, the integer assembled from the
        entries' argument bytes (the operation's own byte is the least
        significant one). For a single entry, its stored argument, which is
        None when that entry has no argument.

    Raises:
        RuntimeError: if the operation cannot be found or the slice is empty.
    """
    if isinstance(op_index, int):
        op_index = self.find_op(op_index)

    if (op_index is None) or ((op_index.stop - op_index.start) < 1):
        raise RuntimeError('OP not found by index')

    op_slice = self.op_sequence[op_index]
    if (op_index.stop - op_index.start) > 1:
        # Entries are stored most significant first; reversing them gives the
        # little-endian byte order expected by int.from_bytes.
        arg_bytes = bytes(entry[1] for entry in reversed(op_slice))
        return int.from_bytes(arg_bytes, byteorder='little')

    return op_slice[0][1]
def set_arg(self, op_index: Union[int, slice], new_arg: Union[None, int, bytes]):
    """Replace the argument of an operation and rebuild its EXTENDED_ARG prefix.

    Args:
        op_index: either a position (resolved through ``self.find_op``) or a
            slice covering the existing prefix entries and the operation.
        new_arg: the new argument; bytes are converted with ``arg_to_int``.

    Raises:
        RuntimeError: if the operation cannot be found or the slice is empty.
    """
    if isinstance(op_index, int):
        op_index = self.find_op(op_index)

    # Test for None before reading .start/.stop, otherwise a failed lookup
    # surfaces as AttributeError instead of the intended RuntimeError.
    if op_index is None:
        raise RuntimeError('OP not found by index')

    op_index_len = op_index.stop - op_index.start
    if op_index_len < 1:
        raise RuntimeError('OP not found by index')

    new_arg = arg_to_int(new_arg)
    current_op = self.op_sequence[op_index.stop - 1][0]

    # Build a one-operation sequence and let normalization split the argument
    # into the operation's own byte plus any EXTENDED_ARG entries.
    sub_op_sequence: 'OpSequence' = OpSequence(list(unpack_opargs([current_op, new_arg])))
    sub_op_sequence.normalize_instructions_arg()
    real_arg = sub_op_sequence.op_sequence[-1][1]
    sub_op_sequence_len = len(sub_op_sequence.op_sequence)
    extended_args_len = sub_op_sequence_len - 1

    if extended_args_len:
        current_real_op_index = op_index.start
        current_real_offset = current_real_op_index * 2
        current_op_index = current_real_op_index + extended_args_len
        current_offset = current_op_index * 2
    else:
        current_op_index = op_index.start
        current_offset = current_op_index * 2
        current_real_op_index = None
        current_real_offset = None

    self.op_sequence[op_index.stop - 1] = (
        current_op,
        real_arg,
        current_op_index,
        current_offset,
        current_real_op_index,
        current_real_offset,
    )

    # Always drop the previous prefix. It used to be removed only when a new
    # prefix was inserted, so shrinking an argument to a single byte left the
    # old EXTENDED_ARG entries (with their old values) in front of the op.
    if op_index_len > 1:
        self.remove_slice(slice(op_index.start, op_index.stop - 1), 1)

    if extended_args_len:
        # Keep only the EXTENDED_ARG entries of the helper sequence; the
        # operation itself has already been written above.
        sub_op_sequence.remove_slice(slice(sub_op_sequence_len - 1, sub_op_sequence_len))
        self.insert_op_sequence(op_index.start, sub_op_sequence, 1)
def fix_labels(self, op_by_label: Dict[int, List[int]]):
    """Rewrite jump arguments so they point at the current label positions.

    Args:
        op_by_label: maps an original label index to the original indexes of
            the operations that jump to it. The lists are sorted in place.

    For every label (in ascending order) the arguments of its jump operations
    are set from the label's current index in
    ``self.op_sequence_offset_map.new_by_original``. Because ``set_arg`` may
    change the sequence, the pass for a label is repeated until the label's
    index is the same before and after the pass.
    """
    for jump_sources in op_by_label.values():
        jump_sources.sort()

    for label in sorted(op_by_label):
        target_index = self.op_sequence_offset_map.new_by_original[label]
        jump_sources = op_by_label[label]
        settled = False
        while not settled:
            for original_index in jump_sources:
                source_index = self.op_sequence_offset_map.new_by_original[original_index]
                opcode = self.op_sequence[source_index][0]
                jump_arg = op_index_to_arg(target_index)
                if opcode in hasjrel:
                    # Relative jumps are stored as a distance from the source.
                    jump_arg = jump_arg - (op_index_to_arg(source_index) + 2)

                self.set_arg(source_index, jump_arg)

            refreshed_index = self.op_sequence_offset_map.new_by_original[label]
            settled = refreshed_index == target_index
            target_index = refreshed_index
def to_sequence_of_ints(self):
    """Yield the sequence as a flat stream: opcode, argument, opcode, argument...

    An entry without an argument (``arg`` is None) contributes 0 in the
    argument position.
    """
    for opcode, argument, _, _, _, _ in self.op_sequence:
        yield opcode
        yield 0 if argument is None else argument
def to_bytes(self) -> bytes:
    """Return the values produced by ``to_sequence_of_ints`` packed as bytes."""
    int_stream = self.to_sequence_of_ints()
    return bytes(int_stream)
@staticmethod
def from_bytecode_sequence(code: BytecodeSequence):
    """Build an ``OpSequence`` from a raw bytecode sequence.

    Returns:
        A 3-tuple: the new ``OpSequence`` and the ``labels`` /
        ``op_by_label`` pair produced by ``find_ops_with_labels(code)``.
    """
    # ``code`` is traversed twice: once for the labels, once for the ops.
    labels, op_by_label = find_ops_with_labels(code)
    unpacked_ops = list(unpack_opargs(code))
    return OpSequence(unpacked_ops), labels, op_by_label
@staticmethod
def from_entity(entity):
    """Build an ``OpSequence`` from anything ``dis._get_code_object`` accepts.

    The entity's ``co_code`` is forwarded to ``from_bytecode_sequence`` and
    that call's result is returned unchanged.
    """
    code_object = _get_code_object(entity)
    return OpSequence.from_bytecode_sequence(code_object.co_code)
@staticmethod
def from_instructions(instructions: Sequence[Instruction]):
    """Build a normalized ``OpSequence`` from ``Instruction`` objects.

    Each instruction contributes its opcode and its argument (0 when the
    argument is None) to a flat list of ints. Labels are collected from that
    list, then the sequence is created and normalized.

    Returns:
        A 3-tuple: the ``OpSequence`` and the ``labels`` / ``op_by_label``
        pair produced by ``find_ops_with_labels``.
    """
    flat_code: List = []
    for instruction in instructions:
        flat_code.append(instruction.opcode)
        flat_code.append(0 if instruction.arg is None else instruction.arg)

    labels, op_by_label = find_ops_with_labels(flat_code)
    sequence = OpSequence(list(unpack_opargs(flat_code)))
    sequence.normalize_instructions_arg()
    return sequence, labels, op_by_label
@staticmethod
def from_instructions_fast(instructions: Sequence[Instruction]):
    """Build an ``OpSequence`` from ``Instruction`` objects without label lookup.

    The instructions go through the module-level ``normalize_instructions_arg``
    generator, are flattened by ``instructions_to_sequence_of_ints`` and
    unpacked by ``unpack_opargs``. Only the ``OpSequence`` is returned.
    """
    normalized = normalize_instructions_arg(instructions)
    flat_code = instructions_to_sequence_of_ints(normalized)
    unpacked_ops = list(unpack_opargs(flat_code))
    return OpSequence(unpacked_ops)
def arg_to_bytes(arg: Union[int, bytes]) -> bytes:
    """Convert an integer argument to its shortest little-endian byte string.

    An int is encoded on 4 bytes, little-endian, and trailing zero bytes are
    stripped; zero becomes ``b'\\x00'``. Any non-int value is returned as is.
    """
    if not isinstance(arg, int):
        return arg

    stripped = arg.to_bytes(4, byteorder='little').rstrip(b'\x00')
    return stripped or b'\x00'
def arg_to_int(arg: Union[int, bytes]) -> int:
    """Decode a little-endian byte string into an int.

    Values that are not ``bytes`` (ints, None, ...) are returned unchanged.
    """
    if not isinstance(arg, bytes):
        return arg

    return int.from_bytes(arg, byteorder='little')
def get_raw_instructions(x, *, first_line=None):
    """Yield the instructions of ``x`` with each argument cut to its lowest byte.

    Args:
        x: anything accepted by ``dis.get_instructions``.
        first_line: forwarded to ``dis.get_instructions`` as ``first_line``.

    Yields:
        ``Instruction`` objects equal to the ones reported by
        ``dis.get_instructions`` except for ``arg``, which is reduced to the
        least significant byte of the original value (None stays None).
    """
    # ``first_line`` is keyword-only in dis.get_instructions; passing it
    # positionally raises TypeError.
    for instruction in get_instructions(x, first_line=first_line):
        arg = instruction.arg
        if arg is not None:
            if isinstance(arg, int):
                arg = arg.to_bytes(4, byteorder='little')

            arg = arg[0]

        # _replace keeps every other field, whatever the field layout of
        # Instruction is on the running interpreter.
        yield instruction._replace(arg=arg)
def normalize_instructions_arg(instructions: Sequence[Instruction]):
    """Yield instructions with multi-byte arguments split into EXTENDED_ARG ops.

    For an instruction with an argument, the argument is converted with
    ``arg_to_bytes``; one ``EXTENDED_ARG`` instruction is yielded per byte
    above the lowest (most significant first), followed by a copy of the
    instruction carrying only the lowest byte. Instructions whose argument is
    None are re-emitted with ``arg`` left as None.
    """
    for instruction in instructions:
        low_byte = instruction.arg
        if low_byte is not None:
            arg_bytes = arg_to_bytes(low_byte)
            low_byte = arg_bytes[0]
            # arg_bytes is little-endian, so the upper bytes are emitted reversed.
            for upper_byte in reversed(arg_bytes[1:]):
                yield make_instruction('EXTENDED_ARG', upper_byte)

        yield Instruction(
            instruction.opname,
            instruction.opcode,
            low_byte,
            instruction.argval,
            instruction.argrepr,
            instruction.offset,
            instruction.starts_line,
            instruction.is_jump_target,
        )
def instructions_to_sequence_of_ints(instructions: Sequence[Instruction]):
    """Yield ``opcode, arg`` pairs of the instructions as one flat int stream.

    An instruction whose ``arg`` is None contributes 0 in the argument position.
    """
    for instruction in instructions:
        yield instruction.opcode
        argument = instruction.arg
        if argument is None:
            yield 0
        else:
            yield argument
def instructions_to_bytes(instructions: Sequence[Instruction]) -> bytes:
    """Pack the output of ``instructions_to_sequence_of_ints`` into ``bytes``."""
    int_stream = instructions_to_sequence_of_ints(instructions)
    return bytes(int_stream)
def make_instruction(name: str, arg: Union[None, int, bytes] = None) -> Instruction:
    """Create an ``Instruction`` from an opcode name and an optional argument.

    Args:
        name: opcode name; it is upper-cased before the ``opmap`` lookup.
        arg: argument value; bytes are converted with ``arg_to_int``.

    Returns:
        An ``Instruction`` with ``opname``, ``opcode`` and ``arg`` filled in
        and the five following fields set to None.

    Raises:
        KeyError: if the upper-cased name is not present in ``opmap``.
    """
    opcode_name = name.upper()
    return Instruction(opcode_name, opmap[opcode_name], arg_to_int(arg), None, None, None, None, None)


# Short alias: the listing documents ``mi`` with the same source as make_instruction.
mi = make_instruction
def fix_labels(op_sequence: OpSequence, op_by_label: Dict[int, List[Tuple[int, int, int, int, int, int]]]):
    """Convert label data to op indexes and delegate to ``OpSequence.fix_labels``.

    Each label key is converted with ``arg_to_op_index`` and each op tuple is
    reduced to its third element (the op index) before the call.
    """
    ops_by_label_index = {}
    for label, data in op_by_label.items():
        ops_by_label_index[arg_to_op_index(label)] = [op_info[2] for op_info in data]

    op_sequence.fix_labels(ops_by_label_index)
def code_info(x):
    """Formatted details of methods, functions, or code."""
    code_object = _get_code_object(x)
    return _format_code_info(code_object)

Formatted details of methods, functions, or code.

def modify_code(original_code: CodeType, co_code, co_consts, co_names, co_varnames) -> CodeType:
    """Return a copy of ``original_code`` with new bytecode and name/const tables.

    Args:
        original_code: code object that supplies every field not listed below.
        co_code: new bytecode.
        co_consts: new constants; converted to a tuple.
        co_names: new names; converted to a tuple.
        co_varnames: new local variable names; converted to a tuple. Its
            length is used as ``co_nlocals``.

    Returns:
        A new code object.
    """
    # CodeType.replace copies all remaining fields itself. The positional
    # CodeType(...) constructor depends on the exact field layout of one
    # interpreter version and received ``co_lnotab`` in the slot that newer
    # versions interpret as the line table.
    varnames = tuple(co_varnames)
    return original_code.replace(
        co_code=co_code,
        co_consts=tuple(co_consts),
        co_names=tuple(co_names),
        co_varnames=varnames,
        co_nlocals=len(varnames),
    )
def unpack_opargs(code: BytecodeSequence, denormalize_values: bool = False):
    """Iterate over ``code`` as (opcode, argument) pairs and yield op tuples.

    Items at even positions are opcodes, items at odd positions their
    arguments; ``bytes`` items are first decoded as little-endian ints.

    Args:
        code: the flat opcode/argument sequence.
        denormalize_values: when True, the argument of an ``EXTENDED_ARG`` is
            shifted left by 8 bits and OR-ed into the next argument.

    Yields:
        ``(op, arg, op_index, offset, real_op_index, real_offset)``. ``arg``
        is None for opcodes below ``HAVE_ARGUMENT``. ``real_op_index`` and
        ``real_offset`` are captured when the trigger on ``EXTENDED_ARG``
        returns True and are cleared after the tuple for which it returns
        False has been yielded.
    """
    extended_arg_trigger: FrontTriggerableVariable = FrontTriggerableVariable(FrontTriggerableVariableType.equal, EXTENDED_ARG)
    pending_extension = 0
    real_op_index: Optional[int] = None
    real_offset: Optional[int] = None

    op = None
    op_index = -1
    offset = -1
    for position, item in enumerate(code):
        if isinstance(item, bytes):
            item = int.from_bytes(item, byteorder='little')

        if position % 2 == 0:
            # Even position: an opcode; its argument follows at the next position.
            op = item
            op_index += 1
            offset = position
            continue

        clear_real_data_after_yield = False
        if op < HAVE_ARGUMENT:
            arg = None
        else:
            trigger_result: Optional[bool] = extended_arg_trigger(op)
            if trigger_result is True:
                real_op_index = op_index
                real_offset = offset
            elif trigger_result is False:
                clear_real_data_after_yield = True

            arg = item | pending_extension
            if denormalize_values:
                pending_extension = (arg << 8) if op == EXTENDED_ARG else 0

        yield (op, arg, op_index, offset, real_op_index, real_offset)
        if clear_real_data_after_yield:
            real_op_index = None
            real_offset = None
def find_ops_with_labels(code: BytecodeSequence) -> Tuple[List[int], Dict[int, List[Tuple[int, int, int, int, Optional[int], Optional[int]]]]]:
    """Collect jump targets and the jump operations that refer to them.

    ``code`` is unpacked with ``unpack_opargs(code, True)``. For an opcode in
    ``hasjrel`` the label is ``offset + 2 + arg``; for an opcode in
    ``hasjabs`` the label is ``arg``; every other operation is skipped.

    Returns:
        ``(labels, op_by_label)``: ``labels`` lists each label once, in order
        of first appearance; ``op_by_label`` maps a label to the full op
        tuples ``(op, arg, op_index, offset, real_op_index, real_offset)`` of
        the operations that jump to it.
    """
    labels: List[int] = []
    op_by_label: Dict[int, List[Tuple[int, int, int, int, Optional[int], Optional[int]]]] = {}
    for op_info in unpack_opargs(code, True):
        op, arg, _, offset, _, _ = op_info
        if arg is None:
            continue

        if op in hasjrel:
            label = offset + 2 + arg
        elif op in hasjabs:
            label = arg
        else:
            continue

        if label not in op_by_label:
            labels.append(label)
            op_by_label[label] = []

        op_by_label[label].append(op_info)

    return labels, op_by_label
def op_index_to_arg(op_index: int) -> int:
    """Convert an operation index to an argument value (twice the index)."""
    return 2 * op_index
def arg_to_op_index(arg: int) -> int:
    """Convert an argument value to an operation index (floor of half the value)."""
    op_index = arg // 2
    return op_index