Skip to content

File Hashers

dorsal.file.utils.quick_hasher.QuickHasher

Generate a 'quick hash' for large files by sampling content chunks.

  • Designed for speed on large files.
  • Provides a deterministic hash for fast lookups.
  • Not cryptographically collision-resistant like full-file hashes.
  • Aims for strong probabilistic uniqueness.
  • Number of chunks sampled varies with file size.
  • Sampling is deterministic, seeded by file size (using modulo operation), ensuring the same file (by size and content at sampled locations) will always produce the same QuickHash.

hash

hash(
    file_path,
    file_size,
    raise_on_filesize_error=False,
    follow_symlinks=True,
)

Generate a 'quick hash' by sampling file content.

Parameters:

Name Type Description Default
file_path str

Absolute path to the file.

required
file_size int

File size in bytes.

required
raise_on_filesize_error bool

If True, raise ValueError if file size is outside permitted range. Default False (returns None).

False

Returns:

Type Description
str | None

Hexadecimal string of QuickHash if successful and permitted.

str | None

None if size out of range and raise_on_filesize_error is False.

Raises:

Type Description
OSError

For file access errors (e.g., FileNotFoundError, PermissionError).

ValueError

If raise_on_filesize_error is True and file size is out of range, or if internal configuration (e.g. chunk_size) is invalid. (Specific subtypes like QuickHashFileSizeError or QuickHashConfigurationError may be raised).

QuickHashFileInstabilityError

If the file changes state during hashing.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/quick_hasher.py
def hash(
    self, file_path: str, file_size: int, raise_on_filesize_error: bool = False, follow_symlinks: bool = True
) -> str | None:
    """
    Generate a 'quick hash' by sampling file content.

    Args:
        file_path: Absolute path to the file.
        file_size: File size in bytes.
        raise_on_filesize_error: If True, raise ValueError if file size
                                 is outside permitted range. Default False (returns None).
        follow_symlinks: If False and `file_path` is a symbolic link, no hash is
                         generated and None is returned. Default True.

    Returns:
        Hexadecimal string of QuickHash if successful and permitted.
        None if size out of range and `raise_on_filesize_error` is False,
        if the path is a symlink and `follow_symlinks` is False, or if no
        chunk indices were selected for a file that is not smaller than one chunk.

    Raises:
        OSError: For file access errors (e.g., FileNotFoundError, PermissionError).
        ValueError: If `raise_on_filesize_error` is True and file size is out of range,
                    or if internal configuration (e.g. chunk_size) is invalid.
                    (Specific subtypes like QuickHashFileSizeError or QuickHashConfigurationError may be raised).
        QuickHashFileInstabilityError: If the file changes state during hashing.
    """
    logger.debug("Attempting to generate QuickHash for: %s", file_path)

    if not follow_symlinks and os.path.islink(file_path):
        # The placeholder needs its argument; without it the literal "%s" is logged.
        logger.debug("QuickHash not generated for file '%s': file_path is a symbolic link.", file_path)
        return None

    if not self._check_permitted_filesize(
        file_path=file_path,
        file_size=file_size,
        raise_on_error=raise_on_filesize_error,
    ):
        return None

    hasher_instance: "_Hash" = self.hasher_constructor()
    total_chunks_in_file = self._get_total_chunks(file_size)
    num_chunks_to_sample = self._get_chunk_count(file_size)

    logger.debug(
        "File '%s' (size %d bytes): Total %dMiB-chunks: %d, Chunks to sample: %d",
        file_path,
        file_size,
        self.chunk_size // MiB,
        total_chunks_in_file,
        num_chunks_to_sample,
    )

    chunk_indices_to_read = self._random_sample_chunk_indices(
        file_size=file_size,
        num_chunks_to_sample=num_chunks_to_sample,
        total_chunks_in_file=total_chunks_in_file,
    )

    if not chunk_indices_to_read:
        # Nothing to sample: a non-empty file smaller than one chunk is hashed
        # in full; anything else cannot be quick-hashed.
        if 0 < file_size < self.chunk_size:
            logger.debug(
                "File '%s' (size %d) is smaller than one chunk. Reading entire file for QuickHash.",
                file_path,
                file_size,
            )
            try:
                with open(file_path, "rb") as fp:
                    hasher_instance.update(fp.read())
            except OSError:
                logger.exception(
                    "Failed during full read for small file QuickHash on '%s'.",
                    file_path,
                )
                raise
            hex_digest = hasher_instance.hexdigest()
            logger.debug("Generated QuickHash for small file '%s': %s", file_path, hex_digest)
            return hex_digest

        logger.warning(
            "QuickHash: No chunk indices selected for file '%s' (size %d). Returning None.",
            file_path,
            file_size,
        )
        return None

    logger.debug(
        "QuickHash: Selected chunk indices to read for '%s': %s",
        file_path,
        chunk_indices_to_read,
    )

    try:
        with open(file_path, "rb") as fp:
            for chunk_index in chunk_indices_to_read:
                byte_offset = chunk_index * self.chunk_size
                # An offset at or past the caller-supplied size means the index
                # cannot belong to the file as it was described to us.
                if byte_offset >= file_size:
                    msg = (
                        f"Calculated offset {byte_offset} exceeds current file size {file_size} "
                        f"for chunk index {chunk_index}. File may have changed during hashing."
                    )
                    logger.error(
                        "QuickHash Instability: %s",
                        msg,
                        extra={"file_path": file_path},
                    )
                    raise QuickHashFileInstabilityError(msg, file_path=file_path)

                fp.seek(byte_offset)
                chunk_data = fp.read(self.chunk_size)

                # An empty read inside the expected size means the file shrank.
                if not chunk_data:
                    msg = (
                        f"Read empty chunk at offset {byte_offset} (index {chunk_index}) "
                        f"when data was expected (file size {file_size}). "
                        "File may have changed during hashing."
                    )
                    logger.error(
                        "QuickHash Instability: %s",
                        msg,
                        extra={"file_path": file_path},
                    )
                    raise QuickHashFileInstabilityError(msg, file_path=file_path)
                hasher_instance.update(chunk_data)
    except OSError:
        logger.exception("Failed during chunked read for QuickHash on file '%s'.", file_path)
        raise

    hex_digest = hasher_instance.hexdigest()
    logger.debug("Generated QuickHash for '%s': %s", file_path, hex_digest)
    return hex_digest

dorsal.file.utils.file_hasher.FileHasher

FileHasher()

Calculate cryptographic hashes for a given file by reading it in chunks.

Supports standard hashes like SHA-256 and BLAKE3, and can optionally calculate a TLSH (Trend Micro Locality Sensitive Hash) similarity hash if the tlsh library is available and the file meets minimum size requirements.

Initializes the FileHasher.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/file_hasher.py
def __init__(self) -> None:
    """Create a FileHasher with its own copy of the default hasher constructors."""
    # Starts as None; presumably filled in by the TLSH availability check on first use.
    self._tlsh_available: bool | None = None
    # Copy so that per-instance changes never touch the shared defaults.
    self.hashers_constructors: dict[str, Any] = self._default_hashers.copy()

hash

hash(
    file_path,
    file_size,
    calculate_sha256=True,
    calculate_blake3=True,
    calculate_tlsh=True,
    follow_symlinks=True,
)

Calculates multiple hashes for the specified file.

Parameters:

Name Type Description Default
file_path str

The absolute path to the file to be hashed.

required
file_size int

The size of the file in bytes.

required
calculate_sha256 bool

Whether to calculate SHA-256 (default True).

True
calculate_blake3 bool

Whether to calculate BLAKE3 (default True).

True
calculate_tlsh bool

If True, attempts to calculate the TLSH similarity hash.

True
follow_symlinks bool

If True (default), follows symlinks to hash target content. If False, hashes the symlink pointer string itself.

True

Returns:

Type Description
dict[HashFunctionId, str]

A dictionary mapping hash algorithm names to their hexadecimal string representations.

dict[HashFunctionId, str]

If a hash cannot be calculated (e.g., TLSH due to size), it is omitted.

Raises:

Type Description
FileNotFoundError

If file_path does not exist.

PermissionError

If the file cannot be read due to permissions.

IOError

For other I/O related errors during file reading.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/file_hasher.py
def hash(
    self,
    file_path: str,
    file_size: int,
    calculate_sha256: bool = True,
    calculate_blake3: bool = True,
    calculate_tlsh: bool = True,
    follow_symlinks: bool = True,
) -> dict[HashFunctionId, str]:
    """
    Compute several hashes of one file in a single pass over its content.

    Args:
        file_path: The absolute path to the file to be hashed.
        file_size: The size of the file in bytes (compared against the TLSH minimum size).
        calculate_sha256: Include SHA-256 in the result (default True).
        calculate_blake3: Include BLAKE3 in the result (default True).
        calculate_tlsh: Attempt the TLSH similarity hash as well (default True).
        follow_symlinks: If True (default), follows symlinks to hash target content.
                          If False, hashes the symlink pointer string itself;
                          TLSH is skipped for symlinks in that mode.

    Returns:
        A dictionary keyed by hash algorithm name with hexadecimal digests as values.
        Any hash that could not be produced (e.g., TLSH for a file that is too small)
        is simply absent from the dictionary.

    Raises:
        FileNotFoundError: If `file_path` does not exist.
        PermissionError: If the file cannot be read due to permissions.
        IOError: For other I/O related errors during file reading.
    """
    requested = [
        label
        for label, wanted in (
            ("SHA-256", calculate_sha256),
            ("BLAKE3", calculate_blake3),
            ("TLSH", calculate_tlsh),
        )
        if wanted
    ]

    logger.debug(
        "Hashing file: '%s', size: %d bytes, functions: %s",
        file_path,
        file_size,
        requested,
    )

    # Instantiate one hasher per requested algorithm that has a registered constructor.
    hashers: dict[str, Any] = {
        label: factory() for label, factory in self.hashers_constructors.items() if label in requested
    }

    if calculate_tlsh:
        if not follow_symlinks and os.path.islink(file_path):
            logger.debug("Skipping TLSH for symlink '%s' (Physical Mode).", file_path)
        elif not self._check_tlsh_availability():
            # TLSH is unavailable: it is left out and no message is logged here.
            pass
        elif file_size >= self.tlsh_min_size:
            import tlsh  # type: ignore[import-not-found]

            hashers["TLSH"] = tlsh.Tlsh()
            logger.debug("TLSH hasher added for file: '%s'", file_path)
        else:
            logger.debug(
                "File '%s' is too small for TLSH. It will not be calculated.",
                file_path,
            )

    try:
        with self._stream_file_content(file_path, follow_symlinks=follow_symlinks) as stream:
            for block in self._yield_chunks(stream):
                # Feed every active hasher from the same chunk so the file is read once.
                for hasher in hashers.values():
                    hasher.update(block)
    except OSError as exc:  # also covers FileNotFoundError, PermissionError and IOError
        logger.exception("Failed to read file '%s' for hashing: %s", file_path, exc)
        raise

    if "TLSH" in hashers:
        # TLSH must be finalized before its digest is read; a ValueError here drops it.
        try:
            hashers["TLSH"].final()
            logger.debug("TLSH hash finalized for file: '%s'", file_path)
        except ValueError as exc:
            logger.debug(
                "TLSH finalization failed for file '%s': %s. TLSH will be omitted.",
                file_path,
                exc,
            )
            hashers.pop("TLSH", None)

    digests: dict[HashFunctionId, str] = {}
    for label, hasher in hashers.items():
        try:
            digests[cast(HashFunctionId, label)] = hasher.hexdigest()
        except ValueError as exc:
            logger.debug(
                "Failed to get hexdigest for '%s' on file '%s': %s.",
                label,
                file_path,
                exc,
            )

    logger.debug(
        "Successfully calculated hashes for file: '%s'. Hashes obtained: %s",
        file_path,
        list(digests.keys()),
    )
    return digests

hash_blake3

hash_blake3(file_path, follow_symlinks=True)

Calculates the BLAKE3 hash for a single file.

Parameters:

Name Type Description Default
file_path str

The absolute path to the file to be hashed.

required
follow_symlinks bool

If True (default), follows symlinks. If False, hashes the link target string.

True

Returns:

Type Description
str

The hexadecimal BLAKE3 hash as a string.

Raises:

Type Description
(IOError, PermissionError, OSError)

If the file cannot be read.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/file_hasher.py
def hash_blake3(self, file_path: str, follow_symlinks: bool = True) -> str:
    """
    Compute the BLAKE3 digest of one file, reading it chunk by chunk.

    Args:
        file_path: The absolute path to the file to be hashed.
        follow_symlinks: If True (default), follows symlinks.
                          If False, hashes the link target string.

    Returns:
        The BLAKE3 digest as a hexadecimal string.

    Raises:
        IOError, PermissionError, OSError: If the file cannot be read.
    """
    logger.debug("BLAKE3 hashing file: '%s'", file_path)
    digest_state = blake3.blake3()
    try:
        with self._stream_file_content(file_path, follow_symlinks=follow_symlinks) as stream:
            chunks = self._yield_chunks(stream)
            for block in chunks:
                digest_state.update(block)
        return digest_state.hexdigest()
    except OSError as exc:  # IOError is an alias of OSError; PermissionError is a subclass
        logger.error("Failed to read file '%s' for BLAKE3 hashing: %s", file_path, exc)
        raise

hash_sha256

hash_sha256(file_path, follow_symlinks=True)

Calculates the SHA-256 hash for a single file.

Parameters:

Name Type Description Default
file_path str

The absolute path to the file to be hashed.

required
follow_symlinks bool

If True (default), follows symlinks. If False, hashes the link target string.

True

Returns:

Type Description
str

The hexadecimal SHA-256 hash as a string.

Raises:

Type Description
(IOError, PermissionError, OSError)

If the file cannot be read.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/file_hasher.py
def hash_sha256(self, file_path: str, follow_symlinks: bool = True) -> str:
    """
    Compute the SHA-256 digest of one file, reading it chunk by chunk.

    Args:
        file_path: The absolute path to the file to be hashed.
        follow_symlinks: If True (default), follows symlinks.
                          If False, hashes the link target string.

    Returns:
        The SHA-256 digest as a hexadecimal string.

    Raises:
        IOError, PermissionError, OSError: If the file cannot be read.
    """
    logger.debug("SHA-256 hashing file: '%s'", file_path)
    digest_state = hashlib.sha256()
    try:
        with self._stream_file_content(file_path, follow_symlinks=follow_symlinks) as stream:
            chunks = self._yield_chunks(stream)
            for block in chunks:
                digest_state.update(block)
        return digest_state.hexdigest()
    except OSError as exc:  # IOError is an alias of OSError; PermissionError is a subclass
        logger.error("Failed to read file '%s' for SHA-256 hashing: %s", file_path, exc)
        raise

hash_tlsh

hash_tlsh(file_path, file_size, follow_symlinks=True)

Calculates the TLSH similarity hash for a single file.

Parameters:

Name Type Description Default
file_path str

The absolute path to the file to be hashed.

required
file_size int

The size of the file in bytes.

required
follow_symlinks bool

If True (default), follows symlinks. If False, always returns None (TLSH not supported on pointers).

True

Returns:

Type Description
str | None

The TLSH hash as a string, or None if:

  • The library is unavailable.
  • The file is too small.
  • The file is a symlink and follow_symlinks is False.

Raises:

Type Description
(IOError, PermissionError, OSError)

If the file cannot be read.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/file_hasher.py
def hash_tlsh(self, file_path: str, file_size: int, follow_symlinks: bool = True) -> str | None:
    """
    Compute the TLSH similarity hash of one file.

    Args:
        file_path: The absolute path to the file to be hashed.
        file_size: The size of the file in bytes.
        follow_symlinks: If True (default), follows symlinks.
                          If False, always returns None (TLSH not supported on pointers).

    Returns:
        The TLSH hash as a string, or None when any of the following holds:
        - The file is a symlink and `follow_symlinks` is False.
        - The tlsh library is unavailable.
        - The file is smaller than the minimum TLSH size.
        - TLSH raised ValueError while hashing or finalizing.

    Raises:
        IOError, PermissionError, OSError: If the file cannot be read.
    """
    logger.debug("TLSH hashing file: '%s'", file_path)

    # Guard clauses: each precondition that rules TLSH out returns None early.
    if not follow_symlinks and os.path.islink(file_path):
        return None

    if not self._check_tlsh_availability():
        logger.warning("Cannot calculate TLSH for '%s': tlsh library not available.", file_path)
        return None

    if file_size < self.tlsh_min_size:
        logger.debug(
            "Cannot calculate TLSH for '%s': File size %d is less than minimum %d bytes.",
            file_path,
            file_size,
            self.tlsh_min_size,
        )
        return None

    # Imported lazily, only after the availability check has passed.
    import tlsh  # type: ignore[import-not-found]

    tlsh_state = tlsh.Tlsh()

    try:
        with self._stream_file_content(file_path, follow_symlinks=follow_symlinks) as stream:
            for block in self._yield_chunks(stream):
                tlsh_state.update(block)

        # The digest is only read after the hasher has been finalized.
        tlsh_state.final()
        digest = tlsh_state.hexdigest()
        return digest
    except OSError as exc:  # IOError is an alias of OSError; PermissionError is a subclass
        logger.error("Failed to read file '%s' for TLSH hashing: %s", file_path, exc)
        raise
    except ValueError as exc:
        logger.warning("Could not generate TLSH for '%s': %s", file_path, exc)
        return None

dorsal.file.utils.get_blake3_hash

get_blake3_hash(file_path, follow_symlinks=True)
Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/__init__.py
def get_blake3_hash(file_path: str, follow_symlinks: bool = True) -> str:
    """Return the BLAKE3 hex digest of a file using the shared FILE_HASHER.

    Args:
        file_path: Path of the file to hash.
        follow_symlinks: Passed through to `FILE_HASHER.hash_blake3`.

    Returns:
        The value returned by `FILE_HASHER.hash_blake3`.

    Raises:
        Any exception raised by `FILE_HASHER.hash_blake3` propagates unchanged.
    """
    # The former try/except only re-raised, so calling directly is equivalent.
    return FILE_HASHER.hash_blake3(file_path=file_path, follow_symlinks=follow_symlinks)

dorsal.file.utils.get_quick_hash

get_quick_hash(
    file_path,
    fallback_to_sha256=False,
    file_size=None,
    follow_symlinks=True,
)

Get the quick hash of a file.

If fallback_to_sha256 is True and QuickHasher does not produce a hash (e.g. because the file is too small), a SHA-256 hash is calculated and returned in its place.

Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/__init__.py
def get_quick_hash(
    file_path: str, fallback_to_sha256: bool = False, file_size: int | None = None, follow_symlinks: bool = True
) -> str | None:
    """Get the quick hash of a file.

    If `fallback_to_sha256` is True and QuickHasher does not produce a hash
    (e.g. because the file is too small), a SHA-256 hash is calculated and
    returned in its place.

    Args:
        file_path: Path of the file to hash.
        fallback_to_sha256: Return a SHA-256 hash when the quick hash is None.
        file_size: File size in bytes. When None it is looked up with `get_filesize`.
        follow_symlinks: Passed through to the underlying hashers.

    Returns:
        The quick hash, the SHA-256 fallback, or None when no quick hash was
        produced and `fallback_to_sha256` is False.

    Raises:
        OSError: If the file size lookup or the hashing fails; logged and re-raised.
    """
    if file_size is None:
        try:
            # NOTE(review): `follow_symlinks` is not forwarded here — confirm whether
            # `get_filesize` should receive it.
            file_size = get_filesize(file_path)
        except OSError as err:
            logger.exception("get_quick_hash: Failed to get file size for '%s' - %s", file_path, err)
            raise

    # Hashing failures get their own message so they are not reported as size errors.
    try:
        quick_hash: str | None = QUICK_HASHER.hash(
            file_path=file_path, file_size=file_size, follow_symlinks=follow_symlinks
        )
        if quick_hash is None and fallback_to_sha256:
            quick_hash = FILE_HASHER.hash_sha256(file_path=file_path, follow_symlinks=follow_symlinks)
    except OSError as err:
        logger.exception("get_quick_hash: Failed to hash file '%s' - %s", file_path, err)
        raise

    return quick_hash

dorsal.file.utils.get_sha256_hash

get_sha256_hash(file_path, follow_symlinks=True)
Source code in venv/lib/python3.12/site-packages/dorsal/file/utils/__init__.py
def get_sha256_hash(file_path: str, follow_symlinks: bool = True) -> str:
    """Return the SHA-256 hex digest of a file using the shared FILE_HASHER.

    Args:
        file_path: Path of the file to hash.
        follow_symlinks: Passed through to `FILE_HASHER.hash_sha256`.

    Returns:
        The value returned by `FILE_HASHER.hash_sha256`.

    Raises:
        Any exception raised by `FILE_HASHER.hash_sha256` propagates unchanged.
    """
    # The former try/except only re-raised, so calling directly is equivalent.
    return FILE_HASHER.hash_sha256(file_path=file_path, follow_symlinks=follow_symlinks)