Source code for credsweeper.file_handler.diff_content_provider

import logging
from dataclasses import dataclass
from functools import cached_property
from typing import List, Tuple, Generator, TypedDict, Optional, Union, Any, Dict, cast

import whatthepatch

from credsweeper.common.constants import DiffRowType
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.content_provider import ContentProvider

logger = logging.getLogger(__name__)

DiffDict = TypedDict(
    "DiffDict",
    {
        "old": Optional[int],  #
        "new": Optional[int],  #
        "line": Union[str, bytes],  # bytes are possibly since whatthepatch v1.0.4
        "hunk": Any  # not used
    })


[docs] @dataclass(frozen=True) class DiffRowData: """Class for keeping data of diff row.""" line_type: DiffRowType line_numb: int line: str
[docs] class DiffContentProvider(ContentProvider): """Provide data from a single `.patch` file. Parameters: file_path: path to file change_type: set added or deleted file data to scan diff: list of file row changes, with base elements represented as:: { "old": line number before diff, "new": line number after diff, "line": line text, "hunk": diff hunk number } """ def __init__( self, # file_path: str, # change_type: DiffRowType, # diff: List[DiffDict]) -> None: super().__init__(file_path=file_path, info=f"{file_path}:{change_type.value}") self.__change_type = change_type self.__diff = diff @cached_property def data(self) -> bytes: """data getter for DiffContentProvider""" raise NotImplementedError(__name__) @cached_property def diff(self) -> List[DiffDict]: """diff getter for DiffContentProvider""" return self.__diff
[docs] def free(self) -> None: """free data after scan to reduce memory usage""" self.__diff = [] if "diff" in self.__dict__: delattr(self, "diff")
[docs] @staticmethod def parse_lines_data(change_type: DiffRowType, lines_data: List[DiffRowData]) -> Tuple[List[int], List[str]]: """Parse diff lines data. Return list of line numbers with change type "self.change_type" and list of all lines in file in original order(replaced all lines not mentioned in diff file with blank line) Args: change_type: set added or deleted file data to scan lines_data: data of all rows mentioned in diff file Return: tuple of line numbers with change type "self.change_type" and all file lines in original order(replaced all lines not mentioned in diff file with blank line) """ change_numbs = [] all_lines = [] for line_data in lines_data: if line_data.line_type == change_type: change_numbs.append(line_data.line_numb) all_lines.append(line_data.line) return change_numbs, all_lines
[docs] @staticmethod def patch2files_diff(raw_patch: List[str], change_type: DiffRowType) -> Dict[str, List[DiffDict]]: """Generate files changes from patch for added or deleted filepaths. Args: raw_patch: git patch file content change_type: change type to select, DiffRowType.ADDED or DiffRowType.DELETED Return: return dict with ``{file paths: list of file row changes}``, where elements of list of file row changes represented as:: { "old": line number before diff, "new": line number after diff, "line": line text, "hunk": diff hunk number } """ if not raw_patch: return {} added_files: Dict[str, List[DiffDict]] = {} deleted_files: Dict[str, List[DiffDict]] = {} try: for patch in whatthepatch.parse_patch(raw_patch): if patch.changes is None: logger.warning("Patch '%s' cannot be scanned", str(patch.header)) continue changes: List[DiffDict] = [] for change in patch.changes: change_dict = cast(DiffDict, change._asdict()) changes.append(change_dict) added_files[patch.header.new_path] = changes deleted_files[patch.header.old_path] = changes if change_type == DiffRowType.ADDED: return added_files if change_type == DiffRowType.DELETED: return deleted_files logger.error("Change type should be one of: '%s', '%s'; but received %s", DiffRowType.ADDED, DiffRowType.DELETED, change_type) except Exception as exc: logger.warning(exc) return {}
[docs] @staticmethod def preprocess_diff_rows( added_line_number: Optional[int], # deleted_line_number: Optional[int], # line: str) -> List[DiffRowData]: """Auxiliary function to extend diff changes. Args: added_line_number: number of added line or None deleted_line_number: number of deleted line or None line: the text line Return: diff rows data with as list of row change type, line number, row content """ rows_data: List[DiffRowData] = [] if isinstance(added_line_number, int): # indicates line was inserted rows_data.append(DiffRowData(DiffRowType.ADDED, added_line_number, line)) if isinstance(deleted_line_number, int): # indicates line was removed rows_data.append(DiffRowData(DiffRowType.DELETED, deleted_line_number, line)) return rows_data
[docs] @staticmethod def wrong_change(change: DiffDict) -> bool: """Returns True if the change is wrong""" for i in ["line", "new", "old"]: if i not in change: logger.error("Skipping wrong change %s", change) return True return False
[docs] @staticmethod def preprocess_file_diff(changes: List[DiffDict]) -> List[DiffRowData]: """Generate changed file rows from diff data with changed lines (e.g. marked + or - in diff). Args: changes: git diff by file rows data Return: diff rows data with as list of row change type, line number, row content """ if not changes: return [] rows_data = [] # process diff to restore lines and their positions for change in changes: if DiffContentProvider.wrong_change(change): continue line = change["line"] if isinstance(line, str): rows_data.extend(DiffContentProvider.preprocess_diff_rows(change.get("new"), change.get("old"), line)) elif isinstance(line, (bytes, bytearray)): logger.warning("The feature is available with the deep scan option") else: logger.error("Unknown type of line %s", type(line)) return rows_data
[docs] def yield_analysis_target(self, min_len: int) -> Generator[AnalysisTarget, None, None]: """Preprocess file diff data to scan. Args: min_len: minimal line length to scan Return: list of analysis targets of every row of file diff corresponding to change type "self.change_type" """ lines_data = DiffContentProvider.preprocess_file_diff(self.__diff) change_numbs, all_lines = self.parse_lines_data(self.__change_type, lines_data) return self.lines_to_targets(min_len, all_lines, change_numbs)