import ast
import base64
import contextlib
import json
import logging
import math
import os
import random
import re
import string
import warnings
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional, Union
import numpy as np
import yaml
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.dh import DHPrivateKey, DHPublicKey
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateKey, DSAPublicKey
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey, EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey, X25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.x448 import X448PublicKey, X448PrivateKey
from cryptography.hazmat.primitives.serialization import load_der_private_key
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from lxml import etree
from credsweeper.common.constants import AVAILABLE_ENCODINGS, \
DEFAULT_ENCODING, LATIN_1, CHUNK_SIZE, MAX_LINE_LENGTH, CHUNK_STEP_SIZE, ASCII, UTF_16_LE, UTF_16_BE
logger = logging.getLogger(__name__)
[docs]
class Util:
"""Class that contains different useful methods."""
[docs]
@staticmethod
def get_extension(file_path: str, lower=True) -> str:
"""Return extension of file in lower case by default e.g.: '.txt', '.JPG'"""
_, extension = os.path.splitext(str(file_path))
return extension.lower() if lower else extension
[docs]
@staticmethod
def get_regex_combine_or(re_strs: List[str]) -> str:
"""Routine combination for regex 'or'"""
result = "(?:"
for elem in re_strs:
result += elem + "|"
if result[-1] == "|":
result = result[:-1]
result += ")"
return result
[docs]
@staticmethod
def get_shannon_entropy(data: Union[str, bytes]) -> float:
"""Borrowed from http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html."""
if not data:
return 0.0
size = len(data)
_uniq, counts = np.unique(list(data), return_counts=True)
probabilities = counts / size
entropy = -float(np.sum(probabilities * np.log2(probabilities)))
return entropy
# Precalculated data for speedup
MIN_DATA_ENTROPY: Dict[int, float] = {
16: 1.66973671780348,
20: 2.07723544540831,
32: 3.25392803184602,
40: 3.64853567064867,
64: 4.57756933688035,
384: 7.39,
512: 7.55,
}
[docs]
@staticmethod
def get_min_data_entropy(x: int) -> float:
"""Returns minimal entropy for size of random data. Precalculated data is applied for speedup"""
if x in Util.MIN_DATA_ENTROPY:
y = Util.MIN_DATA_ENTROPY[x]
elif 8 < x < 64:
# approximated for range 12 - 64
_x = x - 8
y = ((0.000016617804 * _x - 0.002695077) * _x + 0.170393) * _x + 0.4
elif 64 < x < 384:
# logarithm base 2 - slow, but precise
_x = x - 8
y = 1.095884 * math.log2(_x) - 1.90156
elif 384 < x < 512:
# solved for 384 - 512
y = -0.11215851 * math.log2(x)**2 + 2.34303484 * math.log2(x) - 4.4466237
else:
# less or equal to 8 bytes might have 0 entropy
y = 0
return y
[docs]
@staticmethod
def is_ascii_entropy_validate(data: bytes) -> bool:
"""
Tests small data sequence (<256) for data randomness by testing for ascii and shannon entropy
Returns True when data is an ASCII symbols or have small entropy
"""
if not data:
return True
data_len = len(data)
if 9 > data_len:
# even random data may have 0 entropy for length of 8 bytes and less
return True
entropy = 0.
cells = [int(0)] * 256
ascii_test = True
# "basket" sorting approach
for x in data:
cells[x] += 1
if ascii_test and 0b10000000 & x:
ascii_test = False
if ascii_test:
# only ascii symbols found
return True
left = 0.
step = 256.0 / data_len
right = left + step
while left < 256:
cell_sum = 0
i = int(left)
r = int(right)
while i < r and i < 256:
cell_sum += cells[i]
i += 1
p_x = float(cell_sum) / data_len
if p_x > 0:
entropy += -p_x * math.log2(p_x)
left = right
right += step
min_entropy = Util.get_min_data_entropy(data_len)
return entropy < min_entropy
[docs]
@staticmethod
def is_binary(data: Union[bytes, bytearray]) -> bool:
"""
Returns True when two zeroes sequence is found in begin of data.
The sequence never exists in text format (UTF-8, UTF-16). UTF-32 is not supported.
"""
if 0 <= data.find(b"\0\0", 0, MAX_LINE_LENGTH):
return True
return False
NOT_LATIN1_PRINTABLE_SET = set(range(0, 256)) \
.difference(set(x for x in string.printable.encode(ASCII))) \
.difference({0x1B}) \
.difference(set(x for x in range(0xA0, 0x100)))
[docs]
@staticmethod
def is_latin1(data: Union[bytes, bytearray]) -> bool:
"""Returns True when data looks like LATIN-1 for first MAX_LINE_LENGTH bytes."""
result = False
if data:
non_latin1_cnt = sum(1 for x in data[:MAX_LINE_LENGTH] if x in Util.NOT_LATIN1_PRINTABLE_SET)
# experiment for 255217 binary files shown avg = 0.268264 ± 0.168767, so let choose minimal
chunk_len = min(MAX_LINE_LENGTH, len(data))
result = bool(0.1 > non_latin1_cnt / chunk_len)
return result
[docs]
@staticmethod
def read_file(path: Union[str, Path], encodings: Optional[List[str]] = None) -> List[str]:
"""Read the file content using different encodings.
Try to read the contents of the file according to the list of encodings "encodings" as soon as reading
occurs without any exceptions, the data is returned in the current encoding
Args:
path: path to file
encodings: supported encodings
Return:
list of file rows in a suitable encoding from "encodings",
if none of the encodings match, an empty list will be returned
"""
if data := Util.read_data(path):
return Util.decode_bytes(data, encodings)
return []
[docs]
@staticmethod
def decode_text(content: Optional[bytes], encodings: Optional[List[str]] = None) -> Optional[str]:
"""Decode content using different encodings.
Try to decode bytes according to the list of encodings "encodings"
occurs without any exceptions. UTF-16 requires BOM
Args:
content: raw data that might be text
encodings: supported encodings
Return:
Decoded text in str for any suitable encoding
or None when binary data detected
"""
if content is None:
return None
binary_suggest = False
if encodings:
# use exactly defined encodings
_encodings = encodings
elif content.startswith(b"\xFF\xFE") or 1 < len(content) and 0 == content[1]:
_encodings = [UTF_16_LE]
elif content.startswith(b"\xFE\xFF") or content.startswith(b'\x00'):
_encodings = [UTF_16_BE]
else:
_encodings = AVAILABLE_ENCODINGS
for encoding in _encodings:
try:
if binary_suggest and LATIN_1 == encoding and (Util.is_binary(content) or not Util.is_latin1(content)):
# LATIN_1 may convert data (bytes in range 0x80:0xFF are transformed)
break
text = content.decode(encoding=encoding, errors="strict")
if content != text.encode(encoding=encoding, errors="strict"):
# the refurbish test helps to detect a real encoding
binary_suggest = True
continue
# the case decoding is good
if UTF_16_LE == encoding or UTF_16_BE == encoding:
return text.lstrip('\uFEFF')
return text
except UnicodeError:
binary_suggest = True
logger.debug("UnicodeError: Can't decode content as %s.", encoding)
except Exception as exc:
logger.error("Unexpected Error: Can't read content as %s. Error message: %s", encoding, exc)
return None
[docs]
@staticmethod
def split_text(text: str) -> List[str]:
"""Splits a text into lines, handling all common line endings (e.g., LF, CRLF, CR)."""
return text.replace("\r\n", '\n').replace('\r', '\n').split('\n')
[docs]
@staticmethod
def decode_bytes(content: Optional[bytes], encodings: Optional[List[str]] = None) -> List[str]:
"""Decode content using different encodings.
Try to decode bytes according to the list of encodings "encodings"
occurs without any exceptions. UTF-16 requires BOM
Args:
content: raw data that might be text
encodings: supported encodings
Return:
list of file rows in a suitable encoding from "encodings",
if none of the encodings match, an empty list will be returned
Also empty list will be returned after last encoding and 0 symbol is present in lines not at end
"""
if text := Util.decode_text(content, encodings):
return Util.split_text(text)
return []
[docs]
@staticmethod
def get_asn1_size(data: Union[bytes, bytearray]) -> int:
"""Only sequence type 0x30 and size correctness are checked
Returns size of ASN1 data over 128 bytes or 0 if no interested data
"""
if isinstance(data, (bytes, bytearray)) and 2 <= len(data) and 0x30 == data[0]:
# https://www.oss.com/asn1/resources/asn1-made-simple/asn1-quick-reference/basic-encoding-rules.html#Lengths
length = data[1]
if 0x80 == length:
if data.endswith(b"\x00\x00"):
# assume, all data are ASN1 of various size
return len(data)
# else - skip the case where the ASN1 size is smaller than the actual data
elif 0x80 < length:
byte_len = 0x7F & length
len_limit = 2 + byte_len
if 4 >= byte_len and len(data) >= len_limit:
length = 0
for i in range(2, len_limit):
length <<= 8
length |= data[i]
if len(data) >= length + len_limit:
return length + len_limit
# else - unsupported huge size
else:
# length is less than 0x80
if len(data) >= length + 2:
return length + 2
# fallback - unsupported
return 0
[docs]
@staticmethod
def read_data(path: Union[str, Path]) -> Optional[bytes]:
"""Read the file bytes as is.
Try to read the data of the file.
Args:
path: path to file
Return:
list of file rows in a suitable encoding from "encodings",
if none of the encodings match, an empty list will be returned
"""
try:
with open(path, "rb") as file:
return file.read()
except Exception as exc:
logger.error("Unexpected Error: Can not read '%s'. Error message: '%s'", path, exc)
return None
[docs]
@staticmethod
def get_xml_from_lines(xml_lines: List[str]) -> Tuple[Optional[List[str]], Optional[List[int]]]:
"""Parse xml data from list of string and return List of str.
Args:
xml_lines: list of lines of xml data
Return:
List of formatted string(f"{root.tag} : {root.text}")
Raises:
xml exception
"""
lines = []
line_nums = []
tree = etree.fromstringlist(xml_lines)
for element in tree.iter():
tag = Util.extract_element_data(element, "tag")
text = Util.extract_element_data(element, "text")
lines.append(f"{tag} : {text}")
line_nums.append(element.sourceline)
return lines, line_nums
[docs]
@staticmethod
def json_load(file_path: Union[str, Path], encoding=DEFAULT_ENCODING) -> Any:
"""Load dictionary from JSON file"""
try:
with open(file_path, "r", encoding=encoding) as f:
return json.load(f)
except Exception as exc:
logging.error("Failed to read: %s %s", file_path, exc)
return None
[docs]
@staticmethod
def json_dump(obj: Any, file_path: Union[str, Path], encoding=DEFAULT_ENCODING, indent=4) -> None:
"""Write dictionary to JSON file"""
try:
with open(file_path, "w", encoding=encoding) as f:
json.dump(obj, f, indent=indent)
except Exception as exc:
logging.error("Failed to write: %s %s", file_path, exc)
[docs]
@staticmethod
def yaml_load(file_path: Union[str, Path], encoding=DEFAULT_ENCODING) -> Any:
"""Load dictionary from YAML file"""
try:
with open(file_path, "r", encoding=encoding) as f:
return yaml.safe_load(f)
except Exception as exc:
logger.error("Failed to read %s %s", file_path, exc)
return None
[docs]
@staticmethod
def yaml_dump(obj: Any, file_path: Union[str, Path], encoding=DEFAULT_ENCODING) -> None:
"""Write dictionary to YAML file"""
try:
with open(file_path, "w", encoding=encoding) as f:
yaml.dump(obj, f)
except Exception as exc:
logging.error("Failed to write: %s %s", file_path, exc)
[docs]
@staticmethod
def parse_python(source: str) -> List[Any]:
"""Parse Python source and back to remove strings merge and line wrap"""
with warnings.catch_warnings(record=True):
warnings.simplefilter("error", SyntaxWarning)
src = ast.parse(source)
result = ast.unparse(src).splitlines()
return result
PEM_CLEANING_PATTERN = re.compile(r"\\[tnrvf]")
WHITESPACE_TRANS_TABLE = str.maketrans('', '', string.whitespace)
[docs]
@staticmethod
def decode_base64(text: str, padding_safe: bool = False, urlsafe_detect=False) -> bytes:
"""decode text to bytes with / without padding detect and urlsafe symbols"""
value = text.translate(Util.WHITESPACE_TRANS_TABLE)
if padding_safe:
# workaround for binascii.Error: Excess padding not allowed
value = value.rstrip('=')
pad_num = 0x3 & len(value)
if pad_num:
value += '=' * (4 - pad_num)
if urlsafe_detect and ('-' in value or '_' in value):
decoded = base64.b64decode(value, altchars=b"-_", validate=True)
else:
decoded = base64.b64decode(value, validate=True)
return decoded
[docs]
@staticmethod
def load_pk(data: bytes, password: Optional[bytes] = None) -> Optional[PrivateKeyTypes]:
"""Try to load private key from PKCS1, PKCS8 and PKCS12 formats"""
with contextlib.suppress(Exception):
# PKCS1, PKCS8 probes
private_key = load_der_private_key(data, password)
return private_key
with contextlib.suppress(Exception):
# PKCS12 probe
private_key, _certificate, _additional_certificates = load_key_and_certificates(data, password)
return private_key
return None
RANDOM_DATA = random.randbytes(20)
[docs]
@staticmethod
def check_pk(pkey: PrivateKeyTypes) -> bool:
"""Check private key with encrypt-decrypt random data"""
if not pkey or isinstance(pkey, (EllipticCurvePublicKey, DSAPublicKey, Ed448PublicKey, Ed25519PublicKey,
DHPublicKey, X448PublicKey, X25519PublicKey)):
# These aren't the keys we're looking for
return False
if isinstance(pkey, (EllipticCurvePrivateKey, DSAPrivateKey, Ed448PrivateKey, Ed25519PrivateKey, DHPrivateKey,
X448PrivateKey, X25519PrivateKey)):
# One does not simply perform check the keys
return True
if isinstance(pkey, RSAPrivateKey):
pd = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)
ciphertext = pkey.public_key().encrypt(Util.RANDOM_DATA, padding=pd)
refurb = pkey.decrypt(ciphertext, padding=pd)
return bool(refurb == Util.RANDOM_DATA)
logger.warning("Unknown private key type: %s", type(pkey))
return False
[docs]
@staticmethod
def get_chunks(line_len: int) -> List[Tuple[int, int]]:
"""Returns chunks positions for given line length"""
# line length is over MAX_LINE_LENGTH already
chunks = [(0, CHUNK_SIZE)]
# case for oversize line
next_offset = CHUNK_STEP_SIZE
while line_len > next_offset:
# the target is too long for single "finditer" - it will be scanned by chunks
if line_len > next_offset + MAX_LINE_LENGTH:
# the chunk is not the before last
chunks.append((next_offset, next_offset + CHUNK_SIZE))
next_offset += CHUNK_STEP_SIZE
else:
# the tail of line is between CHUNK_SIZE and MAX_LINE_LENGTH
chunks.append((next_offset, line_len))
break
return chunks
[docs]
@staticmethod
def subtext(text: str, pos: int, hunk_size: int) -> str:
"""cut text symmetrically for given position or use remained quota to be fitted in 2x hunk_size"""
# cut trailed whitespaces to obtain more informative data
text = text.rstrip()
if hunk_size <= pos:
left_quota = 0
left_pos = pos - hunk_size
else:
left_quota = hunk_size - pos
left_pos = 0
# skip leading whitespaces in result string
for i in range(left_pos, pos):
if text[i] in string.whitespace:
left_quota += 1
left_pos += 1
else:
break
right_remain = len(text) - pos
if hunk_size <= right_remain:
right_quota = 0
right_pos = pos + hunk_size + left_quota
else:
right_quota = hunk_size - right_remain
right_pos = pos + hunk_size + left_quota
if len(text) < right_pos:
right_pos = len(text)
if 0 < left_pos:
left_pos -= right_quota
if 0 > left_pos:
left_pos = 0
return text[left_pos:right_pos].rstrip()
[docs]
@staticmethod
def get_excel_column_name(column_index: int) -> str:
"""Converts index based column position into Excel style column name"""
name = ''
if isinstance(column_index, int):
while 0 <= column_index:
column_index, remain = divmod(column_index, 26)
name = f"{chr(ord('A') + remain)}{name}"
column_index -= 1
return name