Source code for credsweeper.utils.pem_key_detector

import contextlib
import logging
import re
import string
from typing import List

from credsweeper.common.constants import PEM_BEGIN_PATTERN, PEM_END_PATTERN, Chars, MAX_LINE_LENGTH
from credsweeper.config.config import Config
from credsweeper.credentials.line_data import LineData
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.utils.util import Util

logger = logging.getLogger(__name__)


class PemKeyDetector:
    """Detect PEM PRIVATE keys only.

    The detector starts from a line where a ``-----BEGIN ... PRIVATE ... KEY-----``
    header was matched, collects the text up to the ``-----END`` marker (in the
    same line or in the following lines), strips typical source-code wrapping
    from every line and validates the collected base64 payload.
    """

    # characters allowed in a key data line (taken from Chars.BASE64STDPAD_CHARS)
    BASE64_CHARS_SET = set(Chars.BASE64STDPAD_CHARS.value)
    # a sanitized key data line must fully match this pattern
    RE_BASE64_CHARS = re.compile(fr"[{re.escape(Chars.BASE64STDPAD_CHARS.value)}]+")
    # minimal Shannon entropy accepted for a PGP key payload, see finalize()
    ENTROPY_LIMIT_BASE64 = 4.5
    # the limit is huge with possible prefixes and escaping
    MAX_PEM_LENGTH = 4 * MAX_LINE_LENGTH

    # a line containing any of these markers is header/config, not key data
    IGNORE_STARTS = [PEM_BEGIN_PATTERN, "Proc-Type", "Version", "DEK-Info"]
    WRAP_CHARACTERS = "\\'\"`;,[]#*!"
    REMOVE_CHARACTERS = string.whitespace + WRAP_CHARACTERS
    # last line contains 4 symbols, at least
    RE_PEM_BEGIN = re.compile(r"(?P<value>" + PEM_BEGIN_PATTERN +
                              r"(?![^-]{1,80}ENCRYPTED)[^-]{0,80}PRIVATE[^-]{1,80}KEY[^-]{0,80}-----"
                              r"(.{1,8000}" + PEM_END_PATTERN + r"[^-]{1,80}KEY[^-]{0,80}-----)?)")
    RE_PEM_VALUE = re.compile(fr"(?P<value>.{{0,{MAX_PEM_LENGTH}}})")

    def __init__(self, config: Config) -> None:
        """Store the configuration and start with no barrier detected.

        Args:
            config: configuration passed to every LineData created for the next lines

        """
        self.__config = config
        # position of the barrier character and the offset the line is cut from
        self._barrier_pos: int = -2
        self._barrier_cut: int = -2
        # single character found right before the end marker; empty when not detected
        self._barrier: str = ''

    def cut_barrier(self, line: str) -> str:
        """Cut off barrier if detected

        Args:
            line: a line of the collected PEM text

        Return:
            the line without its left part up to the barrier when the barrier character
            is found at the remembered position, otherwise the line unchanged

        """
        if self._barrier and 0 <= self._barrier_pos < self._barrier_cut < len(line):
            if line[self._barrier_pos] == self._barrier:
                return line[self._barrier_cut:]
        # NOTE(review): the nesting was reconstructed from a flattened listing - confirm that
        # the reset is intended for every line which is not cut (including too short lines)
        self._barrier = ''
        self._barrier_pos = self._barrier_cut = -1
        return line

    def set_barrier(self, line: str, start: int = 0, end: int = MAX_LINE_LENGTH) -> None:
        """Detects barrier with offset of PEM_END_PATTERN

        The barrier is the single character placed right before the end marker.
        It is remembered only when it does not belong to the base64 character set.

        Args:
            line: the line where the end marker is expected (the last line of PEM text)
            start: offset the search of the end marker starts from
            end: offset the search of the end marker stops at

        """
        self._barrier = ''
        self._barrier_cut = line.find(PEM_END_PATTERN, start, end)
        self._barrier_pos = self._barrier_cut - 1
        if 0 <= self._barrier_pos < self._barrier_cut < len(line):
            barrier = line[self._barrier_pos]
            if barrier not in PemKeyDetector.BASE64_CHARS_SET:
                self._barrier = barrier

    def detect_pem_key(self, first_line: LineData, target: AnalysisTarget) -> List[LineData]:
        """Detects PEM key in single line and with iterative for next lines according
        https://www.rfc-editor.org/rfc/rfc7468

        Args:
            first_line: detected -----BEGIN from rule pattern
            target: Analysis target

        Return:
            List of LineData with found PEM, or an empty list when the end marker
            is not found, the text is oversize or the key data is not accepted

        """
        line_data_list: List[LineData] = []
        key_data_list: List[str] = []
        # escaped key in one line with prefixes
        pem_end_limit = min(target.line_len, first_line.value_start + PemKeyDetector.MAX_PEM_LENGTH)
        first_line_end_pattern_start = target.line.find(PEM_END_PATTERN, first_line.value_start, pem_end_limit)
        first_line_end_pattern_end = (target.line.find("-----", first_line_end_pattern_start + 5,
                                                       first_line_end_pattern_start + 80)
                                      if 0 <= first_line_end_pattern_start else -2)
        if first_line.value_start < first_line_end_pattern_start < first_line_end_pattern_end:
            # the whole PEM in single line
            pem_text = target.line[first_line.value_start:first_line_end_pattern_end + 5]
            first_line.value = pem_text
            first_line.value_end = first_line.value_start + len(pem_text)
            line_data_list.append(first_line)
        else:
            line_data_list.append(first_line)
            pem_text = first_line.line[first_line.value_start:first_line.value_start + PemKeyDetector.MAX_PEM_LENGTH]
            # perhaps, in next lines
            start_pos = max(0, target.line_pos) + 1
            end_pos = min(start_pos + 200, target.lines_len)
            for line_pos in range(start_pos, end_pos):
                target_line = target.lines[line_pos]
                end_pattern_start = target_line.find(PEM_END_PATTERN, 0, PemKeyDetector.MAX_PEM_LENGTH)
                # keep the raw result of find() as the "not found" sentinel: adding 5 to -1 gave 4
                # and a marker without closing dashes at offset 0..3 was cut down to "----"
                closing_dashes_start = (target_line.find("-----", end_pattern_start + 5, end_pattern_start + 80)
                                        if 0 <= end_pattern_start else -2)
                if 0 <= end_pattern_start < closing_dashes_start:
                    # cut a tail after the closing dashes of the end marker
                    pem_line = target_line[:closing_dashes_start + 5]
                else:
                    pem_line = target_line[:PemKeyDetector.MAX_PEM_LENGTH]
                next_line = LineData(self.__config, target_line, line_pos, target.line_nums[line_pos],
                                     target.file_path, target.file_type, target.info,
                                     PemKeyDetector.RE_PEM_VALUE)
                line_data_list.append(next_line)
                pem_text += f"\n{pem_line}"
                if PEM_END_PATTERN in pem_line:
                    break
                if PemKeyDetector.MAX_PEM_LENGTH < len(pem_text):
                    logger.warning("PEM text oversize")
                    return []
            else:
                # the loop was not interrupted - no end marker in the inspected lines
                logger.warning("PEM end not found %s", target.descriptor)
                return []
        while "\\\\" in pem_text:
            # reduce JSON escaping sequences of backslash
            pem_text = pem_text.replace("\\\\", '\\')
        # replace escaped line ends with real and process them - PEM does not contain '\' sign
        pem_text = pem_text.replace("\\r\\n", '\n').replace("\\r", '\n').replace("\\n", '\n').replace("\\t", '\t')
        pem_lines = pem_text.splitlines()
        # the last line holds the end marker - use it to learn a possible barrier
        self.set_barrier(pem_lines[-1])
        for subline in pem_lines:
            if PemKeyDetector.is_leading_config_line(subline):
                continue
            _subline = self.cut_barrier(subline)
            if sanitized_line := PemKeyDetector.sanitize_line(_subline):
                if PEM_END_PATTERN in sanitized_line:
                    return PemKeyDetector.finalize(line_data_list, key_data_list, sanitized_line)
                # the end is not reached - sanitize the data
                # PEM key line should not contain spaces or . (and especially not ...)
                if not PemKeyDetector.RE_BASE64_CHARS.fullmatch(sanitized_line):
                    return []
                key_data_list.append(sanitized_line)
        return []

    @staticmethod
    def finalize(line_data_list: List[LineData], key_data_list: List[str], last_line: str) -> List[LineData]:
        """Checks collected key_data according the key type

        Args:
            line_data_list: LineData of all lines the PEM was collected from
            key_data_list: sanitized key data lines
            last_line: sanitized line with the end marker

        Return:
            line_data_list when the key data passes the check for its type, otherwise an empty list

        """
        if len(key_data_list) < len(line_data_list):
            PemKeyDetector.sanitize_line_data_list(line_data_list, key_data_list, last_line)
        key_data = ''.join(key_data_list)
        if "PGP" in line_data_list[0].value:
            # Check if entropy is high enough for base64 set with padding sign
            entropy = Util.get_shannon_entropy(key_data)
            if PemKeyDetector.ENTROPY_LIMIT_BASE64 <= entropy:
                return line_data_list
            logger.debug("Filtered with entropy %f '%s'", entropy, key_data)
            # a PGP key which has not passed the entropy check falls through to the checks below
        if "OPENSSH" in line_data_list[0].value:
            # Check whether the key is encrypted
            with contextlib.suppress(Exception):
                decoded = Util.decode_base64(key_data, urlsafe_detect=True)
                if 32 < len(decoded) and b"bcrypt" not in decoded:
                    # 256 bits is the minimal size of Ed25519 keys
                    # all OK - the key is not encrypted in this top level
                    return line_data_list
            logger.debug("Filtered with size or bcrypt '%s'", key_data)
        else:
            with contextlib.suppress(Exception):
                if decoded := Util.decode_base64(key_data, padding_safe=True, urlsafe_detect=True):
                    if len(decoded) == Util.get_asn1_size(decoded):
                        # all OK - the key is not encrypted in this top level
                        return line_data_list
            logger.debug("Filtered with non asn1 '%s'", key_data)
        return []

    @staticmethod
    def sanitize_line_data_list(line_data_list: List[LineData], key_data_list: List[str], last_line: str) -> None:
        """Sanitize line_data_list to keep only valuable values

        Every key data value is searched in the values of line_data_list; the first
        LineData that contains it gets its value, value_start and value_end narrowed
        to the found key data. The items are updated in place.

        Args:
            line_data_list: LineData items to be updated
            key_data_list: sanitized key data lines
            last_line: sanitized line with the end marker

        """
        for value in key_data_list:
            if 64 <= len(value):
                # normal value length should not have a collision
                for line_data in line_data_list:
                    if value == line_data.value:
                        # plain case - no sanitize necessary
                        break
                    value_start = line_data.value.find(value)
                    if 0 <= value_start:
                        line_data.value = value
                        line_data.value_start = value_start
                        line_data.value_end = value_start + len(value)
                        break
            else:
                # end of pem may be short and have collisions in long lines
                value_pattern = re.compile(fr".*[^0-9A-Za-z+/=]?({re.escape(value)})[^0-9A-Za-z+/=]?.*")
                for line_data in line_data_list:
                    if value == line_data.value:
                        # plain case - no sanitize necessary
                        break
                    if value_match := value_pattern.fullmatch(line_data.value):
                        line_data.value = value_match.group(1)
                        line_data.value_start, line_data.value_end = value_match.span(1)
                        break
        if last_line.startswith(PEM_END_PATTERN) and last_line.endswith("-----"):
            last_line_data = line_data_list[-1]
            last_value_start = last_line_data.value.find(last_line, 0, PemKeyDetector.MAX_PEM_LENGTH)
            if 0 <= last_line_data.value_start <= last_value_start:
                # left barrier was sanitized
                last_line_data.value = last_line
                last_line_data.value_start = last_value_start
                last_line_data.value_end = last_value_start + len(last_line)

    @staticmethod
    def sanitize_line(line: str, recurse_level: int = 5) -> str:
        """Remove common symbols that can surround PEM keys inside code.

        Examples::

            `# ZZAWarrA1`
            `* ZZAWarrA1`
            `  "ZZAWarrA1\\n" + `

        Args:
            line: Line to be cleaned
            recurse_level: to avoid infinite loop in case when removed symbol inside base64 encoded

        Return:
            line with special characters removed from both ends

        """
        recurse_level -= 1
        if 0 > recurse_level:
            return line

        # Note that this strip would remove `\n` but not `\\n`
        line = line.strip(string.whitespace)
        # leading comment markers
        while line.startswith(("// ", "//\t")):
            line = line[3:]
        while line.startswith(("/// ", "///\t")):
            line = line[4:]
        while line.startswith("/*"):
            line = line[2:]
        while line.endswith("*/"):
            line = line[:-2]
        while line.endswith("\\"):
            # line carry in many languages
            line = line[:-1]

        # remove concatenation carefully only when it is not part of base64
        if line.startswith('+') and 1 < len(line) and line[1] not in PemKeyDetector.BASE64_CHARS_SET:
            line = line[1:]
        if line.endswith('+') and 2 < len(line) and line[-2] not in PemKeyDetector.BASE64_CHARS_SET:
            line = line[:-1]

        line = line.strip(PemKeyDetector.REMOVE_CHARACTERS)
        # check whether new iteration requires
        if any(wrap_char in line for wrap_char in PemKeyDetector.WRAP_CHARACTERS):
            return PemKeyDetector.sanitize_line(line, recurse_level=recurse_level)
        return line

    @staticmethod
    def is_leading_config_line(line: str) -> bool:
        """Check whether the line is empty or a header/config line instead of key data.

        Example lines with non-key leading lines:

        .. code-block:: text

            Proc-Type: 4,ENCRYPTED
            DEK-Info: DEK-Info: AES-256-CBC,2AA219GG746F88F6DDA0D852A0FD3211

            ZZAWarrA1...

        Args:
            line: Line to be checked

        Return:
            True if the line is not a part of encoded data but leading config

        """
        if not line:
            return True
        # the marker may be placed anywhere in the line, e.g. after a quote or a prefix
        return any(ignore_string in line for ignore_string in PemKeyDetector.IGNORE_STARTS)