import logging
import multiprocessing
import signal
from pathlib import Path
from typing import Any, List, Optional, Union, Dict, Sequence, Tuple
import pandas as pd
# APP_PATH is the directory that contains the credsweeper sources.
# It MUST be defined before the credsweeper imports below to avoid a circular import error.
APP_PATH = Path(__file__).resolve().parent
from credsweeper.common.constants import KeyValidationOption, Severity, ThresholdPreset
from credsweeper.config import Config
from credsweeper.credentials import Candidate, CredentialManager, CandidateKey
from credsweeper.deep_scanner.deep_scanner import DeepScanner
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner import Scanner
from credsweeper.utils import Util
from credsweeper.validations.apply_validation import ApplyValidation
# Module-level logger named after this module
logger = logging.getLogger(__name__)
[docs]
class CredSweeper:
"""Advanced credential analyzer base class.
Attributes:
credential_manager: CredSweeper credential manager object
scanner: CredSweeper scanner object
pool_count: number of processes used to run multiprocessing scanning
config: Config object, stores analyzer features
json_filename: string or Path variable, credential candidates export filename
"""
def __init__(self,
             rule_path: Union[None, str, Path] = None,
             config_path: Optional[str] = None,
             api_validation: bool = False,
             json_filename: Union[None, str, Path] = None,
             xlsx_filename: Union[None, str, Path] = None,
             hashed: bool = False,
             subtext: bool = False,
             sort_output: bool = False,
             use_filters: bool = True,
             pool_count: int = 1,
             ml_batch_size: Optional[int] = None,
             ml_threshold: Union[float, ThresholdPreset] = ThresholdPreset.medium,
             ml_config: Union[None, str, Path] = None,
             ml_model: Union[None, str, Path] = None,
             ml_providers: Optional[str] = None,
             find_by_ext: bool = False,
             depth: int = 0,
             doc: bool = False,
             severity: Union[Severity, str] = Severity.INFO,
             size_limit: Optional[str] = None,
             exclude_lines: Optional[List[str]] = None,
             exclude_values: Optional[List[str]] = None,
             log_level: Optional[str] = None) -> None:
    """Initialize Advanced credential scanner.

    Args:
        rule_path: optional str or Path, path of rule config file; handed over to Scanner
        config_path: optional str variable, path of CredSweeper config file
            default built-in config is used if None
        api_validation: optional boolean variable, specifying the need of
            parallel API validation
        json_filename: optional str or Path, path to save result to json
        xlsx_filename: optional str or Path, path to save result to xlsx
        hashed: use hash of line, value and variable instead plain text
        subtext: use subtext of line near variable-value like it performed in ML
        sort_output: boolean variable, sort the credentials before they are exported
        use_filters: boolean variable, specifying the need of rule filters
        pool_count: int value, number of parallel processes to use; values below 2 mean 1
        ml_batch_size: int value, size of the batch for model inference;
            16 is used when the value is None or not positive
        ml_threshold: float or ThresholdPreset value to specify threshold for the ml model
        ml_config: str or Path to set custom config of ml model
        ml_model: str or Path to set custom ml model
        ml_providers: str - comma separated list with providers
        find_by_ext: boolean - files will be reported by extension
        depth: int - how deep container files will be scanned
        doc: boolean - document-specific scanning
        severity: Severity or str - minimum severity level of rule
        size_limit: optional string integer or human-readable format to skip oversize files
        exclude_lines: lines to omit in scan. Will be added to the lines already in config
        exclude_values: values to omit in scan. Will be added to the values already in config
        log_level: str - level for pool initializer according logging levels (UPPERCASE)

    Raises:
        RuntimeError: when Severity.get() does not recognise the given severity

    """
    requested_jobs = int(pool_count)
    self.pool_count: int = requested_jobs if requested_jobs > 1 else 1

    _severity = Severity.get(severity)
    if not _severity:
        raise RuntimeError(f"Severity level provided: {severity}"
                           f" -- must be one of: {' | '.join([i.value for i in Severity])}")

    self.config = Config(
        self._get_config_dict(config_path=config_path,
                              api_validation=api_validation,
                              use_filters=use_filters,
                              find_by_ext=find_by_ext,
                              depth=depth,
                              doc=doc,
                              severity=_severity,
                              size_limit=size_limit,
                              exclude_lines=exclude_lines,
                              exclude_values=exclude_values))
    self.scanner = Scanner(self.config, rule_path)
    self.deep_scanner = DeepScanner(self.config, self.scanner)
    self.credential_manager = CredentialManager()

    # export settings
    self.json_filename: Union[None, str, Path] = json_filename
    self.xlsx_filename: Union[None, str, Path] = xlsx_filename
    self.hashed = hashed
    self.subtext = subtext
    self.sort_output = sort_output

    # ML settings; the validator itself is created lazily by the ml_validator property
    if ml_batch_size and ml_batch_size > 0:
        self.ml_batch_size = ml_batch_size
    else:
        self.ml_batch_size = 16
    self.ml_threshold = ml_threshold
    self.ml_config = ml_config
    self.ml_model = ml_model
    self.ml_providers = ml_providers
    self.ml_validator = None

    self.__log_level = log_level
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@staticmethod
def _get_config_path(config_path: Optional[str]) -> Path:
    """Return the given config path as Path, or the built-in "secret/config.json" under APP_PATH when it is empty."""
    return Path(config_path) if config_path else APP_PATH / "secret" / "config.json"
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def _get_config_dict(
        self,  #
        config_path: Optional[str],  #
        api_validation: bool,  #
        use_filters: bool,  #
        find_by_ext: bool,  #
        depth: int,  #
        doc: bool,  #
        severity: Severity,  #
        size_limit: Optional[str],  #
        exclude_lines: Optional[List[str]],  #
        exclude_values: Optional[List[str]]) -> Dict[str, Any]:
    """Load the config file and overlay the values passed to the constructor.

    Args:
        config_path: path of the config file; resolved with _get_config_path
        api_validation: stored as config["validation"]["api_validation"]
        use_filters: stored as config["use_filters"]
        find_by_ext: stored as config["find_by_ext"]
        depth: stored as config["depth"]
        doc: stored as config["doc"]
        severity: its ``value`` is stored as config["severity"]
        size_limit: stored as config["size_limit"]
        exclude_lines: appended to config["exclude"]["lines"] when not None
        exclude_values: appended to config["exclude"]["values"] when not None

    Return:
        the loaded dictionary with the overrides applied

    """
    loaded = Util.json_load(self._get_config_path(config_path))

    # the "validation" section of the file is replaced as a whole
    overrides = (
        ("validation", {
            "api_validation": api_validation
        }),
        ("use_filters", use_filters),
        ("find_by_ext", find_by_ext),
        ("size_limit", size_limit),
        ("depth", depth),
        ("doc", doc),
        ("severity", severity.value),
    )
    for option, setting in overrides:
        loaded[option] = setting

    # extra exclusions extend the lists from the file instead of replacing them
    for section, additions in (("lines", exclude_lines), ("values", exclude_values)):
        if additions is not None:
            exclusions = loaded["exclude"]
            exclusions[section] = exclusions.get(section, []) + additions

    return loaded  # type: ignore
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def _use_ml_validation(self) -> bool:
    """Tell whether ML validation has to be run for the collected candidates.

    Return:
        True when at least one candidate has ``use_ml`` set and the threshold is not
        a non-positive number; otherwise False, and the reason is logged

    """
    threshold = self.ml_threshold
    if isinstance(threshold, (float, int)) and threshold <= 0:
        skip_reason = "ML validation is disabled"
    elif not self.credential_manager.candidates:
        skip_reason = "Skip ML validation because no candidates were found"
    else:
        # a plain loop is kept on purpose: any() or all() is not used to speedup
        for candidate in self.credential_manager.candidates:
            if candidate.use_ml:
                return True
        skip_reason = "Skip ML validation because no candidates support it"
    logger.info(skip_reason)
    return False
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# the import cannot be done at the top of the module because of
# TypeError: cannot pickle 'onnxruntime.capi.onnxruntime_pybind11_state.InferenceSession' object
from credsweeper.ml_model import MlValidator
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@property
def ml_validator(self) -> MlValidator:
    """ml_validator getter: the MlValidator is created on first access and then reused"""
    # imported here because names bound in the class body are not visible inside methods
    from credsweeper.ml_model import MlValidator
    if self.__ml_validator:
        return self.__ml_validator
    created: MlValidator = MlValidator(
        threshold=self.ml_threshold,  #
        ml_config=self.ml_config,  #
        ml_model=self.ml_model,  #
        ml_providers=self.ml_providers,  #
    )
    self.__ml_validator = created
    assert self.__ml_validator, "self.__ml_validator was not initialized"
    return self.__ml_validator
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@ml_validator.setter
def ml_validator(self, _ml_validator: Optional[MlValidator]) -> None:
    """ml_validator setter: stores the given validator; a falsy value makes the getter create a new one"""
    self.__ml_validator = _ml_validator
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def pool_initializer(log_kwargs) -> None:
    """Prepare a pool process: configure logging and ignore SIGINT.

    Args:
        log_kwargs: keyword arguments passed unchanged to logging.basicConfig

    """
    logging.basicConfig(**log_kwargs)
    # after this call SIGINT is ignored by the process that ran the initializer
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@property
def config(self) -> Config:
    """config getter: returns the Config object stored by the setter"""
    return self.__config
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@config.setter
def config(self, config: Config) -> None:
    """config setter: keeps the given Config object in a private attribute"""
    self.__config = config
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def run(self, content_provider: AbstractProvider) -> int:
    """Run an analysis of 'content_provider' object: scan, post-process and export.

    Args:
        content_provider: provider of the scannable files; a falsy value means nothing to scan

    Return:
        number of credentials kept by the credential manager after the run

    """
    file_extractors: Sequence[Union[DiffContentProvider, TextContentProvider]]
    if content_provider:
        file_extractors = content_provider.get_scannable_files(self.config)
    else:
        file_extractors = []
    logger.info(f"Start Scanner for {len(file_extractors)} providers")
    self.scan(file_extractors)
    self.post_processing()
    self.export_results()
    found = self.credential_manager.get_credentials()
    return len(found)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
    """Run scanning of files from an argument "content_providers".

    The multiprocessing scan is used when ``pool_count`` is greater than 1,
    otherwise everything is scanned by a single job.

    Args:
        content_providers: file objects to scan

    """
    scan_job = self.__multi_jobs_scan if self.pool_count > 1 else self.__single_job_scan
    scan_job(content_providers)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def __single_job_scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
    """Performs scan without a process pool and stores the result in the credential manager"""
    found = self.files_scan(content_providers)
    if not self.config.api_validation:
        self.credential_manager.set_credentials(found)
        return
    # with API validation every candidate is validated and then added one by one
    validator = ApplyValidation()
    for candidate in found:
        logger.info("Run API Validation")
        candidate.api_validation = validator.validate(candidate)
        self.credential_manager.add_credential(candidate)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def __multi_jobs_scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
    """Performs scan with multiple jobs.

    A pool of ``pool_count`` processes is created with the "spawn" start method and
    each process is prepared by ``pool_initializer``. The providers are split into
    ``pool_count`` interleaved slices, every slice is scanned by ``files_scan`` and
    the results are added to the credential manager in the order they arrive.
    When API validation is enabled in the config it is run with the same pool.
    On KeyboardInterrupt the pool is terminated and the exception is re-raised.

    Args:
        content_providers: file objects to scan

    """
    log_kwargs: Dict[str, Any] = {
        "format": "%(asctime)s | %(levelname)s | %(processName)s:%(threadName)s"
                  " | %(filename)s:%(lineno)s | %(message)s"
    }
    log_level = self.__log_level
    if isinstance(log_level, str):
        # is not None
        if log_level == "SILENCE":
            # 60 is above logging.CRITICAL (50)
            logging.addLevelName(60, "SILENCE")
        log_kwargs["level"] = log_level

    jobs = self.pool_count
    # slice k takes providers k, k + jobs, k + 2 * jobs, ...
    slices = (content_providers[offset::jobs] for offset in range(jobs))
    spawn_context = multiprocessing.get_context("spawn")
    with spawn_context.Pool(processes=jobs, initializer=self.pool_initializer, initargs=(log_kwargs, )) as pool:
        try:
            for scan_results in pool.imap_unordered(self.files_scan, slices):
                for cred in scan_results:
                    self.credential_manager.add_credential(cred)
            if self.config.api_validation:
                logger.info("Run API Validation")
                ApplyValidation().validate_credentials(pool, self.credential_manager)
        except KeyboardInterrupt:
            pool.terminate()
            pool.join()
            raise
        pool.close()
        pool.join()
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def files_scan(
        self,  #
        content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> List[Candidate]:
    """Auxiliary method to scan one sequence of providers.

    Args:
        content_providers: content provider objects, each one is passed to ``file_scan``

    Return:
        candidates of all the providers in one list, in the order of the providers

    """
    all_cred: List[Candidate] = [
        candidate  #
        for provider in content_providers  #
        for candidate in self.file_scan(provider)
    ]
    logger.info(f"Completed: processed {len(content_providers)} providers with {len(all_cred)} candidates")
    return all_cred
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def file_scan(self, content_provider: Union[DiffContentProvider, TextContentProvider]) -> List[Candidate]:
    """Run scanning of file from 'content_provider'.

    Args:
        content_provider: content provider object to scan

    Return:
        list of credential candidates from scanned file

    """
    logger.debug("Start scan file: %s %s", content_provider.file_path, content_provider.info)

    if FilePathExtractor.is_find_by_ext_file(self.config, content_provider.file_type):
        # Skip the file scanning and create fake candidate because the extension is suspicious
        return [
            Candidate.get_dummy_candidate(self.config, content_provider.file_path, content_provider.file_type,
                                          content_provider.info)
        ]

    if self.config.depth or self.config.doc:
        # deep scan with possible data representation
        return self.deep_scanner.scan(content_provider, self.config.depth, self.config.size_limit)

    if content_provider.file_type not in self.config.exclude_containers:
        # Regular file scanning
        return self.scanner.scan(content_provider)

    # file types listed in exclude_containers are not scanned without depth or doc
    return []
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def post_processing(self) -> None:
    """Purge duplicates and apply machine learning validation to the received candidates.

    When ML validation is used, the candidates are grouped. Groups without any ML
    candidate are kept as they are. For the other groups the ML validator decides
    per group: ML candidates are kept only if the group is confirmed, the remaining
    candidates of the group are kept and marked as NOT_AVAILABLE.
    """
    if purged := self.credential_manager.purge_duplicates():
        logger.info(f"Purged {purged} duplicates")

    if not self._use_ml_validation():
        return

    logger.info(f"Grouping {len(self.credential_manager.candidates)} candidates")
    new_cred_list: List[Candidate] = []
    ml_cred_groups: List[Tuple[CandidateKey, List[Candidate]]] = []
    for group_key, group_candidates in self.credential_manager.group_credentials().items():
        # Analyze with ML if any candidate in group require ML
        if any(candidate.use_ml for candidate in group_candidates):
            ml_cred_groups.append((group_key, group_candidates))
        else:
            # all candidates do not require ML
            new_cred_list.extend(group_candidates)

    # prevent extra ml_validator creation if ml_cred_groups is empty
    if not ml_cred_groups:
        logger.info("Skipping ML validation due not applicable")
    else:
        logger.info(f"Run ML Validation for {len(ml_cred_groups)} groups")
        is_cred, probability = self.ml_validator.validate_groups(ml_cred_groups, self.ml_batch_size)
        for i, (_, group_candidates) in enumerate(ml_cred_groups):
            for candidate in group_candidates:
                if not candidate.use_ml:
                    candidate.ml_validation = KeyValidationOption.NOT_AVAILABLE
                elif is_cred[i]:
                    candidate.ml_validation = KeyValidationOption.VALIDATED_KEY
                    candidate.ml_probability = probability[i]
                else:
                    # ML candidate of a group that was not confirmed is dropped
                    continue
                new_cred_list.append(candidate)

    self.credential_manager.set_credentials(new_cred_list)
[docs]
def export_results(self) -> None:
    """Save credential candidates to json and/or xlsx file, or print them to a console.

    The credentials are printed only when neither ``json_filename`` nor
    ``xlsx_filename`` is set. With ``sort_output`` the list is sorted in place first.
    """
    credentials = self.credential_manager.get_credentials()

    if self.sort_output:

        def export_order(credential):
            # position of the first line data, then severity and rule, then value span
            head = credential.line_data_list[0]
            return (
                head.path,  #
                head.line_num,  #
                credential.severity,  #
                credential.rule_name,  #
                head.value_start,  #
                head.value_end  #
            )

        credentials.sort(key=export_order)

    if self.json_filename:
        as_json = [credential.to_json(hashed=self.hashed, subtext=self.subtext) for credential in credentials]
        Util.json_dump(as_json, file_path=self.json_filename)

    if self.xlsx_filename:
        rows = []
        for credential in credentials:
            rows.extend(credential.to_dict_list(hashed=self.hashed, subtext=self.subtext))
        pd.DataFrame(data=rows).to_excel(self.xlsx_filename, index=False)

    if not (self.json_filename or self.xlsx_filename):
        # nothing was exported to a file
        for credential in credentials:
            print(credential.to_str(hashed=self.hashed, subtext=self.subtext))