Skip to content

FileAnnotator

dorsal.file.file_annotator.FileAnnotator

Orchestrates on-demand annotation of local files.

Acts as a bridge between high-level callers (like LocalFile) and the ModelRunner, handling single annotation tasks, validating manual data, and wrapping results into a standardized format.

annotate_file_using_model_and_validator

annotate_file_using_model_and_validator(
    *,
    file_path,
    model_runner,
    annotation_model_cls,
    schema_id,
    schema_version=None,
    private,
    options=None,
    validation_model=None,
    ignore_linter_errors=False
)

Runs a given annotation model class directly.

Parameters:

Name Type Description Default
file_path str

Path to the local file.

required
model_runner ModelRunner

An instance of the ModelRunner.

required
annotation_model_cls Type[AnnotationModel]

The annotation model class to execute.

required
schema_id str

The dataset ID for the resulting annotation.

required
options dict | None

Optional keyword arguments for the model's main() method.

None
validation_model Type[BaseModel] | JsonSchemaValidator | None

Optional validator for the model's output.

None

Returns:

Type Description
Annotation | AnnotationGroup

An Annotation object with the model's output.

Raises:

Type Description
AnnotationConfigurationError

If schema_id is not provided, or if the annotation model class lacks a non-empty string id attribute.

AnnotationExecutionError

If the model fails to run.

Source code in venv/lib/python3.13/site-packages/dorsal/file/file_annotator.py
def annotate_file_using_model_and_validator(
    self,
    *,
    file_path: str,
    model_runner: ModelRunner,
    annotation_model_cls: Type[AnnotationModel],
    schema_id: str,
    schema_version: str | None = None,
    private: bool,
    options: dict | None = None,
    validation_model: Type[BaseModel] | JsonSchemaValidator | None = None,
    ignore_linter_errors: bool = False,
) -> Annotation | AnnotationGroup:
    """
    Execute a single annotation model class against a file and wrap its output.

    The configuration is checked first (a `schema_id` must be given and the
    model class must carry a non-empty string `id`), then the model is run via
    `self._execute` and the resulting record is packaged by
    `self._make_annotation`.

    Args:
        file_path: Path to the local file.
        model_runner: An instance of the ModelRunner.
        annotation_model_cls: The annotation model class to execute. It must
            expose a non-empty string `id` attribute.
        schema_id: The dataset ID for the resulting annotation.
        schema_version: Optional schema version, passed through when the
            annotation is built.
        private: Privacy flag, passed through when the annotation is built.
        options: Optional keyword arguments for the model's main() method.
        validation_model: Optional validator for the model's output.
        ignore_linter_errors: Passed through to the execution step.

    Returns:
        An `Annotation` object with the model's output.

    Raises:
        AnnotationConfigurationError: If `schema_id` is not provided, or if
            `annotation_model_cls` has no non-empty string `id` attribute.
        AnnotationExecutionError: If the model fails to run.
    """
    model_name = annotation_model_cls.__name__
    logger.debug(
        "Annotating file '%s' with model '%s' for dataset '%s'.",
        file_path,
        model_name,
        schema_id,
    )

    # Guard clause: a dataset ID is mandatory.
    if schema_id is None:
        raise AnnotationConfigurationError("`schema_id` must be provided.")

    # Guard clause: the model class must identify itself with a usable string id.
    model_id = getattr(annotation_model_cls, "id", None)
    if not isinstance(model_id, str) or not model_id:
        raise AnnotationConfigurationError(
            f"The provided AnnotationModel class '{model_name}' "
            "is missing a required, non-empty 'id' string attribute."
        )

    result = self._execute(
        model_runner=model_runner,
        annotation_model=annotation_model_cls,
        validation_model=validation_model,
        file_path=file_path,
        schema_id=schema_id,
        options=options,
        ignore_linter_errors=ignore_linter_errors,
    )

    record = cast(dict, result.record)
    source = result.source.model_dump()
    return self._make_annotation(
        validated_annotation=record,
        schema_id=schema_id,
        schema_version=schema_version,
        private=private,
        source=source,
    )

annotate_file_using_pipeline_step

annotate_file_using_pipeline_step(
    *,
    file_path,
    model_runner,
    pipeline_step,
    schema_id=None,
    schema_version=None,
    private
)

Runs an annotation model defined by a single pipeline step.

Note: This ignores any dependency rules within the pipeline step.

Parameters:

Name Type Description Default
file_path str

Absolute or relative path to the local file.

required
model_runner ModelRunner

An instance of the ModelRunner.

required
pipeline_step ModelRunnerPipelineStep | dict[str, Any]

A ModelRunnerPipelineStep object or a dict defining the step.

required
schema_id str | None

Optional. Overrides the schema_id from the pipeline_step.

None

Returns:

Type Description
Annotation | AnnotationGroup

An Annotation object containing the model's output.

Raises:

Type Description
AnnotationConfigurationError

If the pipeline_step config is invalid.

AnnotationImportError

If the specified model or validator cannot be imported.

AnnotationExecutionError

If the model fails to run or its output is invalid.

Source code in venv/lib/python3.13/site-packages/dorsal/file/file_annotator.py
def annotate_file_using_pipeline_step(
    self,
    *,
    file_path: str,
    model_runner: ModelRunner,
    pipeline_step: ModelRunnerPipelineStep | dict[str, Any],
    schema_id: str | None = None,
    schema_version: str | None = None,
    private: bool,
) -> Annotation | AnnotationGroup:
    """
    Runs an annotation model defined by a single pipeline step.

    Note: This ignores any dependency rules within the pipeline step.

    Args:
        file_path: Absolute or relative path to the local file.
        model_runner: An instance of the ModelRunner.
        pipeline_step: A `ModelRunnerPipelineStep` object or a dict defining the step.
        schema_id: Optional. Overrides the `schema_id` from the pipeline_step.
        schema_version: Optional schema version for the resulting annotation.
            When omitted, the run result's `schema_version` attribute is used
            if the result has one.
        private: Privacy flag, passed through when the annotation is built.

    Returns:
        An `Annotation` object containing the model's output.

    Raises:
        AnnotationConfigurationError: If the pipeline_step config is invalid.
        AnnotationImportError: If the specified model or validator cannot be imported.
            The message names the import target (model or validator) that failed.
        AnnotationExecutionError: If the model fails to run or its output is invalid.
    """
    logger.debug("Annotating file '%s' using pipeline step.", file_path)
    if isinstance(pipeline_step, dict):
        try:
            pipeline_step_obj = ModelRunnerPipelineStep(**pipeline_step)
        except PydanticValidationError as err:
            raise AnnotationConfigurationError(f"Invalid `pipeline_step` dictionary provided: {err}") from err
    elif isinstance(pipeline_step, ModelRunnerPipelineStep):
        pipeline_step_obj = pipeline_step
    else:
        raise AnnotationConfigurationError(
            f"pipeline_step must be a dict or ModelRunnerPipelineStep, not {type(pipeline_step).__name__}."
        )

    # An explicitly passed schema_id wins over the one configured on the step.
    effective_schema_id = schema_id if schema_id is not None else pipeline_step_obj.schema_id

    # Tracks which configured target is being resolved, so that a failure is
    # reported against the model or the validator -- whichever actually failed.
    import_target: Any = pipeline_step_obj.annotation_model
    try:
        annotator_callable = import_callable(import_path=pipeline_step_obj.annotation_model)
        if not (inspect.isclass(annotator_callable) and issubclass(annotator_callable, AnnotationModel)):
            raise TypeError(
                f"Imported callable '{annotator_callable.__name__}' is not a subclass of AnnotationModel."
            )
        annotator_class = cast(Type[AnnotationModel], annotator_callable)

        validator: Callable | JsonSchemaValidatorType | None = None
        if pipeline_step_obj.validation_model:
            import_target = pipeline_step_obj.validation_model
            if isinstance(pipeline_step_obj.validation_model, dict):
                # A dict is treated as an inline JSON schema rather than an import path.
                validator = get_json_schema_validator(schema=pipeline_step_obj.validation_model, strict=True)
            else:
                validator_callable = import_callable(import_path=pipeline_step_obj.validation_model)
                if not (
                    is_pydantic_model_class(validator_callable)
                    or isinstance(validator_callable, JsonSchemaValidator)
                ):
                    raise TypeError(
                        f"Imported validator '{pipeline_step_obj.validation_model.name}' is not a supported type."
                    )
                validator = validator_callable
    except (ImportError, AttributeError, TypeError) as err:
        if isinstance(import_target, dict):
            target_label = "inline JSON schema validator"
        else:
            # getattr with a fallback keeps the handler itself from raising
            # while it is building the error message.
            target_module = getattr(import_target, "module", "<unknown>")
            target_name = getattr(import_target, "name", "<unknown>")
            target_label = f"{target_module}.{target_name}"
        msg = f"Failed to import model/validator from config: {target_label}"
        logger.exception("AnnotationImportError: %s.", msg)
        raise AnnotationImportError(msg) from err

    run_model_result = self._execute(
        model_runner=model_runner,
        annotation_model=annotator_class,
        validation_model=validator,
        file_path=file_path,
        schema_id=effective_schema_id,
        options=pipeline_step_obj.options,
        ignore_linter_errors=pipeline_step_obj.ignore_linter_errors,
    )

    # Fall back to the version reported by the run result when none was given.
    final_version = schema_version
    if final_version is None and hasattr(run_model_result, "schema_version"):
        final_version = run_model_result.schema_version

    return self._make_annotation(
        validated_annotation=cast(dict, run_model_result.record),
        schema_id=effective_schema_id,
        schema_version=final_version,
        private=private,
        source=run_model_result.source.model_dump(),
    )

make_manual_annotation

make_manual_annotation(
    *,
    annotation,
    schema_id,
    schema_version=None,
    source_id,
    validator=None,
    private,
    ignore_linter_errors=False,
    force=False
)

Creates a fully-formed Annotation object from a manual payload.

Parameters:

Name Type Description Default
annotation BaseModel | dict[str, Any]

The annotation data (dict or Pydantic model).

required
schema_id str

The validation schema for this annotation.

required
source_id str | None

Identifier for the source of the manual annotation. If None, a random hex identifier is generated.

required
validator Type[BaseModel] | JsonSchemaValidator | None

An optional validator for the payload.

None

Returns:

Type Description
Annotation | AnnotationGroup

A constructed and validated Annotation object.

Raises:

Type Description
AnnotationConfigurationError

If config/types are invalid.

AnnotationValidationError

If the payload fails validation.

DataQualityError

If the payload fails post-validation data quality linting.

Source code in venv/lib/python3.13/site-packages/dorsal/file/file_annotator.py
def make_manual_annotation(
    self,
    *,
    annotation: BaseModel | dict[str, Any],
    schema_id: str,
    schema_version: str | None = None,
    source_id: str | None,
    validator: Type[BaseModel] | JsonSchemaValidator | None = None,
    private: bool,
    ignore_linter_errors: bool = False,
    force: bool = False,
) -> Annotation | AnnotationGroup:
    """
    Build a complete `Annotation` object from a manually supplied payload.

    Unless `force` is set, the payload is validated with
    `validate_manual_annotation` and then passed through the linter for
    `schema_id`. With `force` set, both steps are skipped and the payload is
    used as given (a Pydantic model instance is dumped to a dict).

    Args:
        annotation: The annotation data (dict or Pydantic model).
        schema_id: The validation schema for this annotation.
        schema_version: Optional schema version, passed through when the
            annotation is built.
        source_id: Identifier for the manual source. When None, a random hex
            token is generated.
        validator: An optional validator for the payload.
        private: Privacy flag, passed through when the annotation is built.
        ignore_linter_errors: When True, the linter is told not to raise on
            errors.
        force: When True, skip validation and linting entirely.

    Returns:
        A constructed and validated `Annotation` object.

    Raises:
        AnnotationConfigurationError: If config/types are invalid.
        AnnotationValidationError: If the payload fails validation.
        DataQualityError: If the payload fails post-validation data quality linting.
    """
    logger.debug("Creating manual annotation for validation schema '%s'.", schema_id)

    if not force:
        # Normal path: validate first, then lint the validated record.
        validated_annotation = self.validate_manual_annotation(annotation=annotation, validator=validator)
        apply_linter(
            schema_id=schema_id,
            record=validated_annotation,
            raise_on_error=not ignore_linter_errors,
        )
    else:
        logger.debug("`force=True: skipping all validation checks.")
        validated_annotation = (
            annotation.model_dump(by_alias=True, exclude_none=True)
            if is_pydantic_model_instance(annotation)
            else cast(dict[str, Any], annotation)
        )

    # Callers may omit the source id; mint a random one in that case.
    manual_source_id = secrets.token_hex(12) if source_id is None else source_id
    manual_source = AnnotationManualSource(id=manual_source_id).model_dump()

    return self._make_annotation(
        validated_annotation=validated_annotation,
        schema_id=schema_id,
        schema_version=schema_version,
        private=private,
        source=manual_source,
        force=force,
    )

validate_manual_annotation

validate_manual_annotation(annotation, validator)

Validates a user-provided annotation payload against an optional validator.

Parameters:

Name Type Description Default
annotation BaseModel | dict[str, Any]

The annotation data payload (dict or Pydantic model).

required
validator Type[BaseModel] | JsonSchemaValidator | None

The validator to use (Pydantic class or JsonSchemaValidator instance).

required
file_hash

(Deprecated) The primary hash of the file being annotated. It is no longer injected into the record and is not a parameter of the current method signature.

not accepted

Returns:

Type Description
dict[str, Any]

The validated annotation as a dictionary.

Raises:

Type Description
AnnotationConfigurationError

If the annotation or validator type is unsupported.

AnnotationValidationError

If the annotation fails validation.

Source code in venv/lib/python3.13/site-packages/dorsal/file/file_annotator.py
def validate_manual_annotation(
    self,
    annotation: BaseModel | dict[str, Any],
    validator: Type[BaseModel] | JsonSchemaValidator | None,
) -> dict[str, Any]:
    """
    Validates a user-provided annotation payload against an optional validator.

    With no validator, the payload is only converted to a dict (a Pydantic
    model is dumped; a dict is shallow-copied). With a validator, the dict
    form of the payload is checked against it. A Pydantic model instance
    whose class *is* the validator class is not re-validated.

    Args:
        annotation: The annotation data payload (dict or Pydantic model).
        validator: The validator to use (Pydantic class or JsonSchemaValidator instance).

    Returns:
        The validated annotation as a dictionary.

    Raises:
        AnnotationConfigurationError: If the annotation or validator type is unsupported.
        AnnotationValidationError: If the annotation fails validation.
    """
    validator_type_name = "None" if validator is None else type(validator).__name__
    logger.debug(
        "Validating manual annotation. Input type: %s, Validator type: %s.",
        type(annotation).__name__,
        validator_type_name,
    )

    if validator is None:
        if isinstance(annotation, BaseModel):
            return annotation.model_dump(by_alias=True, exclude_none=True)
        elif isinstance(annotation, dict):
            return annotation.copy()
        else:
            raise AnnotationConfigurationError(
                f"Unsupported annotation type for manual validation: {type(annotation).__name__}"
            )

    if not (is_pydantic_model_class(validator) or isinstance(validator, JsonSchemaValidator)):
        raise AnnotationConfigurationError(f"Unsupported validator type: {type(validator).__name__}")

    try:
        if isinstance(annotation, BaseModel):
            annotation_dict = annotation.model_dump(by_alias=True, exclude_none=True)
            # Compare class identity, not class names: two distinct models can
            # share a name, and a name match must not bypass validation.
            if is_pydantic_model_class(validator) and type(annotation) is not validator:
                logger.debug("Re-validating Pydantic model against different validator model.")
                validator.model_validate(annotation_dict)
            elif isinstance(validator, JsonSchemaValidator):
                logger.debug("Validating Pydantic model against JSON schema.")
                summary = json_schema_validate_records(records=[annotation_dict], validator=validator)
                if summary.get("valid_records") != 1:
                    raise AnnotationValidationError(
                        f"Schema validation failed: {summary.get('error_details')}",
                        validation_errors=summary.get("error_details"),
                    )
            return annotation_dict

        elif isinstance(annotation, dict):
            # Work on a shallow copy so the returned dict is not the caller's object.
            annotation_dict = annotation.copy()

            if is_pydantic_model_class(validator):
                logger.debug("Validating dict against Pydantic model.")
                validator.model_validate(annotation_dict)
            elif isinstance(validator, JsonSchemaValidator):
                logger.debug("Validating dict against JSON schema.")
                summary = json_schema_validate_records(records=[annotation_dict], validator=validator)
                if summary.get("valid_records") != 1:
                    raise AnnotationValidationError(
                        f"Schema validation failed: {summary.get('error_details')}",
                        validation_errors=summary.get("error_details"),
                    )
            return annotation_dict

        else:
            raise AnnotationConfigurationError(
                f"Unsupported annotation type for manual validation: {type(annotation).__name__}"
            )

    except PydanticValidationError as err:
        logger.debug("Pydantic validation failed for manual annotation.")
        raise AnnotationValidationError(
            "Manual annotation failed Pydantic validation.",
            validation_errors=err.errors(),
        ) from err
    except ValidationError:
        logger.debug("Schema validation failed for manual annotation.")
        # Bare raise re-raises the active exception with its traceback intact.
        raise