Source code for credsweeper.file_handler.data_content_provider

import json
import logging
import warnings
from functools import cached_property
from typing import List, Optional, Any, Generator, Callable, Tuple

import yaml
from bs4 import BeautifulSoup, Tag, XMLParsedAsHTMLWarning

from credsweeper.common.constants import MIN_DATA_LEN
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.content_provider import ContentProvider
from credsweeper.utils.util import Util

# Silence the XMLParsedAsHTMLWarning category for the bs4 module:
# represent_as_html() parses arbitrary text with the "html.parser" backend.
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning, module='bs4')
# module-level logger named after this module
logger = logging.getLogger(__name__)

# <t>12345678</t> - minimal xml with a credential
# Text shorter than this limit is rejected by represent_as_xml() before any parsing.
# NOTE(review): the sample above is 15 characters long, i.e. below the limit of 16 - confirm the intended value.
MIN_XML_LEN = 16


class DataContentProvider(ContentProvider):
    """Dummy raw provider to keep bytes

    The stored byte sequence can be viewed as decoded text (``text``), as a parsed structure
    (``represent_as_structure``), as XML lines (``represent_as_xml``) or as HTML lines with
    table cell combinations (``represent_as_html``). The results are kept in ``structure``,
    ``lines`` and ``line_numbers``.
    """

    def __init__(
            self,  #
            data: bytes,  #
            file_path: Optional[str] = None,  #
            file_type: Optional[str] = None,  #
            info: Optional[str] = None) -> None:
        """
        Parameters:
            data: byte sequence to be stored for deep analysis
            file_path: forwarded unchanged to the base class initializer
            file_type: forwarded unchanged to the base class initializer
            info: forwarded unchanged to the base class initializer

        """
        super().__init__(file_path=file_path, file_type=file_type, info=info)
        self.__data = data
        # lazily decoded text, see the ``text`` property
        self.__text: Optional[str] = None
        # filled by represent_as_structure(); the parsers used there may also produce a dict
        self.structure: Optional[List[Any]] = None
        self.decoded: Optional[bytes] = None
        # filled by represent_as_xml() and represent_as_html(); both lists are extended in pairs
        self.lines: List[str] = []
        self.line_numbers: List[int] = []
        # the size is used to limit extra memory consumption during html combination
        self.__html_lines_size = len(data)

    @cached_property
    def data(self) -> Optional[bytes]:
        """data RO getter for DataContentProvider and the property is used in deep scan"""
        return self.__data

    def free(self) -> None:
        """free data after scan to reduce memory usage

        Drops the raw bytes, the decoded text and every derived representation.
        """
        self.__data = None
        # cached_property keeps its value in the instance __dict__ - remove the cached copy too
        if "data" in self.__dict__:
            delattr(self, "data")
        self.__text = None
        if "text" in self.__dict__:
            delattr(self, "text")
        self.structure = None
        self.decoded = None
        self.lines = []
        self.line_numbers = []

    @cached_property
    def text(self) -> str:
        """Getter to produce a text from DEFAULT_ENCODING. Empty str for unrecognized data

        The text is obtained from Util.decode_text() on first access;
        a falsy result of the decoder is replaced with an empty string.
        """
        if self.__text is None:
            self.__text = Util.decode_text(self.__data) or ''
        return self.__text

    def __is_structure(self) -> bool:
        """Check whether a structure was recognized

        Return:
            True only when ``structure`` is a dict with at least one key or a non-empty list
        """
        return self.structure is not None and (isinstance(self.structure, dict) and 0 < len(self.structure.keys())
                                               or isinstance(self.structure, list) and 0 < len(self.structure))

    def represent_as_structure(self) -> Optional[bool]:
        """Tries to convert data with many parsers. Stores result to internal structure

        The parsers are tried in the order: JSON, NDJSON, Python, YAML.
        Cheap substring checks decide whether a parser is worth trying at all.

        Return:
             True if some structure found
             False if no data found
             None if the format is not acceptable
        """
        if MIN_DATA_LEN > len(self.text):
            return False
        # JSON & NDJSON
        if '{' in self.text and '}' in self.text and '"' in self.text and ':' in self.text:
            try:
                self.structure = json.loads(self.text)
                logger.debug("CONVERTED from json")
            except Exception as exc:
                logger.debug("Cannot parse as json:%s %s", exc, self.data)
            else:
                if self.__is_structure():
                    return True
            try:
                self.structure = []
                for line in self.text.splitlines():
                    # each line must be in json format, otherwise - exception rises
                    self.structure.append(json.loads(line))
                logger.debug("CONVERTED from ndjson")
            except Exception as exc:
                logger.debug("Cannot parse as ndjson:%s %s", exc, self.data)
                # drop the partially collected items
                self.structure = None
            else:
                if self.__is_structure():
                    return True
        else:
            logger.debug("Data do not contain { - weak JSON")

        # # # Python
        try:
            # search only in sources with strings
            if (';' in self.text or 2 < self.text.count('\n') or 2 < self.text.count('\r')) \
                    and ('"' in self.text or "'" in self.text):
                self.structure = Util.parse_python(self.text)
                logger.debug("CONVERTED from Python")
            else:
                logger.debug("Data do not contain line feed - weak PYTHON")
        except Exception as exc:
            logger.debug("Cannot parse as Python:%s %s", exc, self.data)
        else:
            if self.__is_structure():
                return True

        # # # YAML - almost always recognized
        try:
            if ':' in self.text and (2 < self.text.count('\n') or 2 < self.text.count('\r')):
                self.structure = yaml.safe_load(self.text)
                logger.debug("CONVERTED from yaml")
            else:
                logger.debug("Data do not contain colon mark - weak YAML")
        except Exception as exc:
            logger.debug("Cannot parse as yaml:%s %s", exc, self.data)
        else:
            if self.__is_structure():
                return True

        # # # None of above
        return None

    def represent_as_xml(self) -> Optional[bool]:
        """Tries to read data as xml

        On success ``lines`` and ``line_numbers`` are replaced with the result of
        Util.get_xml_from_lines().

        Return:
             True if reading was successful
             False if no data found
             None if the format is not acceptable
        """
        if MIN_XML_LEN > len(self.text):
            return False
        try:
            # a closing tag is required to try the conversion
            if '<' in self.text and '>' in self.text and "</" in self.text:
                xml_text = self.text.splitlines()
                self.lines, self.line_numbers = Util.get_xml_from_lines(xml_text)
                logger.debug("CONVERTED from xml")
                return bool(self.lines and self.line_numbers)
            logger.debug("Weak data to parse as XML")
        except Exception as exc:
            logger.debug("Cannot parse as XML:%s %s", exc, self.data)
        return None

    def _check_multiline_cell(self, cell: Tag) -> Optional[Tuple[int, str]]:
        """multiline cell will be analysed as text or return single line from cell

        Parameters:
            cell: table cell; its ``sourceline`` is used as the number of the first text line

        Return:
            line number and one line for analysis when the cell holds exactly one non-empty line.
            If there are no text or the text will be analysed as multiline - it returns None.
            In the multiline case the stripped lines are appended to ``lines``/``line_numbers``.
        """
        # use not stripped get_text, otherwise all format is cleaned
        cell_text = cell.get_text()
        cell_lines = cell_text.splitlines()
        line_numbers: List[int] = []
        stripped_lines: List[str] = []
        for offset, line in enumerate(cell_lines):
            if stripped_line := line.strip():
                line_numbers.append(cell.sourceline + offset)
                stripped_lines.append(stripped_line)
        if not stripped_lines:
            return None
        if 1 == len(stripped_lines):
            return line_numbers[0], stripped_lines[0]
        # otherwise the cell will be analyzed as multiline text
        self.line_numbers.extend(line_numbers)
        self.lines.extend(stripped_lines)
        self.__html_lines_size += sum(len(x) for x in stripped_lines)
        return None

    @staticmethod
    def simple_html_representation(html: BeautifulSoup) -> Tuple[List[int], List[str], int]:
        """simple parse as it is displayed to user and appends the lines

        A tab is appended to every block-like tag so that texts of adjacent elements stay separated.
        Note that ``html`` is modified by this.

        Return:
            line numbers (1-based index in the extracted text), the stripped non-empty lines
            and the total length of these lines
        """
        line_numbers: List[int] = []
        lines: List[str] = []
        lines_size = 0
        # use dedicated variable to deal with yapf and flake
        tags_to_split = [
            "p", "br", "tr", "li", "ol", "h1", "h2", "h3", "h4", "h5", "h6", "blockquote", "pre", "div", "th", "td"
        ]
        for p in html.find_all(tags_to_split):
            p.append('\t')
        html_lines = html.get_text().splitlines()
        for line_number, doc_line in enumerate(html_lines):
            line = doc_line.strip()
            if line:
                line_numbers.append(line_number + 1)
                lines.append(line)
                lines_size += len(line)
        return line_numbers, lines, lines_size

    @staticmethod
    def _table_depth_reached(table: Tag, depth: int) -> bool:
        """Check whether the nesting budget is exhausted for the table

        Walks from ``table`` up through its ancestors; every ancestor named "table" consumes
        one unit of ``depth``.

        Return:
            False when the BeautifulSoup root is reached first
            True when ``depth`` became negative before the root, or an element without parent is met
        """
        node = table
        # A loop is used instead of recursion: one step is done per ancestor element,
        # so deeply nested markup would exceed the interpreter recursion limit.
        while parent := node.parent:
            if isinstance(parent, BeautifulSoup):
                return False
            if 0 > depth:
                return True
            if "table" == parent.name:
                depth -= 1
            node = parent
        return True

    def _table_representation(
            self,  #
            table: Tag,  #
            depth: int,  #
            recursive_limit_size: int,  #
            keywords_required_substrings_check: Callable[[str], bool]):
        """
        transform table if table cell is assigned to header cell
        make from cells a chain like next is assigned to previous

        Parameters:
            table: the table element to process
            depth: nesting budget, checked with _table_depth_reached()
            recursive_limit_size: the processing stops when the accumulated size estimate exceeds the value
            keywords_required_substrings_check: called with lower-cased cell text;
                a true result allows the text to be used as a header or as a leading cell of a row

        Produced items are appended to ``lines`` and ``line_numbers``:
            the text of each single-line cell of the first row,
            "<header> : <cell>" for a cell which column has a header with keywords,
            "<leading> : <cell>" for the cells which follow the leading cell of a row.

        NOTE(review): int() raises ValueError for a non-numeric colspan/rowspan attribute value
        and the error is not handled in this method.
        """
        if DataContentProvider._table_depth_reached(table, depth):
            logger.warning("Recursive depth limit was reached during HTML table combinations")
            return
        # one item per column: the header text with keywords or None
        table_header: Optional[List[Optional[str]]] = None
        # one item per column: rowspan counter of the cell which occupies the column
        rowspan_columns = []
        for tr in table.find_all("tr"):
            if recursive_limit_size < self.__html_lines_size:
                # weird tables may lead to oversize memory
                break
            record_numbers = []
            record_lines = []
            # None - a leading cell was not met yet, "" - the leading cell has no keywords
            record_leading = None
            if table_header is None:
                table_header = []
                # first row in table may be a header with <td> and a style, but search <th> too
                for cell in tr.find_all(["th", "td"]):
                    if recursive_limit_size < self.__html_lines_size:
                        # keep the duplicates for early breaks!
                        break
                    colspan_header = int(cell.get("colspan", 1))
                    if td_numbered_line := self._check_multiline_cell(cell):
                        td_text = td_numbered_line[1]
                        td_text_has_keywords = keywords_required_substrings_check(td_text.lower())
                        rowspan_header = int(cell.get("rowspan", 1))
                        # a cell with colspan is a header for each column it covers
                        for _ in range(colspan_header):
                            rowspan_columns.append(rowspan_header)
                            if td_text_has_keywords:
                                table_header.append(td_text)
                                self.__html_lines_size += len(td_text)
                            else:
                                table_header.append(None)
                            # approximate size for auxiliary objects (pointer, types, etc.)
                            self.__html_lines_size += 128
                            if recursive_limit_size < self.__html_lines_size:
                                break
                        if record_leading is None:
                            if td_text_has_keywords:
                                record_leading = td_text
                            else:
                                record_leading = ""
                        else:
                            record_numbers.append(td_numbered_line[0])
                            record_lines.append(f"{record_leading} : {td_text}")
                            self.__html_lines_size += 128 + len(td_text)
                        # add single text to lines for analysis
                        self.line_numbers.append(td_numbered_line[0])
                        self.lines.append(td_text)
                        self.__html_lines_size += 128 + len(td_text)
                    else:
                        # empty cell or multiline cell
                        # number of columns is defined with header only
                        rowspan_header = int(cell.get("rowspan", 1))
                        for _ in range(colspan_header):
                            rowspan_columns.append(rowspan_header)
                            table_header.append(None)
                            self.__html_lines_size += 128
                            if recursive_limit_size < self.__html_lines_size:
                                break
            else:
                header_pos = 0
                # not a first line in table - may be combined with a header
                for cell in tr.find_all("td"):
                    if recursive_limit_size < self.__html_lines_size:
                        # keep the duplicates for early breaks!
                        break
                    # skip the columns which are still occupied by a cell with rowspan from a row above
                    while header_pos < len(rowspan_columns) and 1 < rowspan_columns[header_pos]:
                        rowspan_columns[header_pos] -= 1
                        header_pos += 1
                    colspan_cell = int(cell.get("colspan", 1))
                    rowspan_cell = int(cell.get("rowspan", 1))
                    # Only the known columns are updated, so the range is cut at their number:
                    # a huge colspan value must not cause a long idle iteration.
                    colspan_end = min(header_pos + colspan_cell, len(rowspan_columns))
                    for i in range(header_pos, colspan_end):
                        rowspan_columns[i] += rowspan_cell - 1
                    if td_numbered_line := self._check_multiline_cell(cell):
                        td_text = td_numbered_line[1]
                        if record_leading is None:
                            td_text_has_keywords = keywords_required_substrings_check(td_text.lower())
                            if td_text_has_keywords:
                                record_leading = td_text
                            else:
                                record_leading = ""
                        elif record_leading:
                            record_numbers.append(td_numbered_line[0])
                            record_line = f"{record_leading} : {td_text}"
                            record_lines.append(record_line)
                            self.__html_lines_size += 128 + len(record_line)
                            if recursive_limit_size < self.__html_lines_size:
                                break
                        if header_pos < len(table_header):
                            if header_text := table_header[header_pos]:
                                self.line_numbers.append(td_numbered_line[0])
                                self.lines.append(f"{header_text} : {td_text}")
                                self.__html_lines_size += 128 + len(td_text)
                    else:
                        # empty cell or multiline cell
                        table_header.append(None)
                        self.__html_lines_size += 64
                    header_pos += colspan_cell
            if record_lines:
                # add combinations with left column
                self.line_numbers.extend(record_numbers)
                self.lines.extend(record_lines)
                self.__html_lines_size += sum(len(x) for x in record_lines)

    def _html_tables_representation(
            self,  #
            html: BeautifulSoup,  #
            depth: int,  #
            recursive_limit_size: int,  #
            keywords_required_substrings_check: Callable[[str], bool]):
        """Iterates for all tables in html to explore cells and their combinations

        Parameters:
            html: parsed document
            depth: decremented first; nothing is done when the result is negative
            recursive_limit_size: the iteration stops when the accumulated size estimate exceeds the value
            keywords_required_substrings_check: passed to _table_representation()
        """
        depth -= 1
        if 0 > depth:
            return
        for table in html.find_all("table"):
            if recursive_limit_size < self.__html_lines_size:
                logger.warning("Recursive size limit was reached during HTML table combinations")
                break
            self._table_representation(table, depth, recursive_limit_size, keywords_required_substrings_check)

    def represent_as_html(
            self,  #
            depth: int,  #
            recursive_limit_size: int,  #
            keywords_required_substrings_check: Callable[[str], bool]) -> Optional[bool]:
        """Tries to read data as html

        The found lines are appended to ``lines`` and ``line_numbers``.

        Parameters:
            depth: nesting budget for the tables processing
            recursive_limit_size: size limit for the tables processing; a half of the value is applied
            keywords_required_substrings_check: called with lower-cased text of a table cell

        Return:
             True if reading was successful
             False if no data found
             None if the format is not acceptable
        """
        try:
            if "</" in self.text and ">" in self.text:
                if html := BeautifulSoup(self.text, features="html.parser"):
                    line_numbers, lines, lines_size = self.simple_html_representation(html)
                    self.line_numbers.extend(line_numbers)
                    self.lines.extend(lines)
                    self.__html_lines_size += lines_size
                    # apply recursive_limit_size/2 to reduce extra calculation
                    # of all accompanying losses per objects allocation
                    self._html_tables_representation(html, depth, recursive_limit_size >> 1,
                                                     keywords_required_substrings_check)
                    logger.debug("CONVERTED from html")
            else:
                logger.debug("Data do not contain specific tags - weak HTML")
        except Exception as exc:
            logger.debug("Cannot parse as HTML:%s %s", exc, self.data)
        else:
            # no error happened - the result depends on the collected lines only
            return bool(self.lines and self.line_numbers)
        return None

    def yield_analysis_target(self, min_len: int) -> Generator[AnalysisTarget, None, None]:
        """Return nothing. The class provides only data storage.

        Args:
            min_len: minimal line length to scan

        Raise:
            NotImplementedError

        """
        raise NotImplementedError()