import logging
import zlib
from abc import ABC
from typing import List, Optional
from credsweeper.credentials.candidate import Candidate
from credsweeper.deep_scanner.abstract_scanner import AbstractScanner
from credsweeper.file_handler.data_content_provider import DataContentProvider
logger = logging.getLogger(__name__)
[docs]
class ZlibScanner(AbstractScanner, ABC):
"""Implements zlib data inflate and scan"""
[docs]
@staticmethod
def match(data: bytes | bytearray) -> bool:
"""Returns True if data looks like deflated data with zlib"""
if 6 < len(data):
cmf = data[0]
flg = data[1]
if 8 == (0xF & cmf) and 7 >= (cmf >> 4) and 0 == ((cmf << 8) | flg) % 31 and 0 == (0x20 & flg):
if 0x3 != (data[2] >> 1):
# the last check of impossible bits
return True
return False
[docs]
@staticmethod
def decompress(limit: int, data: bytes) -> bytes:
"""Returns decompressed data by chunks with a limit or exception in unusual cases"""
zlib_obj = zlib.decompressobj()
result = zlib_obj.decompress(data, max_length=limit)
if zlib_obj.unconsumed_tail:
raise ValueError(f"Limit exceeds for {len(zlib_obj.unconsumed_tail)}")
if not zlib_obj.eof:
raise ValueError("Truncated zlib stream")
if zlib_obj.unused_data:
raise ValueError(f"Unused data {len(zlib_obj.unused_data)}")
return result
[docs]
def data_scan(
self, #
data_provider: DataContentProvider, #
depth: int, #
recursive_limit_size: int) -> Optional[List[Candidate]]:
"""Inflate data from zlib compressed and launches data_scan"""
try:
decompressed = ZlibScanner.decompress(recursive_limit_size, data_provider.data)
zlib_content_provider = DataContentProvider(data=decompressed,
file_path=data_provider.file_path,
file_type=data_provider.file_type,
info=f"{data_provider.info}|ZLIB:{len(decompressed)}")
zlib_candidates = self.recursive_scan(zlib_content_provider, depth, recursive_limit_size)
return zlib_candidates
except Exception as zlib_exc:
logger.warning("%s:%s", data_provider.file_path, zlib_exc)
return None