Source code for credsweeper.deep_scanner.deep_scanner

import datetime
import logging
from pathlib import Path
from typing import List, Optional, Any, Tuple, Union

from credsweeper.common.constants import RECURSIVE_SCAN_LIMITATION
from credsweeper.config import Config
from credsweeper.credentials import Candidate
from credsweeper.credentials.augment_candidates import augment_candidates
from credsweeper.file_handler.byte_content_provider import ByteContentProvider
from credsweeper.file_handler.content_provider import ContentProvider
from credsweeper.file_handler.data_content_provider import DataContentProvider
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.string_content_provider import StringContentProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner import Scanner
from credsweeper.utils import Util
from .byte_scanner import ByteScanner
from .bzip2_scanner import Bzip2Scanner
from .docx_scanner import DocxScanner
from .eml_scanner import EmlScanner
from .encoder_scanner import EncoderScanner
from .gzip_scanner import GzipScanner
from .html_scanner import HtmlScanner
from .jks_scanner import JksScanner
from .lang_scanner import LangScanner
from .pdf_scanner import PdfScanner
from .pkcs12_scanner import Pkcs12Scanner
from .pptx_scanner import PptxScanner
from .tar_scanner import TarScanner
from .xlsx_scanner import XlsxScanner
from .xml_scanner import XmlScanner
from .zip_scanner import ZipScanner
from ..common.constants import DEFAULT_ENCODING
from ..file_handler.file_path_extractor import FilePathExtractor
from ..file_handler.struct_content_provider import StructContentProvider

logger = logging.getLogger(__name__)


class DeepScanner(
    ByteScanner,  #
    Bzip2Scanner,  #
    DocxScanner,  #
    EncoderScanner,  #
    GzipScanner,  #
    HtmlScanner,  #
    JksScanner,  #
    LangScanner,  #
    PdfScanner,  #
    Pkcs12Scanner,  #
    PptxScanner,  #
    TarScanner,  #
    XmlScanner,  #
    XlsxScanner,  #
    ZipScanner
):  # yapf: disable
    """Advanced scanner with recursive exploring of data"""

    def __init__(self, config: Config, scanner: Scanner) -> None:
        """Initialize Advanced credential scanner.

        Args:
            scanner: CredSweeper scanner object
            config: dictionary variable, stores analyzer features

        """
        self.__config = config
        self.__scanner = scanner

    @property
    def config(self) -> Config:
        """Configuration object passed to the constructor"""
        return self.__config

    @property
    def scanner(self) -> Scanner:
        """Scanner object passed to the constructor"""
        return self.__scanner

    @staticmethod
    def get_deep_scanners(data: bytes, file_type: str) -> List[Any]:
        """Returns possibly scan methods for the data depends on content

        Args:
            data: raw bytes which are probed with the ``Util.is_*`` checks
            file_type: file extension, consulted only when no container format was recognized

        Returns:
            list of scanner classes whose ``data_scan`` should be applied to the data

        """
        deep_scanners: List[Any] = []
        if Util.is_zip(data):
            deep_scanners.append(ZipScanner)
            # probably, there might be a docx, xlxs and so on.
            # It might be scanned with text representation in third-party libraries.
            deep_scanners.append(XlsxScanner)
            deep_scanners.append(DocxScanner)
            deep_scanners.append(PptxScanner)
        elif Util.is_bzip2(data):
            deep_scanners.append(Bzip2Scanner)
        elif Util.is_tar(data):
            deep_scanners.append(TarScanner)
        elif Util.is_gzip(data):
            deep_scanners.append(GzipScanner)
        elif Util.is_pdf(data):
            deep_scanners.append(PdfScanner)
        elif Util.is_jks(data):
            deep_scanners.append(JksScanner)
        elif Util.is_asn1(data):
            deep_scanners.append(Pkcs12Scanner)
        elif file_type in [".eml", ".mht"]:
            # mail-like extensions: prefer the e-mail parser, then HTML, otherwise plain bytes
            if Util.is_eml(data):
                deep_scanners.append(EmlScanner)
            elif Util.is_html(data):
                deep_scanners.append(HtmlScanner)
            else:
                deep_scanners = [ByteScanner]
        else:
            # nothing specific was recognized - try all the text oriented scanners
            deep_scanners = [ByteScanner, EncoderScanner, HtmlScanner, XmlScanner, LangScanner]
        return deep_scanners

    # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

    def scan(self,
             content_provider: ContentProvider,
             depth: int,
             recursive_limit_size: Optional[int] = None) -> List[Candidate]:
        """Initial scan method to launch recursive scan.

        NOTE(review): earlier documentation said ByteScanner is skipped here to prevent
        an extra scan, but this code runs every class returned by get_deep_scanners -
        confirm the intended behaviour.

        Args:
            content_provider: ContentProvider that might contain raw data
            depth: maximal level of recursion
            recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack;
                RECURSIVE_SCAN_LIMITATION is used when the value is not an int

        Returns:
            list of found candidates

        """
        if not isinstance(recursive_limit_size, int):
            recursive_limit_size = RECURSIVE_SCAN_LIMITATION
        candidates: List[Candidate] = []
        data: Optional[bytes] = None
        if isinstance(content_provider, (TextContentProvider, ByteContentProvider)):
            # Feature to scan files which might be containers
            data = content_provider.data
        elif isinstance(content_provider, DiffContentProvider) and content_provider.diff:
            candidates = self.scanner.scan(content_provider)
            # Feature to scan binary diffs
            diff = content_provider.diff[0].get("line")
            # the check for legal fix mypy issue
            if isinstance(diff, bytes):
                data = diff
        else:
            # lazy %-formatting renders the same message, but only when the record is emitted
            logger.warning("Content provider %s does not support deep scan", type(content_provider))

        if data:
            data_provider = DataContentProvider(data=data,
                                                file_path=content_provider.file_path,
                                                file_type=content_provider.file_type,
                                                info=Path(content_provider.file_path).as_posix())
            # the size of the top level data is charged against the limit before going deeper
            next_limit = recursive_limit_size - len(data)
            for scan_class in self.get_deep_scanners(data, content_provider.file_type):
                new_candidates = scan_class.data_scan(self, data_provider, depth - 1, next_limit)
                augment_candidates(candidates, new_candidates)
        return candidates

    def recursive_scan(
            self,  #
            data_provider: DataContentProvider,  #
            depth: int = 0,  #
            recursive_limit_size: int = 0) -> List[Candidate]:
        """Recursive function to scan files which might be containers like ZIP archives

        Args:
            data_provider: DataContentProvider object may be a container
            depth: maximal level of recursion
            recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

        Returns:
            list of found candidates; empty when the depth is exhausted

        """
        candidates: List[Candidate] = []
        logger.debug("Start data_scan: size=%d, depth=%d, limit=%d, path=%s, info=%s", len(data_provider.data), depth,
                     recursive_limit_size, data_provider.file_path, data_provider.info)

        if 0 > depth:
            # break recursion if maximal depth is reached
            logger.debug("bottom reached %s recursive_limit_size:%d", data_provider.file_path, recursive_limit_size)
            return candidates
        depth -= 1
        if FilePathExtractor.is_find_by_ext_file(self.config, data_provider.file_type):
            # Skip scanning file and makes fake candidate due the extension is suspicious
            dummy_candidate = Candidate.get_dummy_candidate(self.config, data_provider.file_path,
                                                            data_provider.file_type, data_provider.info)
            candidates.append(dummy_candidate)
        else:
            # iterate for all possibly scanner methods
            for scanner_class in self.get_deep_scanners(data_provider.data, data_provider.file_type):
                new_candidates = scanner_class.data_scan(self, data_provider, depth, recursive_limit_size)
                augment_candidates(candidates, new_candidates)
        return candidates

    def structure_scan(
            self,  #
            struct_provider: StructContentProvider,  #
            depth: int,  #
            recursive_limit_size: int) -> List[Candidate]:
        """Recursive function to scan structured data

        Args:
            struct_provider: StructContentProvider object with a dict, list or tuple inside
            depth: maximal level of recursion
            recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

        Returns:
            list of found candidates; empty when the depth is exhausted

        """
        candidates: List[Candidate] = []
        logger.debug("Start struct_scan: depth=%d, limit=%d, path=%s, info=%s", depth, recursive_limit_size,
                     struct_provider.file_path, struct_provider.info)

        if 0 > depth:
            # break recursion if maximal depth is reached
            logger.debug("bottom reached %s recursive_limit_size:%d", struct_provider.file_path, recursive_limit_size)
            return candidates

        depth -= 1

        items: List[Tuple[Union[int, str], Any]] = []
        struct_key: Optional[str] = None
        struct_value: Optional[str] = None
        line_for_keyword_rules = ""
        if isinstance(struct_provider.struct, dict):
            for key, value in struct_provider.struct.items():
                if isinstance(value, (list, tuple)) and 1 == len(value):
                    # simplify some structures like YAML when single item in new line is a value
                    items.append((key, value[0]))
                else:
                    items.append((key, value))
            # for transformation {"key": "api_key", "value": "XXXXXXX"} -> {"api_key": "XXXXXXX"}
            struct_key = struct_provider.struct.get("key")
            struct_value = struct_provider.struct.get("value")
        elif isinstance(struct_provider.struct, (list, tuple)):
            items = list(enumerate(struct_provider.struct))
        else:
            logger.error("Not supported type:%s val:%s", str(type(struct_provider.struct)),
                         str(struct_provider.struct))

        for key, value in items:
            if isinstance(value, (dict, list, tuple)):
                # Every nested container is explored, whatever its length. Previously a list or tuple
                # with fewer than two items (e.g. [["secret"]]) was reported as an unsupported type
                # and its content was never scanned.
                val_struct_provider = StructContentProvider(struct=value,
                                                            file_path=struct_provider.file_path,
                                                            file_type=struct_provider.file_type,
                                                            info=f"{struct_provider.info}|STRUCT:{key}")
                new_candidates = self.structure_scan(val_struct_provider, depth, recursive_limit_size)
                candidates.extend(new_candidates)

            elif isinstance(value, bytes):
                bytes_struct_provider = DataContentProvider(data=value,
                                                            file_path=struct_provider.file_path,
                                                            file_type=struct_provider.file_type,
                                                            info=f"{struct_provider.info}|BYTES:{key}")
                new_limit = recursive_limit_size - len(value)
                new_candidates = self.recursive_scan(bytes_struct_provider, depth, new_limit)
                candidates.extend(new_candidates)

            elif isinstance(value, str):
                data = value.encode(encoding=DEFAULT_ENCODING, errors='replace')
                str_struct_provider = DataContentProvider(data=data,
                                                          file_path=struct_provider.file_path,
                                                          file_type=struct_provider.file_type,
                                                          info=f"{struct_provider.info}|STRING:{key}")
                new_limit = recursive_limit_size - len(str_struct_provider.data)
                new_candidates = self.recursive_scan(str_struct_provider, depth, new_limit)
                candidates.extend(new_candidates)

                # use key = "value" scan for common cases like in TOML
                if isinstance(key, str) and self.scanner.keywords_required_substrings_check(key):
                    line_for_keyword_rules += f"{key} = \"{value}\"; "

            elif isinstance(value, (int, float, datetime.date, datetime.datetime)):
                # use the fields only in case of matched keywords
                if isinstance(key, str) and self.scanner.keywords_required_substrings_check(key):
                    line_for_keyword_rules += f"{key} = \"{value}\"; "

            else:
                logger.warning("Not supported type:%s value(%s)", str(type(value)), str(value))

        if line_for_keyword_rules:
            # all the collected 'key = "value"; ' pairs are scanned together as one TOML-like line
            str_provider = StringContentProvider([line_for_keyword_rules],
                                                 file_path=struct_provider.file_path,
                                                 file_type=".toml",
                                                 info=f"{struct_provider.info}|KEYWORD:`{line_for_keyword_rules}`")
            new_candidates = self.scanner.scan(str_provider)
            augment_candidates(candidates, new_candidates)

        # last check when dictionary is {"key": "api_key", "value": "XXXXXXX"} -> {"api_key": "XXXXXXX"}
        if isinstance(struct_key, str) and isinstance(struct_value, str):
            line_for_keyword_rules = f"{struct_key} = \"{struct_value}\""
            key_value_provider = StringContentProvider(
                [line_for_keyword_rules],
                file_path=struct_provider.file_path,
                file_type=".toml",
                info=f"{struct_provider.info}|KEY_VALUE:`{line_for_keyword_rules}`")
            new_candidates = self.scanner.scan(key_value_provider)
            augment_candidates(candidates, new_candidates)
        return candidates