MetadataReader

dorsal.file.metadata_reader.MetadataReader

MetadataReader(
    api_key=None,
    client=None,
    model_config="default",
    ignore_duplicates=False,
    base_url=constants.BASE_URL,
    file_class=LocalFile,
    offline=False,
)

A high-level utility for processing local files and indexing metadata.

This class provides a convenient interface to the ModelRunner engine. It is designed to simplify the process of scanning local files, extracting rich metadata based on a configurable pipeline, and optionally pushing the results directly to a DorsalHub instance.

Example
from dorsal.file import MetadataReader

# Initialize the reader
reader = MetadataReader()

# Use the reader to process a directory and get LocalFile objects
local_files = reader.scan_directory("path/to/my_data")
print(f"Processed {len(local_files)} files.")

# Or, use the reader to process and immediately index a directory
summary = reader.index_directory("path/to/my_data")
print(f"Indexed {summary['success']} files to DorsalHub.")

Parameters:

Name Type Description Default
api_key str | None

Optional API key for DorsalHub. If not provided, DorsalClient will attempt to read it from environment variables.

None
client DorsalClient | None

An existing DorsalClient instance to use for API operations. If not provided, a shared instance will be used.

None
model_config str | list[dict[str, Any]] | None

A path to a custom JSON pipeline configuration file or a list of dictionaries defining the pipeline for the ModelRunner used for local file processing. If None, the default pipeline is used.

'default'
ignore_duplicates bool

If True, duplicate files (based on content hash) encountered during directory indexing will be skipped. If False (default), a DuplicateFileError will be raised.

False
base_url str

The base URL for the Dorsal API. Defaults to the common BASE_URL.

BASE_URL
file_class Type[LocalFile]

Currently only LocalFile is supported.

LocalFile
offline bool

If True, the reader runs in offline mode: any provided client is ignored and the index_* methods raise DorsalError.

False
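
For reference, a minimal construction sketch combining the options above (the pipeline config path is hypothetical):

from dorsal.file import MetadataReader

# Default reader: shared DorsalClient, default model pipeline.
reader = MetadataReader()

# Offline reader: never contacts DorsalHub; scan_* methods still work,
# but index_* methods raise DorsalError.
offline_reader = MetadataReader(offline=True)

# Custom pipeline config plus duplicate-tolerant directory indexing
# ("pipelines/custom.json" is a hypothetical path).
custom_reader = MetadataReader(
    model_config="pipelines/custom.json",
    ignore_duplicates=True,
)
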
Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def __init__(
    self,
    api_key: str | None = None,
    client: DorsalClient | None = None,
    model_config: str | list[dict[str, Any]] | None = "default",
    ignore_duplicates: bool = False,
    base_url: str = constants.BASE_URL,
    file_class: Type[LocalFile] = LocalFile,
    offline: bool = False,
):
    """
    Args:
        api_key: Optional API key for DorsalHub. If not provided, DorsalClient
                 will attempt to read it from environment variables.
        model_config: Optional configuration for the ModelRunner instance used for
                      local file processing.
        ignore_duplicates: If True, duplicate files (based on content hash)
                           encountered during directory indexing will be skipped.
                           If False (default), a DuplicateFileError will be raised.
        base_url: The base URL for the Dorsal API. Defaults to the common BASE_URL.
        file_class: Currently only `LocalFile` supported.
    """
    self.offline = offline or is_offline_mode()
    self._ignore_duplicates = ignore_duplicates
    self._model_runner = ModelRunner(pipeline_config=model_config)

    if self.offline:
        if client is not None:
            logger.warning("MetadataReader initialized in OFFLINE mode. The provided 'client' will be ignored.")
        self._client_instance = None
    else:
        self._client_instance = client

    self._api_key = api_key
    self._base_url = base_url
    self._file_class = file_class
    logger.debug("MetadataReader initialized.")

generate_processed_records_from_directory

generate_processed_records_from_directory(
    dir_path,
    *,
    recursive=False,
    limit=None,
    console=None,
    palette=None,
    skip_cache=False,
    overwrite_cache=False,
    follow_symlinks=True
)

Scans a directory and sends each file through the ModelRunner pipeline.

Note: when duplicates are ignored, the returned map intentionally contains only the path of the first file encountered for each hash.

Parameters:

Name Type Description Default
dir_path str

Path to the directory to scan.

required
recursive bool

If True, scans subdirectories recursively. Defaults to False.

False
limit int | None

Optional. Stop processing files once this many unique records have been generated.

None

Returns:

Type Description
tuple[list[FileRecordStrict], dict[str, str]]

A list of unique FileRecordStrict objects, together with a mapping from file content hashes to their original file paths.

Raises:

Type Description
FileNotFoundError

If dir_path does not exist.

DuplicateFileError

If self._ignore_duplicates is False and duplicates found.

DorsalClientError

For errors scanning directory or during local processing.
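
A minimal usage sketch (the directory path is a placeholder):

from dorsal.file import MetadataReader

reader = MetadataReader()

# Process a directory locally; nothing is uploaded at this stage.
records, hash_to_path = reader.generate_processed_records_from_directory(
    "path/to/my_data",
    recursive=True,
)
for record in records:
    print(record.hash, "->", hash_to_path[record.hash])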

Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def generate_processed_records_from_directory(
    self,
    dir_path: str,
    *,
    recursive: bool = False,
    limit: int | None = None,
    console: Console | None = None,
    palette: dict | None = None,
    skip_cache: bool = False,
    overwrite_cache: bool = False,
    follow_symlinks: bool = True,
) -> tuple[list[FileRecordStrict], dict[str, str]]:
    """
    Scans a directory and sends each file through the ModelRunner pipeline.

    NOTE: When duplicates are ignored, this returned map intentionally contains only the path of the *first* file
          encountered for each hash

    Args:
        dir_path: Path to the directory to scan.
        recursive: If True, scans subdirectories recursively. Defaults to False.
        limit: Optional. Stop processing files once this many unique records have been generated.

    Returns:
        tuple[list[FileRecordStrict], dict[str, str]]:
            - list of unique `FileRecordStrict` objects.
            - mapping file content hashes to original file paths.

    Raises:
        FileNotFoundError: If `dir_path` does not exist.
        DuplicateFileError: If `self._ignore_duplicates` is False and duplicates found.
        DorsalClientError: For errors scanning directory or during local processing.
    """
    logger.debug(
        "Generating processed records from directory: '%s' (Recursive: %s, Ignore Duplicates: %s, Max: %s)",
        dir_path,
        recursive,
        self._ignore_duplicates,
        limit,
    )
    try:
        file_paths = get_file_paths(dir_path=dir_path, recursive=recursive)
    except FileNotFoundError:
        logger.error("Directory not found for generating records: %s", dir_path)
        raise
    except Exception as e:
        logger.error("Error scanning directory '%s' for record generation: %s", dir_path, e)
        raise DorsalClientError(
            message=f"An error occurred while scanning directory: {dir_path}.",
            original_exception=e,
        ) from e

    if not file_paths:
        logger.debug("No files found to process in directory: %s", dir_path)
        return [], {}
    if limit and len(file_paths) > limit:
        error_message = f"Number of records in the directory exceeds the limit: {len(file_paths)} > {limit}"
        raise DorsalError(error_message)

    file_hash_to_path_map: dict[str, str] = {}
    records_to_index_list: list[FileRecordStrict] = []
    processed_files_summary_local: dict[str, str] = {}

    logger.debug(
        "Generating metadata for up to %s file(s) from directory '%s' (Max: %s).",
        len(file_paths),
        dir_path,
        limit if limit is not None else "All",
    )

    rich_progress = None
    iterator: Iterable[str]
    if is_jupyter_environment():
        iterator = tqdm(file_paths, desc="Generating Metadata (Local)")
    elif console is not None:
        from dorsal.cli.themes.palettes import DEFAULT_PALETTE

        active_palette = palette if palette is not None else DEFAULT_PALETTE
        progress_columns = (
            TextColumn(
                "[progress.description]{task.description}",
                style=active_palette["progress_description"],
            ),
            BarColumn(bar_width=None, style=active_palette["progress_bar"]),
            TaskProgressColumn(style=active_palette["progress_percentage"]),
            MofNCompleteColumn(),
            TextColumn("•", style="dim"),
            TimeElapsedColumn(),
            TextColumn("•", style="dim"),
            TimeRemainingColumn(),
        )
        rich_progress = Progress(
            *progress_columns,
            console=console,
            redirect_stdout=True,
            redirect_stderr=True,
        )
        task_id = rich_progress.add_task("Generating Metadata", total=len(file_paths))
        iterator = file_paths
    else:
        iterator = file_paths

    with rich_progress if rich_progress else open(os.devnull, "w"):
        for file_path in iterator:
            if limit is not None and len(records_to_index_list) >= limit:
                logger.debug(
                    "Reached generation limit of %d records. Stopping further local processing for '%s'.",
                    limit,
                    dir_path,
                )
                break
            try:
                processing_path = file_path
                if follow_symlinks:
                    try:
                        path_obj = pathlib.Path(file_path)
                        resolved = path_obj.resolve()
                        if resolved.exists():
                            processing_path = str(resolved)
                    except (OSError, RuntimeError) as err:
                        logger.debug("Failed to resolve symlink: %s - %s", file_path, err)
                        pass
                file_record = self._get_or_create_record(
                    file_path=processing_path, skip_cache=skip_cache, overwrite_cache=overwrite_cache
                )
                if not hasattr(file_record, "hash") or not file_record.hash:
                    logger.error(
                        "Skipping file %s: processed record is missing a hash.",
                        file_path,
                    )
                    processed_files_summary_local[file_path] = "error_missing_hash"
                    continue
                if file_record.hash in file_hash_to_path_map:
                    original_file = file_hash_to_path_map[file_record.hash]
                    if self._ignore_duplicates:
                        logger.debug(
                            "Skipping duplicate: '%s' (original: '%s', hash: %s)",
                            file_path,
                            original_file,
                            file_record.hash,
                        )
                        processed_files_summary_local[file_path] = "skipped_duplicate_content"
                        continue
                    else:
                        logger.error(
                            "Duplicate file content: '%s' and '%s' (hash: %s). Not ignoring.",
                            file_path,
                            original_file,
                            file_record.hash,
                        )
                        raise DuplicateFileError(
                            message="Duplicate file content detected during local processing.",
                            file_paths=[file_path, original_file],
                        )
                file_hash_to_path_map[file_record.hash] = file_path
                records_to_index_list.append(file_record)
                processed_files_summary_local[file_path] = "processed_for_indexing"
            except DuplicateFileError:
                raise
            except (FileNotFoundError, IOError) as err:
                logger.error("Skipping '%s': local access error: %s", file_path, err)
                processed_files_summary_local[file_path] = f"error_local_access: {type(err).__name__}"
            except DorsalClientError as err:
                logger.error("Skipping '%s': model execution error: %s", file_path, err)
                processed_files_summary_local[file_path] = (
                    f"error_model_execution: {type(err.original_exception or err).__name__}"
                )
            except Exception as err:
                logger.exception(
                    "Unexpected error processing file '%s'. Skipping this file.",
                    file_path,
                )
                processed_files_summary_local[file_path] = f"error_unexpected: {type(err).__name__}"

            if rich_progress:
                rich_progress.update(task_id, advance=1)

    logger.debug(
        "Completed local generation for '%s'. %d unique records generated.",
        dir_path,
        len(records_to_index_list),
    )
    return records_to_index_list, file_hash_to_path_map

index_directory

index_directory(
    dir_path,
    *,
    recursive=False,
    public=False,
    skip_cache=False,
    fail_fast=True,
    console=None,
    palette=None,
    follow_symlinks=True
)

Scans, processes, and indexes all files in a directory to DorsalHub.

This method performs a complete workflow:

1. Scans the directory for files.
2. Runs the metadata extraction pipeline on each unique file.
3. Indexes the records to DorsalHub in managed batches.

This method supports two modes of operation via fail_fast:

- True (default): Aborts immediately if a batch fails. Useful for interactive sessions or debugging.
- False: Logs errors and continues processing subsequent batches. Useful for bulk uploads where partial success is acceptable.

Example
from dorsal.file import MetadataReader
from dorsal.common.exceptions import BatchIndexingError

reader = MetadataReader()

# Scenario 1: Interactive / Fail-Fast (Default)
try:
    summary = reader.index_directory("path/to/data")
    print(f"Success! {summary['success']} records indexed.")
except BatchIndexingError as e:
    print(f"Aborted after batch {e.summary['processed']}. Error: {e}")

# Scenario 2: Bulk Upload / Best-Effort
summary = reader.index_directory(
    "path/to/large_dataset",
    fail_fast=False
)
print(f"Finished. Success: {summary['success']}, Failed: {summary['failed']}")
if summary['errors']:
    print(f"First error: {summary['errors'][0]['error_message']}")

Parameters:

Name Type Description Default
dir_path str

The path to the directory to scan and index.

required
recursive bool

If True, scans all subdirectories.

False
public bool

If True, records are created as public.

False
skip_cache bool

If True, re-processes files locally even if they haven't changed.

False
fail_fast bool

If True, raises BatchIndexingError immediately on batch failure. If False, records the error and continues. Defaults to True.

True

Returns:

Name Type Description
dict dict

A summary dictionary detailing the results. Keys: 'total_records', 'processed', 'success', 'failed', 'batches', 'errors'.

Raises:

Type Description
FileNotFoundError

If dir_path does not exist.

BatchIndexingError

If a batch fails and fail_fast is True.

DorsalClientError

For critical errors before batching begins.

Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def index_directory(
    self,
    dir_path: str,
    *,
    recursive: bool = False,
    public: bool = False,
    skip_cache: bool = False,
    fail_fast: bool = True,
    console: "Console | None" = None,
    palette: dict | None = None,
    follow_symlinks: bool = True,
) -> dict:
    """Scans, processes, and indexes all files in a directory to DorsalHub.

    This method performs a complete workflow:
    1. Scans the directory for files.
    2. Runs the metadata extraction pipeline on each unique file.
    3. Indexes the records to DorsalHub in managed batches.

    This method supports two modes of operation via `fail_fast`:
    - **True (Default):** Aborts immediately if a batch fails. Useful for
      interactive sessions or debugging.
    - **False:** Logs errors and continues processing subsequent batches.
      Useful for bulk uploads where partial success is acceptable.

    Example:
        ```python
        from dorsal.file import MetadataReader
        from dorsal.common.exceptions import BatchIndexingError

        reader = MetadataReader()

        # Scenario 1: Interactive / Fail-Fast (Default)
        try:
            summary = reader.index_directory("path/to/data")
            print(f"Success! {summary['success']} records indexed.")
        except BatchIndexingError as e:
            print(f"Aborted after batch {e.summary['processed']}. Error: {e}")

        # Scenario 2: Bulk Upload / Best-Effort
        summary = reader.index_directory(
            "path/to/large_dataset",
            fail_fast=False
        )
        print(f"Finished. Success: {summary['success']}, Failed: {summary['failed']}")
        if summary['errors']:
            print(f"First error: {summary['errors'][0]['error_message']}")
        ```

    Args:
        dir_path (str): The path to the directory to scan and index.
        recursive (bool, optional): If True, scans all subdirectories.
        public (bool, optional): If True, records are created as public.
        skip_cache (bool, optional): If True, re-processes files locally
            even if they haven't changed.
        fail_fast (bool, optional): If True, raises `BatchIndexingError`
            immediately on batch failure. If False, records the error
            and continues. Defaults to True.

    Returns:
        dict: A summary dictionary detailing the results.
              Keys: 'total_records', 'processed', 'success', 'failed', 'batches', 'errors'.

    Raises:
        FileNotFoundError: If `dir_path` does not exist.
        BatchIndexingError: If a batch fails and `fail_fast` is True.
        DorsalClientError: For critical errors before batching begins.
    """
    from dorsal.common.exceptions import DorsalClientError, DuplicateFileError

    logger.debug(
        "MetadataReader.index_directory called: dir='%s', rec=%s, public=%s, fail_fast=%s",
        dir_path,
        recursive,
        public,
        fail_fast,
    )
    if self.offline:
        raise DorsalError("Cannot index directory: MetadataReader is in OFFLINE mode.")

    records_to_index_list, file_hash_to_path_map = self.generate_processed_records_from_directory(
        dir_path=dir_path,
        recursive=recursive,
        limit=None,
        skip_cache=skip_cache,
        console=console,
        palette=palette,
        follow_symlinks=follow_symlinks,
    )

    if not records_to_index_list:
        return {
            "total_records": 0,
            "processed": 0,
            "success": 0,
            "failed": 0,
            "batches": [],
            "errors": [],
        }

    return self.upload_records(
        records=records_to_index_list,
        public=public,
        fail_fast=fail_fast,
        hash_to_path_map=file_hash_to_path_map,
        console=console,
        palette=palette,
    )

index_file

index_file(
    file_path,
    *,
    public=False,
    skip_cache=False,
    overwrite_cache=False
)

Processes a single file and immediately indexes it to DorsalHub.

This method provides a complete "read and push" workflow for a single file. It runs the metadata extraction pipeline and then uploads the resulting record to DorsalHub.

Example
from dorsal.file import MetadataReader

reader = MetadataReader()

try:
    result = reader.index_file("path/to/important_document.docx")
    if result.success:
        print("Document indexed successfully!")
except Exception as e:
    print(f"Failed to index document: {e}")

Parameters:

Name Type Description Default
file_path str

The path to the local file to process and index.

required
public bool

If True, the file record will be created as public. Defaults to False.

False
skip_cache bool

If True, forces reprocessing even if cache exists.

False
overwrite_cache bool

If True, updates the cache with new results.

False

Returns:

Name Type Description
FileIndexResponse FileIndexResponse

A response object from the API detailing the result of the indexing operation.

Raises:

Type Description
DorsalClientError

If there's an error processing the file locally or an API error during indexing.

FileNotFoundError

If the file_path does not exist.

IOError

If the file cannot be read by ModelRunner.
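
A sketch of inspecting the returned FileIndexResponse, using the counter and result fields referenced in the source below (the file path is a placeholder):

from dorsal.file import MetadataReader

reader = MetadataReader()
response = reader.index_file("path/to/report.pdf")

# Aggregate counters plus per-record results.
print(f"Total: {response.total}, Success: {response.success}, Errors: {response.error}")
for item in response.results:
    print(getattr(item, "file_path", "N/A"), getattr(item, "url", "N/A"))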

Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def index_file(
    self, file_path: str, *, public: bool = False, skip_cache: bool = False, overwrite_cache: bool = False
) -> FileIndexResponse:
    """Processes a single file and immediately indexes it to DorsalHub.

    This method provides a complete "read and push" workflow for a single
    file. It runs the metadata extraction pipeline and then uploads the
    resulting record to DorsalHub.

    Example:
        ```python
        from dorsal.file import MetadataReader

        reader = MetadataReader()

        try:
            result = reader.index_file("path/to/important_document.docx")
            if result.success:
                print("Document indexed successfully!")
        except Exception as e:
            print(f"Failed to index document: {e}")
        ```

    Args:
        file_path (str): The path to the local file to process and index.
        public (bool, optional): If True, the file record will be created as public. Defaults to False.
        skip_cache (bool, optional): If True, forces reprocessing even if cache exists.
        overwrite_cache (bool, optional): If True, updates the cache with new results.

    Returns:
        FileIndexResponse: A response object from the API detailing the
            result of the indexing operation.

    Raises:
        DorsalClientError: If there's an error processing the file locally or an API error
                           during indexing.
        FileNotFoundError: If the file_path does not exist.
        IOError: If the file cannot be read by ModelRunner.
    """
    if self.offline:
        raise DorsalError("Cannot index file: MetadataReader is in OFFLINE mode.")
    logger.debug("Starting file indexing for: %s (Public: %s)", file_path, public)
    file_record = self._get_or_create_record(
        file_path=file_path, skip_cache=skip_cache, overwrite_cache=overwrite_cache
    )

    logger.debug("Indexing file '%s' (hash: %s) to DorsalHub...", file_path, file_record.hash)

    api_response: FileIndexResponse
    if public:
        api_response = self._client.index_public_file_records(file_records=[file_record])
    else:
        api_response = self._client.index_private_file_records(file_records=[file_record])

    log_status_msg = "processed by API"
    if api_response.results:
        result_item = api_response.results[0]
        result_item.file_path = file_path

        was_created = False
        if api_response.response and api_response.response.status_code == 201:
            was_created = True
        if was_created:
            log_status_msg = "newly indexed"
        elif api_response.success > 0:
            log_status_msg = "updated/existing"
        elif api_response.unauthorized > 0:
            log_status_msg = "unauthorized"
        elif api_response.error > 0:
            log_status_msg = "error during indexing attempt"

        logger.debug(
            "File '%s' %s on DorsalHub. URL: %s",
            file_path,
            log_status_msg,
            getattr(result_item, "url", "N/A"),
        )
    elif api_response.error > 0:
        logger.error("Failed to index file %s.", file_path)
    else:
        raw_resp_text = api_response.response.text[:200] if api_response.response else "N/A"
        logger.warning(
            "Unexpected response from server for file %s (no results or errors in structured response, "
            "but success/error counts are Total:%d, Success:%d, Error:%d, Unauthorized:%d). "
            "Raw response snippet: %s",
            file_path,
            api_response.total,
            api_response.success,
            api_response.error,
            api_response.unauthorized,
            raw_resp_text,
        )
    return api_response

scan_directory

scan_directory(
    dir_path: str,
    *,
    recursive: bool = False,
    return_errors: Literal[False] = False,
    console: Console | None = None,
    palette: dict | None = None,
    skip_cache: bool = False,
    overwrite_cache: bool = False,
    follow_symlinks: bool = True
) -> list[LocalFile]
scan_directory(
    dir_path: str,
    *,
    recursive: bool = False,
    return_errors: Literal[True],
    console: Console | None = None,
    palette: dict | None = None,
    skip_cache: bool = False,
    overwrite_cache: bool = False,
    follow_symlinks: bool = True
) -> tuple[list[LocalFile], list[str]]
scan_directory(
    dir_path,
    *,
    recursive=False,
    return_errors=False,
    console=None,
    palette=None,
    skip_cache=False,
    overwrite_cache=False,
    follow_symlinks=True
)

Scans a directory and runs the pipeline on all found files.

This method discovers all files within a given directory (and optionally its subdirectories), runs the metadata extraction pipeline on each one, and returns a list of the resulting LocalFile objects. This is an offline operation.

Example
from dorsal.file import MetadataReader

reader = MetadataReader()
files_to_process = reader.scan_directory("path/to/images", recursive=True)

print(f"Found and processed {len(files_to_process)} image files.")

Parameters:

Name Type Description Default
dir_path str

The path to the directory to scan.

required
recursive bool

If True, scans all subdirectories recursively. Defaults to False.

False

Returns:

Name Type Description
list[LocalFile] | tuple[list[LocalFile], list[str]]

If return_errors is False: a list of initialized LocalFile instances.

If return_errors is True: a tuple of (initialized LocalFile instances, error messages for CLI output).
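
Because of the overloads above, passing return_errors=True also returns the per-file error messages; a minimal sketch:

from dorsal.file import MetadataReader

reader = MetadataReader()

# With return_errors=True the call returns (files, errors) instead of just files.
files, errors = reader.scan_directory(
    "path/to/images",
    recursive=True,
    return_errors=True,
)
print(f"Processed {len(files)} files with {len(errors)} warnings.")
for message in errors:
    print(message)
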
Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def scan_directory(
    self,
    dir_path: str,
    *,
    recursive: bool = False,
    return_errors: bool = False,
    console: Console | None = None,
    palette: dict | None = None,
    skip_cache: bool = False,
    overwrite_cache: bool = False,
    follow_symlinks: bool = True,
) -> list[LocalFile] | tuple[list[LocalFile], list[str]]:
    """Scans a directory and runs the pipeline on all found files.

    This method discovers all files within a given directory (and optionally
    its subdirectories), runs the metadata extraction pipeline on each one,
    and returns a list of the resulting `LocalFile` objects. This is an
    offline operation.

    Example:
        ```python
        from dorsal.file import MetadataReader

        reader = MetadataReader()
        files_to_process = reader.scan_directory("path/to/images", recursive=True)

        print(f"Found and processed {len(files_to_process)} image files.")
        ```

    Args:
        dir_path (str): The path to the directory to scan.
        recursive (bool, optional): If True, scans all subdirectories
            recursively. Defaults to False.

    Returns:
        list[LocalFile]: Initialized `LocalFile` instances when `return_errors` is False.
        tuple[list[LocalFile], list[str]]: The initialized instances plus a list of
            error messages (for CLI output) when `return_errors` is True.
    """
    logger.debug("Starting directory read for: %s (Recursive: %s)", dir_path, recursive)
    try:
        file_paths = get_file_paths(dir_path=dir_path, recursive=recursive)
    except FileNotFoundError:
        logger.error("Directory not found: %s", dir_path)
        raise
    except Exception as e:
        logger.error("Error scanning directory %s: %s", dir_path, e)
        raise DorsalError(
            message=f"An error occurred while scanning directory: {dir_path}.",
            original_exception=e,
        ) from e

    if not file_paths:
        logger.debug("No files found to read in directory: %s", dir_path)
        return ([], []) if return_errors else []

    local_files: list[LocalFile] = []
    warnings: list[str] = []
    logger.debug("Found %d file(s). Processing with local models...", len(file_paths))

    rich_progress = None
    iterator: Iterable[str]
    if is_jupyter_environment():
        iterator = tqdm(file_paths, desc="Processing Files")
    elif console is not None:
        from dorsal.cli.themes.palettes import DEFAULT_PALETTE

        active_palette = palette if palette is not None else DEFAULT_PALETTE
        progress_columns = (
            TextColumn(
                "[progress.description]{task.description}",
                style=active_palette["progress_description"],
            ),
            BarColumn(bar_width=None, style=active_palette["progress_bar"]),
            TaskProgressColumn(style=active_palette["progress_percentage"]),
            MofNCompleteColumn(),
            TextColumn("•", style="dim"),
            TimeElapsedColumn(),
            TextColumn("•", style="dim"),
            TimeRemainingColumn(),
        )
        rich_progress = Progress(
            *progress_columns,
            console=console,
            transient=True,
            redirect_stdout=True,
            redirect_stderr=True,
        )
        task_id = rich_progress.add_task("Processing files", total=len(file_paths))
        iterator = file_paths
    else:
        iterator = file_paths

    with rich_progress if rich_progress else open(os.devnull, "w"):
        for file_path in iterator:
            try:
                local_file_obj = self._file_class(
                    file_path=file_path,
                    client=self._client_instance,
                    use_cache=not skip_cache,
                    offline=self.offline,
                    overwrite_cache=overwrite_cache,
                    follow_symlinks=follow_symlinks,
                )
                local_files.append(local_file_obj)
            except Exception as e:
                msg = f"Skipping '{os.path.basename(file_path)}' due to processing error: {e}"
                logger.error(msg)
                warnings.append(msg)

            if rich_progress:
                rich_progress.update(task_id, advance=1)

    logger.debug(
        "Successfully processed %d files into %s objects from directory %s.",
        len(local_files),
        self._file_class.__name__,
        dir_path,
    )

    if return_errors:
        return local_files, warnings
    return local_files

scan_file

scan_file(
    file_path,
    *,
    skip_cache,
    overwrite_cache=False,
    follow_symlinks=True
)

Runs the metadata extraction pipeline on a single file.

This method processes a single local file through the configured ModelRunner pipeline, generating all its metadata. This is an offline operation that does not contact the DorsalHub API.

Example
from dorsal.file import MetadataReader

reader = MetadataReader()
try:
    local_file = reader.scan_file("path/to/document.pdf", skip_cache=False)
    print(f"Successfully processed: {local_file.name}")
    if local_file.pdf:
        print(f"Page Count: {local_file.pdf.page_count}")
except FileNotFoundError:
    print("The specified file does not exist.")

Parameters:

Name Type Description Default
file_path str

The path to the local file to process.

required

Returns:

Name Type Description
LocalFile LocalFile

An initialized LocalFile instance containing the rich metadata extracted by the pipeline.

Raises:

Type Description
FileNotFoundError

If the file_path does not exist or is not a file.

IOError

If there are issues reading the file.

DorsalError

For other errors during model execution.

Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def scan_file(
    self, file_path: str, *, skip_cache: bool, overwrite_cache: bool = False, follow_symlinks: bool = True
) -> LocalFile:
    """Runs the metadata extraction pipeline on a single file.

    This method processes a single local file through the configured
    `ModelRunner` pipeline, generating all its metadata. This is an
    offline operation that does not contact the DorsalHub API.

    Example:
        ```python
        from dorsal.file import MetadataReader

        reader = MetadataReader()
        try:
            local_file = reader.scan_file("path/to/document.pdf", skip_cache=False)
            print(f"Successfully processed: {local_file.name}")
            if local_file.pdf:
                print(f"Page Count: {local_file.pdf.page_count}")
        except FileNotFoundError:
            print("The specified file does not exist.")
        ```

    Args:
        file_path (str): The path to the local file to process.

    Returns:
        LocalFile: An initialized `LocalFile` instance containing the rich
            metadata extracted by the pipeline.

    Raises:
        FileNotFoundError: If the file_path does not exist or is not a file.
        IOError: If there are issues reading the file.
        DorsalError: For other errors during model execution

    """
    logger.debug("Starting file read for: %s", file_path)
    try:
        local_file_obj = self._file_class(
            file_path=file_path,
            client=self._client_instance,
            use_cache=not skip_cache,
            overwrite_cache=overwrite_cache,
            offline=self.offline,
            follow_symlinks=follow_symlinks,
        )
    except FileNotFoundError:
        logger.error("File not found for reading: %s", file_path)
        raise
    except IOError as e:
        logger.error("IOError reading file %s: %s", file_path, e)
        raise
    except Exception as e:
        logger.error("Failed to read/process file %s: %s", file_path, e)
        raise DorsalError(
            message=f"Failed to process local file: {file_path}.",
            original_exception=e,
        ) from e

    logger.debug(
        "Successfully processed file '%s' into a %s object (hash: %s).",
        file_path,
        self._file_class.__name__,
        local_file_obj.hash,
    )
    return local_file_obj

upload_records

upload_records(
    records,
    *,
    public=False,
    fail_fast=True,
    hash_to_path_map=None,
    console=None,
    palette=None
)

Uploads a list of file records to DorsalHub in batches.

Parameters:

Name Type Description Default
records list[FileRecordStrict]

List of validated FileRecordStrict objects to upload.

required
public bool

If True, uploads to the public index.

False
fail_fast bool

If True, raises BatchIndexingError on the first failed batch.

True
hash_to_path_map dict[str, str] | None

Optional mapping of {hash: local_path}.

None
console 'Console | None'

Rich Console instance for progress reporting.

None
palette dict | None

Theme palette for the progress bar.

None

Returns:

Name Type Description
dict dict[str, Any]

A summary of the upload operation.
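
A sketch of pairing upload_records with generate_processed_records_from_directory in best-effort mode (the directory path is a placeholder):

from dorsal.file import MetadataReader

reader = MetadataReader()

# Generate records locally, then push them in batches without aborting on failures.
records, hash_to_path = reader.generate_processed_records_from_directory("path/to/my_data")
summary = reader.upload_records(
    records,
    fail_fast=False,
    hash_to_path_map=hash_to_path,
)
print(f"Accepted {summary['success']} of {summary['total_records']} records in {len(summary['batches'])} batches.")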

Source code in venv/lib/python3.12/site-packages/dorsal/file/metadata_reader.py
def upload_records(
    self,
    records: list[FileRecordStrict],
    *,
    public: bool = False,
    fail_fast: bool = True,
    hash_to_path_map: dict[str, str] | None = None,
    console: "Console | None" = None,
    palette: dict | None = None,
) -> dict[str, Any]:
    """
    Uploads a list of file records to DorsalHub in batches.

    Args:
        records: List of validated FileRecordStrict objects to upload.
        public: If True, uploads to the public index.
        fail_fast: If True, raises BatchIndexingError on the first failed batch.
        hash_to_path_map: Optional mapping of {hash: local_path}.
        console: Rich Console instance for progress reporting.
        palette: Theme palette for the progress bar.

    Returns:
        dict: A summary of the upload operation.
    """
    from dorsal.common.exceptions import BatchIndexingError
    from dorsal.client.validators import FileIndexResponse
    from dorsal.common.environment import is_jupyter_environment
    from rich.progress import (
        Progress,
        BarColumn,
        TaskProgressColumn,
        MofNCompleteColumn,
        TextColumn,
        TimeElapsedColumn,
        TimeRemainingColumn,
    )

    total_records = len(records)
    summary: dict[str, Any] = {
        "total_records": total_records,
        "processed": 0,
        "success": 0,
        "failed": 0,
        "batches": [],
        "errors": [],
    }

    if not records:
        return summary

    batches = [
        records[i : i + constants.API_MAX_BATCH_SIZE] for i in range(0, total_records, constants.API_MAX_BATCH_SIZE)
    ]

    logger.debug(
        "Uploading %d records in %d batches (Public: %s).",
        total_records,
        len(batches),
        public,
    )

    # --- Progress Bar Setup ---
    rich_progress = None
    iterator: Iterable[list[FileRecordStrict]]

    if is_jupyter_environment():
        from tqdm import tqdm

        iterator = tqdm(batches, desc="Pushing batches")
    elif console:
        from dorsal.cli.themes.palettes import DEFAULT_PALETTE

        active_palette = palette if palette is not None else DEFAULT_PALETTE

        progress_columns = (
            TextColumn(
                "[progress.description]{task.description}",
                style=active_palette.get("progress_description", "default"),
            ),
            BarColumn(bar_width=None, style=active_palette.get("progress_bar", "default")),
            TaskProgressColumn(style=active_palette.get("progress_percentage", "default")),
            MofNCompleteColumn(),
            TextColumn("•", style="dim"),
            TimeElapsedColumn(),
            TextColumn("•", style="dim"),
            TimeRemainingColumn(),
        )
        rich_progress = Progress(
            *progress_columns,
            console=console,
            redirect_stdout=True,
            transient=True,
            redirect_stderr=True,
        )
        task_id = rich_progress.add_task("Pushing batches...", total=len(batches))
        iterator = batches
    else:
        iterator = batches

    # --- Upload Loop ---
    with rich_progress if rich_progress else open(os.devnull, "w"):
        for i, batch in enumerate(iterator):
            batch_size = len(batch)
            batch_num = i + 1

            try:
                api_response: FileIndexResponse
                if public:
                    api_response = self._client.index_public_file_records(file_records=batch)
                else:
                    api_response = self._client.index_private_file_records(file_records=batch)

                if api_response.results:
                    for result_item in api_response.results:
                        if hash_to_path_map and hasattr(result_item, "hash") and hasattr(result_item, "file_path"):
                            local_path = hash_to_path_map.get(result_item.hash)
                            if local_path:
                                result_item.file_path = local_path

                        if hasattr(result_item, "annotations") and result_item.annotations:
                            for ann in result_item.annotations:
                                if hasattr(ann, "status") and ann.status in ("error", "unauthorized"):
                                    summary["errors"].append(
                                        {
                                            "batch_index": i,
                                            "status": ann.status,
                                            "file_hash": result_item.hash,
                                            "file_path": getattr(result_item, "file_path", None),
                                            "error_message": f"Annotation '{ann.name}': {ann.detail or 'Unknown error'}",
                                        }
                                    )

                summary["success"] += api_response.success
                summary["processed"] += batch_size
                summary["batches"].append(
                    {
                        "batch_index": i,
                        "status": "success",
                        "records_in_batch": batch_size,
                        "records_accepted": api_response.success,
                    }
                )
                logger.debug("Batch %d/%d succeeded.", batch_num, len(batches))

            except Exception as err:
                summary["failed"] += batch_size
                summary["processed"] += batch_size
                error_entry = {
                    "batch_index": i,
                    "status": "failure",
                    "records_in_batch": batch_size,
                    "error_type": type(err).__name__,
                    "error_message": str(err),
                }
                summary["batches"].append(error_entry)
                summary["errors"].append(error_entry)
                logger.error("Batch %d failed: %s", batch_num, err)

                if fail_fast:
                    raise BatchIndexingError(
                        f"Batch indexing failed at batch {batch_num}: {err}",
                        summary=summary,
                        original_error=err,
                    ) from err

            if rich_progress:
                rich_progress.update(task_id, advance=1)

    return summary