Source code for credsweeper.deep_scanner.protobuf_scanner

import logging
from abc import ABC
from typing import List, Optional, Generator, Tuple

from credsweeper.common.constants import MIN_DATA_LEN
from credsweeper.credentials.candidate import Candidate
from credsweeper.deep_scanner.abstract_scanner import AbstractScanner
from credsweeper.file_handler.data_content_provider import DataContentProvider

logger = logging.getLogger(__name__)


[docs] class ProtobufScanner(AbstractScanner, ABC): """Implements protobuf (ar) scanning"""
[docs] @staticmethod def read_varint(data: bytes | bytearray, offset: int) -> Tuple[int, int]: """Reads varint from offset up to 64 bit values (10 bytes) Returns: used bytes (-1 when overflow), the value """ data_len = len(data) counter = value = shift = 0 for i in range(offset, offset + 10): if i < data_len and 10 > counter: counter += 1 d = data[i] if 0x7F < d: value |= (0x7F & d) << shift shift += 7 continue value |= d << shift return counter, value break return -1, 0
[docs] @staticmethod def read_wire(data: bytes | bytearray, offset: int) -> Tuple[int, int]: """Reads wire to detect sizes Returns: size of wire type (with primitives types), size of data (length-delimited) """ n, s = ProtobufScanner.read_varint(data, offset) if 0 < n: t = 0x3 & s if 0 == t: # varint _n, _ = ProtobufScanner.read_varint(data, offset + n) if 0 < _n: return n + _n, 0 elif 1 == t: # 64 bit fixed return n + 8, 0 elif 2 == t: # length-delimited _n, _s = ProtobufScanner.read_varint(data, offset + n) if 0 < _n: return n + _n, _s elif 3 == t or 4 == t: # deprecated return n, 0 elif 5 == t: # 32 bit fixed return n + 4, 0 return -1, 0
[docs] @staticmethod def match_protobuf(data: bytes | bytearray, offset: int, limit: int) -> bool: """Process data from start to end as simple protobuf chunk Returns: True when whole chunk was utilized with protobuf structure """ while offset < limit: n, s = ProtobufScanner.read_wire(data, offset) if 0 < n: offset += n + s else: break else: return bool(offset == limit) return False
[docs] @staticmethod def match(data: bytes | bytearray) -> bool: """Simple structure check for whole data""" if data: return ProtobufScanner.match_protobuf(data, 0, len(data)) return False
[docs] @staticmethod def walk_protobuf(data: bytes, offset: int, limit: int) -> Generator[Tuple[int, bytes], None, None]: """Processes sequence of protobuf and yields offset and data recursive""" while offset < limit: n, s = ProtobufScanner.read_wire(data, offset) if 0 > n: raise ValueError(f"Wrong data at 0x{offset:x}") offset += n if MIN_DATA_LEN < s: # yield valuable bytes only if ProtobufScanner.match_protobuf(data, offset, offset + s): yield from ProtobufScanner.walk_protobuf(data, offset, offset + s) else: yield offset, data[offset:offset + s] offset += s
[docs] def data_scan( self, # data_provider: DataContentProvider, # depth: int, # recursive_limit_size: int) -> Optional[List[Candidate]]: """Extracts data file from protobuf payload and launches data_scan""" try: candidates: List[Candidate] = [] for offset, data in ProtobufScanner.walk_protobuf(data_provider.data, 0, len(data_provider.data)): provider = DataContentProvider(data=data, file_path=data_provider.file_path, file_type=data_provider.file_type, info=f"{data_provider.info}|PROTO:0x{offset:x}") protobuf_candidates = self.recursive_scan(provider, depth, recursive_limit_size) candidates.extend(protobuf_candidates) return candidates except Exception as exc: logger.warning(exc) return None