import contextlib
import datetime
import logging
from abc import abstractmethod, ABC
from typing import List, Optional, Tuple, Any, Generator
from credsweeper.common.constants import RECURSIVE_SCAN_LIMITATION, MIN_DATA_LEN, DEFAULT_ENCODING, UTF_8, \
MIN_VALUE_LENGTH
from credsweeper.config.config import Config
from credsweeper.credentials.augment_candidates import augment_candidates
from credsweeper.credentials.candidate import Candidate
from credsweeper.file_handler.byte_content_provider import ByteContentProvider
from credsweeper.file_handler.content_provider import ContentProvider
from credsweeper.file_handler.data_content_provider import DataContentProvider
from credsweeper.file_handler.descriptor import Descriptor
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.string_content_provider import StringContentProvider
from credsweeper.file_handler.struct_content_provider import StructContentProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner.scanner import Scanner
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
[docs]
class AbstractScanner(ABC):
"""Base abstract class for all recursive scanners"""
@property
@abstractmethod
def config(self) -> Config:
    """Return the `Config` object; abstract here and expected to be defined in DeepScanner"""
    # no default implementation: the base class only declares the contract
    raise NotImplementedError(__name__)
@property
@abstractmethod
def scanner(self) -> Scanner:
    """Return the `Scanner` object; abstract here and expected to be defined in DeepScanner"""
    # no default implementation: the base class only declares the contract
    raise NotImplementedError(__name__)
[docs]
@abstractmethod
def data_scan(self, data_provider: DataContentProvider, depth: int,
              recursive_limit_size: int) -> Optional[List[Candidate]]:
    """Abstract method to be defined in DeepScanner

    Args:
        data_provider: DataContentProvider with the raw data to scan
        depth: maximal level of recursion
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

    Returns: list with candidates; `deep_scan_with_fallback` treats `None` as
        "the scanner did not recognise the content type"
    """
    raise NotImplementedError(__name__)
[docs]
@staticmethod
@abstractmethod
def get_deep_scanners(data: bytes, descriptor: Descriptor, depth: int) -> Tuple[List[Any], List[Any]]:
    """Return the scan methods suitable for the data plus the fallback scan methods

    Args:
        data: raw bytes whose content decides which scanners apply
        descriptor: Descriptor of the scanned data
        depth: maximal level of recursion

    Returns: pair of lists - (deep scanners, fallback scanners)
    """
    raise NotImplementedError(__name__)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def recursive_scan(self, data_provider: DataContentProvider, depth: int = 0,
                   recursive_limit_size: int = 0) -> List[Candidate]:
    """Recursive function to scan files which might be containers like ZIP archives

    Args:
        data_provider: DataContentProvider object may be a container
        depth: maximal level of recursion; a negative value stops the scan immediately
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

    Returns: list with candidates; empty when the depth is exhausted or the data is too small
    """
    found: List[Candidate] = []
    if depth < 0:
        # the bottom of the recursion - nothing will be scanned
        logger.debug("Bottom reached %s recursive_limit_size:%d", data_provider.file_path, recursive_limit_size)
        return found
    # every nested scan gets one level less
    next_depth = depth - 1
    data_size = len(data_provider.data)
    if data_size < MIN_DATA_LEN:
        # the data is too short to be worth scanning
        logger.debug("Too small data: size=%d, depth=%d, limit=%d, path=%s, info=%s", data_size, next_depth,
                     recursive_limit_size, data_provider.file_path, data_provider.info)
        return found
    logger.debug("Start data_scan: size=%d, depth=%d, limit=%d, path=%s, info=%s", data_size, next_depth,
                 recursive_limit_size, data_provider.file_path, data_provider.info)
    if not FilePathExtractor.is_find_by_ext_file(self.config, data_provider.file_type):
        # regular case: try the deep scanners and, if necessary, the fallback ones
        scanned = self.deep_scan_with_fallback(data_provider, next_depth, recursive_limit_size)
        augment_candidates(found, scanned)
        return found
    # the extension itself is suspicious: report a fake candidate instead of scanning the content
    found.append(
        Candidate.get_dummy_candidate(self.config, data_provider.file_path, data_provider.file_type,
                                      data_provider.info, FilePathExtractor.FIND_BY_EXT_RULE))
    return found
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def key_value_combination(structure: dict) -> Generator[Tuple[Any, Any], None, None]:
    """Combine items by `key` and `value` from a dictionary for augmentation

    {..., "key": "api_key", "value": "XXXXXXX", ...} -> ("api_key", "XXXXXXX")

    Args:
        structure: dictionary which may contain `key`/`KEY`/`Key` and `value`/`VALUE`/`Value` items

    Yields: at most one pair (key as str, value as str or bytes)
    """
    # the first spelling present in the dictionary wins, even when its item is empty
    key_spelling = next((name for name in ("key", "KEY", "Key") if name in structure), None)
    key_text = None if key_spelling is None else structure.get(key_spelling)
    if isinstance(key_text, bytes):
        # sqlite table may produce bytes for `key`; undecodable bytes are left as is
        with contextlib.suppress(UnicodeError):
            key_text = key_text.decode(UTF_8)
    # only a non-empty str key is usable for the augmentation
    if not (key_text and isinstance(key_text, str)):
        return
    for value_spelling in ("value", "VALUE", "Value"):
        if value_spelling not in structure:
            continue
        item = structure.get(value_spelling)
        if item and isinstance(item, (str, bytes)):
            yield key_text, item
            # the successful case - no other spelling is necessary
            return
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def structure_processing(structure: Any) -> Generator[Tuple[Any, Any], None, None]:
    """Yields pair `key, value` from given structure if applicable

    Args:
        structure: dict, list or tuple; any other type is only reported with a warning

    Yields: (index, item) for a list or tuple; for a dict - (key, value) of every non-empty
        item and then the result of `key_value_combination`
    """
    if isinstance(structure, (list, tuple)):
        # the position of an item plays the role of the key
        yield from enumerate(structure)
        return
    if not isinstance(structure, dict):
        logger.warning("Not supported type:%s val:%s", str(type(structure)), repr(structure))
        return
    for name, item in structure.items():
        if not item:
            # empty values are skipped
            continue
        if isinstance(item, (list, tuple)) and 1 == len(item):
            # simplify some structures like YAML when single item in new line is a value
            yield name, item[0]
        else:
            # all other data will be processed by the caller
            yield name, item
    yield from AbstractScanner.key_value_combination(structure)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def structure_scan(self, struct_provider: StructContentProvider, depth: int,
                   recursive_limit_size: int) -> List[Candidate]:
    """Recursive function to scan structured data

    Args:
        struct_provider: StructContentProvider object with the structure to be walked
        depth: maximal level of recursion; a negative value stops the scan immediately
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

    Returns: list with candidates from nested structures, from bytes and str values, and
        from the `key = value` lines composed for keyword rules
    """
    found: List[Candidate] = []
    logger.debug("Start struct_scan: depth=%d, limit=%d, path=%s, info=%s", depth, recursive_limit_size,
                 struct_provider.file_path, struct_provider.info)
    if depth < 0:
        # the bottom of the recursion - nothing will be scanned
        logger.debug("Bottom reached %s recursive_limit_size:%d", struct_provider.file_path, recursive_limit_size)
        return found
    # every nested scan gets one level less
    next_depth = depth - 1
    keyword_lines = []
    for key, value in AbstractScanner.structure_processing(struct_provider.struct):
        # a keyword rule may be applicable for `key` (str only) and `value` (str, bytes)
        is_keyword = bool(isinstance(key, str) and self.scanner.keywords_required_substrings_check(key.lower()))
        if isinstance(value, (dict, list, tuple)) and value:
            # not empty nested structure - walk into it
            nested_provider = StructContentProvider(struct=value,
                                                    file_path=struct_provider.file_path,
                                                    file_type=struct_provider.file_type,
                                                    info=f"{struct_provider.info}|STRUCT:{key}")
            found.extend(self.structure_scan(nested_provider, next_depth, recursive_limit_size))
        elif isinstance(value, bytes):
            raw_size = len(value)
            if raw_size >= MIN_DATA_LEN:
                # the bytes are long enough for a recursive data scan
                raw_provider = DataContentProvider(data=value,
                                                   file_path=struct_provider.file_path,
                                                   file_type=struct_provider.file_type,
                                                   info=f"{struct_provider.info}|BYTES:{key}")
                remaining = recursive_limit_size - raw_size
                found.extend(self.recursive_scan(raw_provider, next_depth, remaining))
            if is_keyword and raw_size >= MIN_VALUE_LENGTH:
                keyword_lines.append(f"{key} = {value!r}")
        elif isinstance(value, str):
            # the text is scanned recursively after transformation into bytes
            text = value.strip()
            text_size = len(text)
            if text_size >= MIN_DATA_LEN:
                # a text which cannot be encoded is silently skipped for the recursive scan
                with contextlib.suppress(UnicodeError):
                    encoded = text.encode(encoding=DEFAULT_ENCODING, errors='strict')
                    text_provider = DataContentProvider(data=encoded,
                                                        file_path=struct_provider.file_path,
                                                        file_type=struct_provider.file_type,
                                                        info=f"{struct_provider.info}|STRING:{key}")
                    remaining = recursive_limit_size - len(text_provider.data)
                    found.extend(self.recursive_scan(text_provider, next_depth, remaining))
            if is_keyword and text_size >= MIN_VALUE_LENGTH:
                keyword_lines.append(f"{key} = {text!r}")
        elif value and not isinstance(value, (int, float, datetime.date, datetime.datetime)):
            # empty values, numbers and dates are useless and skipped silently; the rest is reported
            logger.warning("Not supported type:%s value(%s)", str(type(value)), str(value))
    if keyword_lines:
        # the composed `key = value` lines are scanned with the plain scanner
        keyword_provider = StringContentProvider(keyword_lines,
                                                 file_path=struct_provider.file_path,
                                                 file_type=struct_provider.file_type,
                                                 info=f"{struct_provider.info}|KEYWORD")
        augment_candidates(found, self.scanner.scan(keyword_provider))
    return found
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def deep_scan_with_fallback(self, data_provider: DataContentProvider, depth: int,
                            recursive_limit_size: int) -> List[Candidate]:
    """Scans with deep scanners and fallback scanners if possible

    Args:
        data_provider: DataContentProvider with raw data
        depth: maximal level of recursion
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

    Returns: list with candidates
    """
    found: List[Candidate] = []
    primary, reserve = self.get_deep_scanners(data_provider.data, data_provider.descriptor, depth)
    recognised = False
    for scanner_cls in primary:
        result = scanner_cls.data_scan(self, data_provider, depth, recursive_limit_size)
        if result is not None:
            # `None` means the scanner did not recognise the content type
            augment_candidates(found, result)
            recognised = True
    if recognised:
        # at least one deep scanner succeeded, so fallback is not necessary
        return found
    for scanner_cls in reserve:
        result = scanner_cls.data_scan(self, data_provider, depth, recursive_limit_size)
        if result is not None:
            augment_candidates(found, result)
            # use only first successful fallback scanner
            break
    return found
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def scan(self,
         content_provider: ContentProvider,
         depth: int,
         recursive_limit_size: Optional[int] = None) -> List[Candidate]:
    """Initial scan method to launch recursive scan

    Text and byte providers are deep scanned by their raw data. A diff provider is scanned
    with the plain scanner and, when the first diff line is bytes, these bytes are deep
    scanned too. Any other provider is only reported with a warning.
    NOTE(review): the previous documentation said this method skips ByteScanner to prevent
    extra scan - not visible in this method, confirm against `get_deep_scanners`.

    Args:
        content_provider: ContentProvider that might contain raw data
        depth: maximal level of recursion
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack;
            RECURSIVE_SCAN_LIMITATION is used when the value is not an int

    Returns: list with candidates
    """
    if not isinstance(recursive_limit_size, int):
        recursive_limit_size = RECURSIVE_SCAN_LIMITATION
    found: List[Candidate] = []
    raw: Optional[bytes] = None
    if isinstance(content_provider, (TextContentProvider, ByteContentProvider)):
        # Feature to scan files which might be containers
        raw = content_provider.data
        label = f"FILE:{content_provider.file_path}"
    elif isinstance(content_provider, DiffContentProvider) and content_provider.diff:
        found = self.scanner.scan(content_provider)
        # Feature to scan binary diffs
        first_line = content_provider.diff[0].get("line")
        # the type check is legal and fixes a mypy issue
        if isinstance(first_line, bytes):
            raw = first_line
            label = f"DIFF:{content_provider.file_path}"
    else:
        logger.warning("Content provider %s does not support deep scan", type(content_provider))
        label = "NA"
    if not raw:
        # nothing for the deep scan
        return found
    raw_provider = DataContentProvider(data=raw,
                                       file_path=content_provider.file_path,
                                       file_type=content_provider.file_type,
                                       info=content_provider.info or label)
    deep_found = self.deep_scan_with_fallback(raw_provider, depth, recursive_limit_size - len(raw))
    augment_candidates(found, deep_found)
    return found