import json
import logging
import multiprocessing
import signal
from pathlib import Path
from typing import Any, List, Optional, Union, Dict, Sequence, Tuple
import pandas as pd
from colorama import Style
# Directory of credsweeper sources MUST be placed before imports to avoid circular import error
APP_PATH = Path(__file__).resolve().parent
from credsweeper.scanner.scanner import Scanner
from credsweeper.common.constants import Severity, ThresholdPreset, DiffRowType, DEFAULT_ENCODING
from credsweeper.config.config import Config
from credsweeper.credentials.candidate import Candidate
from credsweeper.credentials.candidate_key import CandidateKey
from credsweeper.credentials.credential_manager import CredentialManager
from credsweeper.deep_scanner.deep_scanner import DeepScanner
from credsweeper.file_handler.content_provider import ContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.ml_model.ml_validator import MlValidator
from credsweeper.utils.util import Util
logger = logging.getLogger(__name__)
[docs]
class CredSweeper:
    """Advanced credential analyzer base class.

    Parameters:
        credential_manager: CredSweeper credential manager object
        scanner: CredSweeper scanner object
        pool_count: number of processes used to run multiprocessing scanning (never less than 1)
        config: Config object built from the loaded configuration dictionary, stores analyzer features
        json_filename: string or Path variable, credential candidates export filename

    """
def __init__(self,
             rule_path: Union[None, str, Path] = None,
             config_path: Optional[str] = None,
             json_filename: Union[None, str, Path] = None,
             xlsx_filename: Union[None, str, Path] = None,
             stdout: bool = False,
             color: bool = False,
             hashed: bool = False,
             subtext: bool = False,
             sort_output: bool = False,
             use_filters: bool = True,
             pool_count: int = 1,
             ml_batch_size: Optional[int] = None,
             ml_threshold: Union[int, float, ThresholdPreset] = ThresholdPreset.medium,
             ml_config: Union[None, str, Path] = None,
             ml_model: Union[None, str, Path] = None,
             ml_providers: Optional[str] = None,
             find_by_ext: bool = False,
             pedantic: bool = False,
             depth: int = 0,
             doc: bool = False,
             severity: Union[Severity, str] = Severity.INFO,
             size_limit: Optional[str] = None,
             exclude_lines: Optional[List[str]] = None,
             exclude_values: Optional[List[str]] = None,
             thrifty: bool = False,
             log_level: Optional[str] = None) -> None:
    """Initialize Advanced credential scanner.

    Args:
        rule_path: optional str or Path, path of rule config file; it is handed over to the Scanner
        config_path: optional str variable, path of CredSweeper config file
            default built-in config is used if None
        json_filename: optional string variable, path to save result to json
        xlsx_filename: optional string variable, path to save result to xlsx
        stdout: print results to stdout
        color: print concise results to stdout with colorization
        hashed: use hash of line, value and variable instead plain text
        subtext: use subtext of line near variable-value like it performed in ML
        sort_output: sort the credentials before they are exported
        use_filters: boolean variable, specifying the need of rule filters
        pool_count: int value, number of parallel processes to use; values below 1 are raised to 1
        ml_batch_size: int value, size of the batch for model inference; 16 is used when it is
            None or not positive
        ml_threshold: float or string value to specify threshold for the ml model;
            the integer 0 switches ML validation off
        ml_config: str or Path to set custom config of ml model
        ml_model: str or Path to set custom ml model
        ml_providers: str - comma separated list with providers
        find_by_ext: boolean - files will be reported by extension
        pedantic: boolean - scan all files
        depth: int - how deep container files will be scanned
        doc: boolean - document-specific scanning
        severity: Severity - minimum severity level of rule
        size_limit: optional string integer or human-readable format to skip oversize files
        exclude_lines: lines to omit in scan. Will be added to the lines already in config
        exclude_values: values to omit in scan. Will be added to the values already in config
        thrifty: free provider resources after scan to reduce memory consumption
        log_level: str - level for pool initializer according logging levels (UPPERCASE)

    Raises:
        RuntimeError: the given severity is not recognised by Severity.get

    """
    self.pool_count: int = max(1, int(pool_count))

    # the severity is validated before any configuration file is touched
    _severity = Severity.get(severity)
    if not _severity:
        raise RuntimeError(f"Severity level provided: {severity}"
                           f" -- must be one of: {' | '.join([i.value for i in Severity])}")

    # scanning machinery: configuration first, then the scanners which depend on it
    self.config = Config(
        self._get_config_dict(config_path=config_path,
                              use_filters=use_filters,
                              find_by_ext=find_by_ext,
                              pedantic=pedantic,
                              depth=depth,
                              doc=doc,
                              severity=_severity,
                              size_limit=size_limit,
                              exclude_lines=exclude_lines,
                              exclude_values=exclude_values))
    self.scanner = Scanner(self.config, rule_path)
    self.deep_scanner = DeepScanner(self.config, self.scanner)
    self.credential_manager = CredentialManager()

    # report destinations and presentation switches
    self.json_filename: Union[None, str, Path] = json_filename
    self.xlsx_filename: Union[None, str, Path] = xlsx_filename
    self.stdout, self.color = stdout, color
    self.hashed, self.subtext = hashed, subtext
    self.sort_output = sort_output

    # ML settings; the validator itself is created lazily by the ml_validator property
    if ml_batch_size and 0 < ml_batch_size:
        self.ml_batch_size = ml_batch_size
    else:
        self.ml_batch_size = 16
    self.ml_threshold = ml_threshold
    self.ml_config = ml_config
    self.ml_model = ml_model
    self.ml_providers = ml_providers
    self.__ml_validator: Optional[MlValidator] = None

    self.__thrifty = thrifty
    self.__log_level = log_level
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@staticmethod
def _get_config_path(config_path: Optional[str]) -> Path:
    """Return the configuration file location.

    Args:
        config_path: optional path given by the user

    Return:
        Path of 'config_path' when it is not empty, otherwise the built-in "secret/config.json" under APP_PATH
    """
    return Path(config_path) if config_path else APP_PATH / "secret" / "config.json"
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def _get_config_dict(
        self,  #
        config_path: Optional[str],  #
        use_filters: bool,  #
        find_by_ext: bool,  #
        pedantic: bool,  #
        depth: int,  #
        doc: bool,  #
        severity: Severity,  #
        size_limit: Optional[str],  #
        exclude_lines: Optional[List[str]],  #
        exclude_values: Optional[List[str]]) -> Dict[str, Any]:
    """Load the configuration file and overlay the run-time options on it.

    Args:
        config_path: optional path of the config file, resolved with _get_config_path
        use_filters: value for the "use_filters" key
        find_by_ext: value for the "find_by_ext" key
        pedantic: value for the "pedantic" key
        depth: value for the "depth" key
        doc: value for the "doc" key
        severity: its 'value' is stored in the "severity" key
        size_limit: value for the "size_limit" key
        exclude_lines: when not None, appended to the "lines" list of the "exclude" section
        exclude_values: when not None, appended to the "values" list of the "exclude" section

    Return:
        the loaded configuration dictionary with the overrides applied
    """
    config_dict = Util.json_load(self._get_config_path(config_path))

    # plain overrides: the command line options always win over the file content
    overrides = (
        ("use_filters", use_filters),
        ("find_by_ext", find_by_ext),
        ("size_limit", size_limit),
        ("pedantic", pedantic),
        ("depth", depth),
        ("doc", doc),
        ("severity", severity.value),
    )
    for option, value in overrides:
        config_dict[option] = value

    # exclusions are additive: the configured items are kept and the extra ones are appended
    for section, extra_items in (("lines", exclude_lines), ("values", exclude_values)):
        if extra_items is not None:
            config_dict["exclude"][section] = config_dict["exclude"].get(section, []) + extra_items

    return config_dict  # type: ignore
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def _use_ml_validation(self) -> bool:
    """Decide whether the ML validation step has to be executed.

    Return:
        False when the threshold is the integer 0, when there are no candidates at all
        or when none of the candidates has 'use_ml' set; True otherwise
    """
    threshold = self.ml_threshold
    if isinstance(threshold, int) and 0 == threshold:
        logger.info("ML validation is disabled")
        return False

    found_candidates = self.credential_manager.candidates
    if not found_candidates:
        logger.info("Skip ML validation because no candidates were found")
        return False

    # plain loop which stops at the first candidate that needs ML
    for candidate in found_candidates:
        if candidate.use_ml:
            break
    else:
        logger.info("Skip ML validation because no candidates support it")
        return False
    return True
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@property
def ml_validator(self) -> MlValidator:
    """Lazily created MlValidator.

    The validator is built on first access from the stored threshold, config, model and
    providers settings and is reused afterwards.

    Raises:
        RuntimeError: the created validator evaluates to False
    """
    if self.__ml_validator:
        return self.__ml_validator

    self.__ml_validator = MlValidator(
        threshold=self.ml_threshold,  #
        ml_config=self.ml_config,  #
        ml_model=self.ml_model,  #
        ml_providers=self.ml_providers,  #
    )
    if self.__ml_validator:
        return self.__ml_validator
    raise RuntimeError("MlValidator was not initialized!")
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
@staticmethod
def pool_initializer(log_kwargs) -> None:
    """Initializer for the processes of the scanning pool.

    Configures logging with the given keyword arguments and makes the process ignore SIGINT.

    Args:
        log_kwargs: keyword arguments which are passed to logging.basicConfig
    """
    logging.basicConfig(**log_kwargs)
    ignore_interrupt = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_interrupt)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def run(self, content_provider: AbstractProvider) -> int:
    """Run an analysis of 'content_provider' object.

    Args:
        content_provider: path objects to scan; a falsy value (e.g. None) means there is nothing to scan

    Return:
        number of credentials reported by the credential manager after the scan,
        0 when no scannable files were obtained

    """
    file_extractors: Sequence[ContentProvider] = (content_provider.get_scannable_files(self.config)
                                                  if content_provider else [])
    if not file_extractors:
        # content_provider may be None at this point, so 'paths' must not be dereferenced blindly
        logger.info("No scannable targets for %s paths", len(getattr(content_provider, "paths", ())))
        return 0
    self.scan(file_extractors)
    self.post_processing()
    # PatchesProvider has the attribute. Circular import error appears with using the isinstance
    change_type = getattr(content_provider, "change_type", None)
    self.export_results(change_type)
    return self.credential_manager.len_credentials()
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def scan(self, content_providers: Sequence[ContentProvider]) -> None:
    """Run scanning of files from an argument "content_providers".

    The multi-process scan is chosen only when more than one process is allowed
    and there is more than one provider; otherwise everything is scanned in the caller's process.

    Args:
        content_providers: file objects to scan

    """
    parallel = 1 < self.pool_count and 1 < len(content_providers)
    worker = self.__multi_jobs_scan if parallel else self.__single_job_scan
    worker(content_providers)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def __single_job_scan(self, content_providers: Sequence[ContentProvider]) -> None:
    """Scan every provider with files_scan in the current process and store the result
    in the credential manager, replacing its credentials"""
    logger.info("Scan for %s providers", len(content_providers))
    found = self.files_scan(content_providers)
    self.credential_manager.set_credentials(found)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def __multi_jobs_scan(self, content_providers: Sequence[ContentProvider]) -> None:
    """Performs scan with multiple jobs

    The providers are split into interleaved slices, one per worker process, each slice is scanned
    with files_scan in a "spawn" process pool and every returned candidate is added
    to the credential manager.

    Args:
        content_providers: file objects to scan

    Raises:
        KeyboardInterrupt: re-raised after the pool was terminated and joined

    """
    # use this separation to satisfy YAPF formatter
    yapfix = "%(asctime)s | %(levelname)s | %(processName)s:%(threadName)s | %(filename)s:%(lineno)s | %(message)s"
    # keyword arguments for logging.basicConfig, applied by pool_initializer in the workers
    log_kwargs = {"format": yapfix}
    if isinstance(self.__log_level, str):
        # is not None
        if "SILENCE" == self.__log_level:
            # "SILENCE" is not a standard level name: bind it to the numeric level 60
            logging.addLevelName(60, "SILENCE")
        log_kwargs["level"] = self.__log_level
    # do not start more processes than there are providers to scan
    pool_count = min(self.pool_count, len(content_providers))
    logger.info("Scan in %s processes for %s providers", pool_count, len(content_providers))
    with multiprocessing.get_context("spawn").Pool(processes=pool_count,
                                                   initializer=CredSweeper.pool_initializer,
                                                   initargs=(log_kwargs,)) as pool:  # yapf: disable
        try:
            # slice x holds providers x, x + pool_count, x + 2 * pool_count, ...
            # imap_unordered yields the result of a slice as soon as it is ready
            for scan_results in pool.imap_unordered(self.files_scan,
                                                    (content_providers[x::pool_count] for x in range(pool_count))):
                for cred in scan_results:
                    self.credential_manager.add_credential(cred)
        except KeyboardInterrupt:
            # stop the workers at once, wait for them and let the interrupt propagate
            pool.terminate()
            pool.join()
            raise
        # regular completion: accept no more tasks and wait until the workers exit
        pool.close()
        pool.join()
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def files_scan(self, content_providers: Sequence[ContentProvider]) -> List[Candidate]:
    """Scan a sequence of providers one by one.

    Args:
        content_providers: file objects to scan

    Return:
        candidates of all the providers, in the order of the providers
    """
    collected: List[Candidate] = []
    for content_provider in content_providers:
        found = self.file_scan(content_provider)
        if self.__thrifty:
            # the provider is not needed after its scan - release its resources
            content_provider.free()
        collected += found
    logger.info("Completed: processed %s providers with %s candidates", len(content_providers), len(collected))
    return collected
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def file_scan(self, content_provider: ContentProvider) -> List[Candidate]:
    """Run scanning of file from 'file_provider'.

    Args:
        content_provider: content provider object to scan

    Return:
        list of credential candidates from scanned file

    """
    logger.debug("Start scan file: %s %s", content_provider.file_path, content_provider.info)

    if FilePathExtractor.is_find_by_ext_file(self.config, content_provider.file_type):
        # The extension is suspicious: the content is not scanned, a fake candidate is reported instead
        return [
            Candidate.get_dummy_candidate(self.config, content_provider.file_path, content_provider.file_type,
                                          content_provider.info, FilePathExtractor.FIND_BY_EXT_RULE)
        ]

    if self.config.depth or self.config.doc:
        # deep scan with possible data representation
        return self.deep_scanner.scan(content_provider, self.config.depth, self.config.size_limit)

    if content_provider.file_type in self.config.exclude_containers:
        # excluded container types are not scanned in the regular mode
        return []

    # Regular file scanning
    return self.scanner.scan(content_provider)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def post_processing(self) -> None:
    """Purge duplicates and apply machine learning validation to the received credential candidates.

    When ML validation is applicable, the credentials of the credential manager are replaced with:
    all candidates of the groups without any ML candidate, the non-ML candidates of the ML groups
    and the ML candidates whose group was approved by the validator (their 'ml_probability' is set).
    """
    purged = self.credential_manager.purge_duplicates()
    if purged:
        logger.info("Purged %s duplicates", purged)

    if not self._use_ml_validation():
        return

    logger.info("Grouping %s candidates", len(self.credential_manager.candidates))
    kept: List[Candidate] = []
    ml_groups: List[Tuple[CandidateKey, List[Candidate]]] = []
    for key, members in self.credential_manager.group_credentials().items():
        if any(member.use_ml for member in members):
            # the whole group goes to ML as soon as one of its candidates requires it
            ml_groups.append((key, members))
        else:
            # all candidates do not require ML
            kept.extend(members)

    if not ml_groups:
        # the validator is not even created in this case
        logger.info("Skipping ML validation due not applicable")
    else:
        logger.info("Run ML Validation for %s groups", len(ml_groups))
        is_cred, probability = self.ml_validator.validate_groups(ml_groups, self.ml_batch_size)
        for index, (_, members) in enumerate(ml_groups):
            for member in members:
                if not member.use_ml:
                    kept.append(member)
                elif is_cred[index]:
                    member.ml_probability = probability[index]
                    kept.append(member)

    self.credential_manager.set_credentials(kept)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
[docs]
def export_results(self, change_type: Optional[DiffRowType] = None) -> None:
    """
    Export credential candidates to every configured destination:
    json file, xlsx file, colorized console output and plain console output.

    Args:
        change_type: flag to know which file should be created for a patch

    """
    credentials = self.credential_manager.get_credentials()
    logger.info("Exporting %s credentials", len(credentials))

    if self.sort_output:

        def _report_order(credential):
            # the first line of a credential defines its position in the report
            first_line = credential.line_data_list[0]
            return (
                first_line.path,  #
                first_line.line_num,  #
                credential.severity,  #
                credential.rule_name,  #
                first_line.value_start,  #
                first_line.value_end  #
            )

        credentials.sort(key=_report_order)

    if self.json_filename:
        json_path = Path(self.json_filename)
        if isinstance(change_type, DiffRowType):
            # add suffix for appropriated reports to create two files for the patch scan
            json_path = json_path.with_suffix(f".{change_type.value}{json_path.suffix}")
        with open(json_path, 'w', encoding=DEFAULT_ENCODING) as report:
            # the items are dumped one by one to reduce total memory usage in case of huge data
            report.write('[\n')
            for index, credential in enumerate(credentials):
                if index:
                    report.write(",\n")
                report.write(json.dumps(credential.to_json(hashed=self.hashed, subtext=self.subtext), indent=4))
            report.write("\n]")

    if self.xlsx_filename:
        rows = [
            row  #
            for credential in credentials  #
            for row in credential.to_dict_list(hashed=self.hashed, subtext=self.subtext)
        ]
        df = pd.DataFrame(data=rows)
        if not isinstance(change_type, DiffRowType):
            df.to_excel(self.xlsx_filename, sheet_name="report", index=False)
        elif Path(self.xlsx_filename).exists():
            # the workbook exists already: put the sheet of this change type into it
            with pd.ExcelWriter(self.xlsx_filename, mode='a', engine="openpyxl",
                                if_sheet_exists="replace") as writer:
                df.to_excel(writer, sheet_name=change_type.value, index=False)
        else:
            df.to_excel(self.xlsx_filename, sheet_name=change_type.value, index=False)

    if self.color:
        for credential in credentials:
            for line_data in credential.line_data_list:
                # the probability is shown only when it is a float
                if isinstance(credential.ml_probability, float):
                    ml_probability_info = f" {credential.ml_probability:.6f}"
                else:
                    ml_probability_info = ""
                # bright rule name and path or info
                location = f" {line_data.info or line_data.path}:{line_data.line_num}{ml_probability_info}"
                print(Style.BRIGHT + credential.rule_name + location + Style.RESET_ALL)
                print(line_data.get_colored_line(hashed=self.hashed, subtext=self.subtext))

    if self.stdout:
        for credential in credentials:
            print(credential.to_str(hashed=self.hashed, subtext=self.subtext))