import contextlib
import datetime
import io
import logging
from abc import abstractmethod, ABC
from bz2 import BZ2File
from collections.abc import Sized
from gzip import GzipFile
from lzma import LZMAFile
from typing import List, Optional, Tuple, Any, Generator, Union
from credsweeper.common.constants import RECURSIVE_SCAN_LIMITATION, MIN_DATA_LEN, DEFAULT_ENCODING, UTF_8, \
MIN_VALUE_LENGTH
from credsweeper.config.config import Config
from credsweeper.credentials.augment_candidates import augment_candidates
from credsweeper.credentials.candidate import Candidate
from credsweeper.file_handler.byte_content_provider import ByteContentProvider
from credsweeper.file_handler.content_provider import ContentProvider
from credsweeper.file_handler.data_content_provider import DataContentProvider
from credsweeper.file_handler.descriptor import Descriptor
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.string_content_provider import StringContentProvider
from credsweeper.file_handler.struct_content_provider import StructContentProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner.scanner import Scanner
from credsweeper.utils.util import Util
logger = logging.getLogger(__name__)
[docs]
class AbstractScanner(ABC):
"""Base abstract class for all recursive scanners"""
@property
@abstractmethod
def config(self) -> Config:
"""Abstract property to be defined in DeepScanner"""
raise NotImplementedError(__name__)
@property
@abstractmethod
def scanner(self) -> Scanner:
"""Abstract property to be defined in DeepScanner"""
raise NotImplementedError(__name__)
[docs]
@staticmethod
@abstractmethod
def match(data: bytes | bytearray) -> bool:
"""Abstract method for any deep scanner"""
raise NotImplementedError(__name__)
[docs]
@abstractmethod
def data_scan(
self, #
data_provider: DataContentProvider, #
depth: int, #
recursive_limit_size: int) -> Optional[List[Candidate]]:
"""Abstract method to be defined in DeepScanner"""
raise NotImplementedError(__name__)
[docs]
@staticmethod
@abstractmethod
def get_deep_scanners(data: bytes, descriptor: Descriptor, depth: int, limit: int) -> Tuple[List[Any], List[Any]]:
"""Returns possibly scan methods for the data depends on content and fallback scanners"""
raise NotImplementedError(__name__)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def recursive_scan(
self, #
data_provider: DataContentProvider, #
depth: int = 0, #
recursive_limit_size: int = 0) -> List[Candidate]:
"""Recursive function to scan files which might be containers like ZIP archives
Args:
data_provider: DataContentProvider object may be a container
depth: maximal level of recursion
recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack
"""
candidates: List[Candidate] = []
if 0 > depth:
# break recursion if maximal depth is reached
logger.debug("Bottom reached %s recursive_limit_size:%d", data_provider.file_path, recursive_limit_size)
return candidates
depth -= 1
data_size = len(data_provider.data)
if MIN_DATA_LEN > data_size:
# break recursion for minimal data size
logger.debug("Too small data: size=%d, depth=%d, limit=%d, path=%s, info=%s", data_size, depth,
recursive_limit_size, data_provider.file_path, data_provider.info)
return candidates
recursive_limit_size -= data_size
if MIN_DATA_LEN > recursive_limit_size:
# break recursion for exhausted size limit
logger.debug("Recursive limit exhausted: size=%d, depth=%d, limit=%d, path=%s, info=%s", data_size, depth,
recursive_limit_size, data_provider.file_path, data_provider.info)
return candidates
logger.debug("Start data_scan: size=%d, depth=%d, limit=%d, path=%s, info=%s", data_size, depth,
recursive_limit_size, data_provider.file_path, data_provider.info)
if FilePathExtractor.is_find_by_ext_file(self.config, data_provider.file_type):
# Skip scanning file and makes fake candidate due the extension is suspicious
dummy_candidate = Candidate.get_dummy_candidate(self.config, data_provider.file_path,
data_provider.file_type, data_provider.info,
FilePathExtractor.FIND_BY_EXT_RULE)
candidates.append(dummy_candidate)
else:
new_candidates = self.deep_scan_with_fallback(data_provider, depth, recursive_limit_size)
augment_candidates(candidates, new_candidates)
return candidates
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def key_value_combination(structure: dict) -> Generator[Tuple[Any, Any], None, None]:
"""Combine items by `key` and `value` from a dictionary for augmentation
{..., "key": "api_key", "value": "XXXXXXX", ...} -> ("api_key", "XXXXXXX")
"""
for key_id in ("key", "KEY", "Key"):
if key_id in structure:
struct_key = structure.get(key_id)
break
else:
struct_key = None
if isinstance(struct_key, bytes):
# sqlite table may produce bytes for `key`
with contextlib.suppress(UnicodeError):
struct_key = struct_key.decode(UTF_8)
# only str type is common used for the augmentation
if struct_key and isinstance(struct_key, str):
for value_id in ("value", "VALUE", "Value"):
if value_id in structure:
struct_value = structure.get(value_id)
if struct_value and isinstance(struct_value, (str, bytes)):
yield struct_key, struct_value
# break in successful case
break
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def structure_size(structure: Any) -> int:
"""Calculates approximated size of structure data"""
size = len(structure) if isinstance(structure, Sized) else 0
if isinstance(structure, dict):
for key, value in structure.items():
size += AbstractScanner.structure_size(key)
size += AbstractScanner.structure_size(value)
elif isinstance(structure, (list, tuple)):
size += sum(AbstractScanner.structure_size(x) for x in structure)
return size
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def structure_processing(structure: Any) -> Generator[Tuple[Any, Any], None, None]:
"""Yields pair `key, value` from given structure if applicable"""
if isinstance(structure, dict):
# transform dictionary to list
for key, value in structure.items():
if not value:
# skip empty values
continue
if isinstance(value, (list, tuple)):
if 1 == len(value):
# simplify some structures like YAML when single item in new line is a value
yield key, value[0]
continue
# all other data will be precessed in next code
yield key, value
yield from AbstractScanner.key_value_combination(structure)
elif isinstance(structure, (list, tuple)):
# enumerate the items to fit for return structure
for key, value in enumerate(structure):
yield key, value
else:
logger.warning("Not supported type:%s val:%s", str(type(structure)), repr(structure))
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def structure_scan(
self, #
struct_provider: StructContentProvider, #
depth: int, #
recursive_limit_size: int) -> List[Candidate]:
"""Recursive function to scan structured data
Args:
struct_provider: DataContentProvider object may be a container
depth: maximal level of recursion
recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack
"""
candidates: List[Candidate] = []
logger.debug("Start struct_scan: depth=%d, limit=%d, path=%s, info=%s", depth, recursive_limit_size,
struct_provider.file_path, struct_provider.info)
structure_size = AbstractScanner.structure_size(struct_provider.struct) # dbg
recursive_limit_size -= structure_size
if 0 > depth or MIN_DATA_LEN > recursive_limit_size:
# break recursion if maximal depth is reached or recursive_limit_size almost exhausted
logger.debug("Stopping recursion on %s depth:%d, recursive_limit_size:%d", struct_provider.file_path, depth,
recursive_limit_size)
return candidates
depth -= 1
augmented_lines_for_keyword_rules = []
for key, value in AbstractScanner.structure_processing(struct_provider.struct):
# a keyword rule may be applicable for `key` (str only) and `value` (str, bytes)
keyword_match = bool(isinstance(key, str) and self.scanner.keywords_required_substrings_check(key.lower()))
if isinstance(value, (dict, list, tuple)) and value:
# recursive scan for not empty structured `value`
val_struct_provider = StructContentProvider(struct=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STRUCT:{key}")
new_candidates = self.structure_scan(val_struct_provider, depth, recursive_limit_size)
candidates.extend(new_candidates)
elif isinstance(value, bytes):
# recursive data scan
if MIN_DATA_LEN <= len(value):
bytes_struct_provider = DataContentProvider(data=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|BYTES:{key}")
new_candidates = self.recursive_scan(bytes_struct_provider, depth, recursive_limit_size)
candidates.extend(new_candidates)
if keyword_match and MIN_VALUE_LENGTH <= len(value):
augmented_lines_for_keyword_rules.append(f"{key} = {repr(value)}")
elif isinstance(value, str):
# recursive text scan with transformation into bytes
stripped_value = value.strip()
if MIN_DATA_LEN <= len(stripped_value):
# recursive scan only for data which may be decoded at least
with contextlib.suppress(UnicodeError):
data = stripped_value.encode(encoding=DEFAULT_ENCODING, errors='strict')
str_struct_provider = DataContentProvider(data=data,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STRING:{key}")
new_candidates = self.recursive_scan(str_struct_provider, depth, recursive_limit_size)
candidates.extend(new_candidates)
if keyword_match and MIN_VALUE_LENGTH <= len(stripped_value):
augmented_lines_for_keyword_rules.append(f"{key} = {repr(stripped_value)}")
elif not value or isinstance(value, (int, float, datetime.date, datetime.datetime)):
# skip useless types
pass
else:
logger.warning("Not supported type:%s value(%s)", str(type(value)), str(value))
if augmented_lines_for_keyword_rules:
str_provider = StringContentProvider(augmented_lines_for_keyword_rules,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|KEYWORD")
new_candidates = self.scanner.scan(str_provider)
augment_candidates(candidates, new_candidates)
return candidates
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def deep_scan_with_fallback(self, data_provider: DataContentProvider, depth: int,
recursive_limit_size: int) -> List[Candidate]:
"""Scans with deep scanners and fallback scanners if possible
Args:
data_provider: DataContentProvider with raw data
depth: maximal level of recursion
recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack
Returns: list with candidates
"""
candidates: List[Candidate] = []
deep_scanners, fallback_scanners = self.get_deep_scanners(data_provider.data, data_provider.descriptor, depth,
recursive_limit_size)
fallback = True
for scan_class in deep_scanners:
new_candidates = scan_class.data_scan(self, data_provider, depth, recursive_limit_size)
if new_candidates is None:
# scanner did not recognise the content type
continue
augment_candidates(candidates, new_candidates)
# this scan is successful, so fallback is not necessary
fallback = False
if fallback:
for scan_class in fallback_scanners:
fallback_candidates = scan_class.data_scan(self, data_provider, depth, recursive_limit_size)
if fallback_candidates is None:
continue
augment_candidates(candidates, fallback_candidates)
# use only first successful fallback scanner
break
return candidates
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def scan(self,
content_provider: ContentProvider,
depth: int,
recursive_limit_size: Optional[int] = None) -> List[Candidate]:
"""Initial scan method to launch recursive scan. Skips ByteScanner to prevent extra scan
Args:
content_provider: ContentProvider that might contain raw data
depth: maximal level of recursion
recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack
"""
if not isinstance(recursive_limit_size, int):
recursive_limit_size = RECURSIVE_SCAN_LIMITATION
candidates: List[Candidate] = []
data: Optional[bytes] = None
if isinstance(content_provider, (TextContentProvider, ByteContentProvider)):
# Feature to scan files which might be containers
data = content_provider.data
info = f"FILE:{content_provider.file_path}"
elif isinstance(content_provider, DiffContentProvider) and content_provider.diff:
candidates = self.scanner.scan(content_provider)
# Feature to scan binary diffs
diff = content_provider.diff[0].get("line")
# the check for legal fix mypy issue
if isinstance(diff, bytes):
data = diff
info = f"DIFF:{content_provider.file_path}"
else:
logger.warning("Content provider %s does not support deep scan", type(content_provider))
info = "NA"
if data:
data_provider = DataContentProvider(data=data,
file_path=content_provider.file_path,
file_type=Util.get_type(content_provider.file_path),
info=content_provider.info or info)
new_candidates = self.deep_scan_with_fallback(data_provider, depth, recursive_limit_size - len(data))
augment_candidates(candidates, new_candidates)
return candidates
[docs]
class LimitError(Exception):
"""Decompressed data exceeds configured limit"""
[docs]
@staticmethod
def read_compressed_with_limit(file: Union[LZMAFile, GzipFile, BZ2File], limit: int) -> bytes:
"""Reads data with check limit for single compressed file"""
size = file.seek(0, io.SEEK_END)
if limit < size:
raise AbstractScanner.LimitError(f"Recursive size limit reached {limit} < {size}")
file.seek(0, io.SEEK_SET)
return file.read(size=limit)