Source code for credsweeper.ml_model.ml_validator

import hashlib
import json
import logging
from pathlib import Path
from typing import List, Tuple, Union, Optional, Dict

import numpy as np
from onnxruntime import InferenceSession

from credsweeper.common.constants import ThresholdPreset, ML_HUNK
from credsweeper.credentials.candidate import Candidate
from credsweeper.credentials.candidate_key import CandidateKey
from credsweeper.ml_model import features
from credsweeper.utils.util import Util

logger = logging.getLogger(__name__)


[docs] class MlValidator: """ML validation class""" MAX_LEN = 2 * ML_HUNK # for whole line limit # used for initial fill ZERO_CHAR = '\x00' # applied for unknown characters FAKE_CHAR = '\x01' _dir_path = Path(__file__).parent def __init__( self, # threshold: Union[float, ThresholdPreset], # ml_config: Union[None, str, Path] = None, # ml_model: Union[None, str, Path] = None, # ml_providers: Optional[str] = None) -> None: """Init Args: threshold: decision threshold ml_config: path to ml config ml_model: path to ml model ml_providers: coma separated list of providers https://onnxruntime.ai/docs/execution-providers/ """ self.__session: Optional[InferenceSession] = None if ml_config: ml_config_path = Path(ml_config) else: ml_config_path = MlValidator._dir_path / "ml_config.json" with open(ml_config_path, "rb") as f: __ml_config_data = f.read() model_config = json.loads(__ml_config_data) if ml_model: ml_model_path = Path(ml_model) else: ml_model_path = MlValidator._dir_path / "ml_model.onnx" with open(ml_model_path, "rb") as f: self.__ml_model_data = f.read() if ml_providers: self.providers = ml_providers.split(',') else: self.providers = ["CPUExecutionProvider"] if isinstance(threshold, float): self.threshold = threshold elif isinstance(threshold, ThresholdPreset) and "thresholds" in model_config: self.threshold = model_config["thresholds"][threshold.value] else: self.threshold = 0.5 logger.warning("Use fallback threshold value: %s", self.threshold) char_set = set(model_config["char_set"]) if len(char_set) != len(model_config["char_set"]): logger.warning('Duplicated symbols in "char_set"?') if self.ZERO_CHAR in char_set or self.FAKE_CHAR in char_set: raise ValueError(f'Unacceptable symbols 0x00 or 0x01 in "char_set"={char_set}') self.char_dict = {self.ZERO_CHAR: 0, self.FAKE_CHAR: 1} self.char_dict.update({ char: index for index, char in enumerate(sorted(list(char_set)), start=len(self.char_dict)) }) self.num_classes = len(self.char_dict) self.common_feature_list = [] self.unique_feature_list = [] if logger.isEnabledFor(logging.INFO): config_md5 = hashlib.md5(__ml_config_data).hexdigest() model_md5 = hashlib.md5(self.__ml_model_data).hexdigest() logger.info("Init ML validator with providers: '%s' ; model:'%s' md5:%s ; config:'%s' md5:%s", self.providers, ml_config_path, config_md5, ml_model_path, model_md5) logger.debug(str(model_config)) for feature_definition in model_config["features"]: feature_class = feature_definition["type"] kwargs = feature_definition.get("kwargs", {}) feature_constructor = getattr(features, feature_class, None) if feature_constructor is None: raise ValueError(f"Error while parsing model details. Cannot create feature '{feature_class}'" f" from {feature_definition}") try: feature = feature_constructor(**kwargs) except TypeError: logger.error("Error while parsing model details. Cannot create feature '%s' from %s", feature_class, feature_definition) raise if feature_definition["type"] in ["RuleName"]: self.unique_feature_list.append(feature) else: self.common_feature_list.append(feature) def __reduce__(self): # TypeError: cannot pickle 'onnxruntime.capi.onnxruntime_pybind11_state.InferenceSession' object self.__session = None return super().__reduce__() @property def session(self) -> InferenceSession: """session getter to prevent pickle error""" if not self.__session: self.__session = InferenceSession(self.__ml_model_data, providers=self.providers) if not self.__session: raise RuntimeError("InferenceSession was not initialized!") return self.__session
[docs] def encode(self, text: str, limit: int) -> np.ndarray: """Encodes prepared text to array""" result_array: np.ndarray = np.zeros(shape=(limit, self.num_classes), dtype=np.float32) if text is None: return result_array for i, c in enumerate(text): if i >= limit: break if c in self.char_dict: result_array[i, self.char_dict[c]] = 1.0 else: result_array[i, self.char_dict[MlValidator.FAKE_CHAR]] = 1.0 return result_array
[docs] def encode_line(self, text: str, position: int): """Encodes line with balancing for position""" offset = len(text) - len(text.lstrip()) pos = position - offset stripped = text.strip() if MlValidator.MAX_LEN < len(stripped): stripped = Util.subtext(stripped, pos, ML_HUNK) return self.encode(stripped, MlValidator.MAX_LEN)
[docs] def encode_value(self, text: str) -> np.ndarray: """Encodes line with balancing for position""" stripped = text.strip() return self.encode(stripped[:ML_HUNK], ML_HUNK)
def _call_model(self, line_input: np.ndarray, variable_input: np.ndarray, value_input: np.ndarray, feature_input: np.ndarray) -> np.ndarray: input_feed: Dict[str, np.ndarray] = { "line_input": line_input.astype(np.float32), "variable_input": variable_input.astype(np.float32), "value_input": value_input.astype(np.float32), "feature_input": feature_input.astype(np.float32), } result = self.session.run(output_names=None, input_feed=input_feed) if result and isinstance(result[0], np.ndarray): return result[0] raise RuntimeError(f"Unexpected type {type(result[0])}")
[docs] def extract_common_features(self, candidates: List[Candidate]) -> np.ndarray: """Extract features that are guaranteed to be the same for all candidates on the same line with same value.""" feature_array: np.ndarray = np.array([], dtype=np.float32) # Extract features from credential candidate default_candidate = candidates[0] for feature in self.common_feature_list: new_feature = feature([default_candidate])[0] if not isinstance(new_feature, np.ndarray): new_feature = np.array([new_feature]) feature_array = np.append(feature_array, new_feature) return feature_array
[docs] def extract_unique_features(self, candidates: List[Candidate]) -> np.ndarray: """Extract features that can be different between candidates. Join them with or operator.""" feature_array: np.ndarray = np.array([], dtype=np.int8) default_candidate = candidates[0] for feature in self.unique_feature_list: new_feature = feature([default_candidate])[0] if not isinstance(new_feature, np.ndarray): new_feature = np.array([new_feature]) feature_array = np.append(feature_array, new_feature) for candidate in candidates[1:]: for feature in self.unique_feature_list: new_feature = feature([candidate])[0] if not isinstance(new_feature, np.ndarray): new_feature = np.array([new_feature]) feature_array = feature_array | new_feature return feature_array
[docs] def get_group_features(self, candidates: List[Candidate]) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ `np.newaxis` used to add new dimension if front, so input will be treated as a batch """ # all candidates are from the same line default_candidate = candidates[0] line_input = self.encode_line(default_candidate.line_data_list[0].line, default_candidate.line_data_list[0].value_start)[np.newaxis] variable = '' value = '' for candidate in candidates: if not variable and candidate.line_data_list[0].variable: variable = candidate.line_data_list[0].variable if not value and candidate.line_data_list[0].value: value = candidate.line_data_list[0].value if variable and value: break variable_input = self.encode_value(variable)[np.newaxis] value_input = self.encode_value(value)[np.newaxis] feature_array = self.extract_features(candidates) return line_input, variable_input, value_input, feature_array
[docs] def extract_features(self, candidates: List[Candidate]) -> np.ndarray: """extracts common and unique features from list of candidates""" common_features = self.extract_common_features(candidates) unique_features = self.extract_unique_features(candidates) feature_hstack = np.hstack([common_features, unique_features]) feature_array = np.array([feature_hstack]) return feature_array
def _batch_call_model(self, line_input_list, variable_input_list, value_input_list, features_list) -> np.ndarray: """auxiliary method to invoke twice""" line_inputs_vstack = np.vstack(line_input_list) variable_inputs_vstack = np.vstack(variable_input_list) value_inputs_vstack = np.vstack(value_input_list) feature_array_vstack = np.vstack(features_list) result_call = self._call_model(line_inputs_vstack, variable_inputs_vstack, value_inputs_vstack, feature_array_vstack) result = result_call[:, 0] return result
[docs] def validate_groups(self, group_list: List[Tuple[CandidateKey, List[Candidate]]], batch_size: int) -> Tuple[np.ndarray, np.ndarray]: """Use ml model on list of candidate groups. Args: group_list: List of tuples (value, group) batch_size: ML model batch Return: Boolean numpy array with decision based on the threshold, and numpy array with probability predicted by the model """ line_input_list = [] variable_input_list = [] value_input_list = [] features_list = [] probability: np.ndarray = np.zeros(len(group_list), dtype=np.float32) head = tail = 0 for _group_key, candidates in group_list: line_input, variable_input, value_input, feature_array = self.get_group_features(candidates) line_input_list.append(line_input) variable_input_list.append(variable_input) value_input_list.append(value_input) features_list.append(feature_array) tail += 1 if 0 == tail % batch_size: # use the approach to reduce memory consumption for huge candidates list probability[head:tail] = self._batch_call_model(line_input_list, variable_input_list, value_input_list, features_list) head = tail line_input_list.clear() variable_input_list.clear() value_input_list.clear() features_list.clear() if head != tail: probability[head:tail] = self._batch_call_model(line_input_list, variable_input_list, value_input_list, features_list) is_cred = self.threshold <= probability if logger.isEnabledFor(logging.DEBUG): for i, decision in enumerate(is_cred): logger.debug("ML decision: %s with prediction: %s for value: %s", decision, probability[i], group_list[i][0]) # apply cast to float to avoid json export issue return is_cred, probability.astype(float)