ModelRunner

dorsal.file.model_runner.ModelRunner

ModelRunner(
    pipeline_config="default", debug=False, testing=False
)

Initializes the ModelRunner.

Parameters:

pipeline_config (str | list[dict[str, Any]] | None, default: 'default')

    Configuration for models to run after the base model.

    - "default": uses the default built-in pipeline.
    - Any other str: a file path to a JSON file containing the pipeline configuration list.
    - list: a direct list of configuration dictionaries. An empty list [] means only the base model will be run with its effective options.
    - None: no pipeline will be set up. Useful when you only need the base model.

debug (bool, default: False)

    If True, enables timing of model executions.

testing (bool, default: False)

    If True, suppresses the warning about pipeline_config being set to None.
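For illustration, a minimal construction sketch covering each documented pipeline_config form (the JSON path below is a placeholder, not a real file):

from dorsal.file.model_runner import ModelRunner

# Default built-in pipeline (same as omitting the argument).
runner = ModelRunner(pipeline_config="default")

# Load the pipeline configuration list from a JSON file (placeholder path).
runner = ModelRunner(pipeline_config="/path/to/pipeline.json")

# Empty list: only the base model is run, with its effective options.
runner = ModelRunner(pipeline_config=[])

# No pipeline at all; testing=True suppresses the warning this would log.
runner = ModelRunner(pipeline_config=None, testing=True)

# debug=True records per-model execution times in runner.time_taken.
runner = ModelRunner(debug=True)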
Source code in venv/lib/python3.13/site-packages/dorsal/file/model_runner.py
def __init__(
    self, pipeline_config: str | list[dict[str, Any]] | None = "default", debug: bool = False, testing: bool = False
):
    """
    Initializes the ModelRunner.

    Args:
        pipeline_config: Configuration for models to run after the base model.
            - str: "default", uses the default built-in pipeline.
            - str: Any string other than "default" is a file path to a JSON file containing the pipeline configuration list.
            - list: A direct list of configuration dictionaries. An empty list [] means
                    only the base model will be run with its effective options.
            - None: No pipeline will be set up. Useful for when you only need the base model.
        debug: If True, enables timing of model executions.
        testing: If True, suppresses the warning about pipeline_config being set to None.
    """
    from dorsal.common.config import load_config

    self.pipeline: "list[ModelRunnerPipelineStep]" = []
    self.pipeline_config_source: str
    if pipeline_config is None:
        self.pipeline_config_source = "No pipeline"
    elif pipeline_config == "default":
        _, config_path = load_config()
        self.pipeline_config_source = f"dorsal.toml {config_path}"
    elif isinstance(pipeline_config, str):
        self.pipeline_config_source = f"JSON Path: {pipeline_config}"
    elif isinstance(pipeline_config, list):
        self.pipeline_config_source = f"Custom {len(pipeline_config)} step pipeline"
    else:
        raise ModelRunnerConfigError(f"Invalid pipeline_config type: {type(pipeline_config).__name__}")

    self.debug = debug
    self.time_taken: dict[str, float] = {}

    self.pre_model = self._load_pre_pipeline_model_step()

    if pipeline_config is None:
        if not testing:
            logger.warning(
                "Pipeline is being executed with pipeline_config set to `None` - this means any config will be ignored."
            )
    else:
        self.pipeline = self._load_raw_pipeline_config_steps(config=pipeline_config)

    self._log_warnings_for_duplicate_models(self.pipeline)

    self.pre_model_options: dict[str, Any] | None = self.pre_model.options

    if self.pipeline and self._is_matching_model_step(
        self.pipeline[0],
        self.pre_model.annotation_model.module,
        self.pre_model.annotation_model.name,
    ):
        logger.debug(
            "First step of the loaded pipeline matches the base model definition. "
            "Using options from this step for the base model execution."
        )
        self.pre_model_options = self.pipeline[0].options
        self.pipeline = self.pipeline[1:]
        logger.debug(
            "Base model options have been updated. The effective pipeline to run will start "
            "from the second step of the loaded configuration."
        )
    else:
        logger.debug(
            "Using default options for base model execution. The effective pipeline to run will "
            "consist of all loaded steps (if any)."
        )

    logger.debug(
        "ModelRunner initialized. Debug mode: %s. Pipeline models to run: %d. Config source: '%s'",
        "ON" if self.debug else "OFF",
        len(self.pipeline),
        self.pipeline_config_source,
    )

run_single_model

run_single_model(
    annotation_model,
    validation_model,
    file_path,
    base_model_result=None,
    schema_id=None,
    options=None,
    ignore_linter_errors=False,
)

Runs a single annotation model and validates its output.

This is the core execution unit for all models in the pipeline, including the base model.

  • Populates Model: Injects base file info (size, media type, etc.) into the model instance before running.
  • Runs Model: Calls the model's .main() method.
  • Validates Output: (Optional) Validates the output against the provided Pydantic or JSON Schema validator.

Parameters:

annotation_model (Type[AnnotationModel], required)

    The AnnotationModel class to execute.

validation_model (Type[BaseModel] | JsonSchemaValidator | None, required)

    The Pydantic model or JsonSchemaValidator to validate the output against.

file_path (str, required)

    The absolute path to the file being processed.

base_model_result ('RunModelResult' | None, default: None)

    The result from the initial FileCoreAnnotationModel. This is None only when running the base model itself.

schema_id (str | None, default: None)

    The target schema ID for this annotation.

options (dict[str, Any] | None, default: None)

    A dictionary of options to pass to the model's .main() method.

ignore_linter_errors (bool, default: False)

    If True, the data quality linter is run with raise_on_error=False, so linting failures will not mark the result as an error.

Returns:

RunModelResult

    A RunModelResult object containing the model's output or an error.
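A usage sketch, not a definitive recipe: MyTextModel (a user-defined AnnotationModel subclass), MyTextRecord (a Pydantic output model), the schema ID, and the options dict are hypothetical placeholders, not part of dorsal:

from pydantic import BaseModel

from dorsal.file.model_runner import ModelRunner

class MyTextRecord(BaseModel):
    # Hypothetical record the model is expected to produce.
    line_count: int

runner = ModelRunner(pipeline_config=None, testing=True)

# MyTextModel is assumed to be defined elsewhere as an AnnotationModel subclass.
# base_model_result is omitted here; for pipeline models it would be the
# RunModelResult produced by the base FileCoreAnnotationModel run.
result = runner.run_single_model(
    annotation_model=MyTextModel,
    validation_model=MyTextRecord,
    file_path="/data/example.txt",
    schema_id="my/text-schema",   # hypothetical schema ID
    options={"max_lines": 100},   # forwarded to MyTextModel.main()
)

if result.error:
    print("Model failed:", result.error)
else:
    print("Validated record:", result.record)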

Source code in venv/lib/python3.13/site-packages/dorsal/file/model_runner.py
@model_debug_timer
def run_single_model(
    self,
    annotation_model: Type[AnnotationModel],
    validation_model: Type[BaseModel] | JsonSchemaValidator | None,
    file_path: str,
    base_model_result: "RunModelResult" | None = None,
    schema_id: str | None = None,
    options: dict[str, Any] | None = None,
    ignore_linter_errors: bool = False,
) -> "RunModelResult":
    """
    Runs a single annotation model and validates its output.

    This is the core execution unit for all models in the pipeline,
    including the base model.

    - **Populates Model:** Injects base file info (size, media type, etc.)
      into the model instance before running.
    - **Runs Model:** Calls the model's `.main()` method.
    - **Validates Output:** (Optional) Validates the output
      against the provided Pydantic or JSON Schema validator.

    Args:
        annotation_model: The AnnotationModel class to execute.
        validation_model: The Pydantic model or JsonSchemaValidator to
                          validate the output against.
        file_path: The absolute path to the file being processed.
        base_model_result: The result from the initial FileCoreAnnotationModel.
                           This is `None` only when running the base model itself.
        schema_id: The target schema ID for this annotation.
        options: A dictionary of options to pass to the model's `.main()` method.

    Returns:
        A RunModelResult object containing the model's output or an error.
    """
    from dorsal.file.configs.model_runner import RunModelResult

    model_name = annotation_model.__name__
    result_data: dict[str, Any] = {
        "name": model_name,
        "source": {
            "type": "Model",
            "id": annotation_model.id,
            "variant": None,
            "version": annotation_model.version,
        },
        "record": None,
        "schema_id": schema_id,
        "schema_version": None,
        "error": None,
    }
    raw_model_output: dict[str, Any] | None = None
    validated_data: dict[str, Any] | None = None
    validation_error_payload: list[dict[str, Any]] | str | None = None

    try:
        logger.debug("Instantiating model '%s' for file '%s'", model_name, file_path)
        annotation_model_instance = annotation_model(file_path=file_path)  # type: ignore[call-arg]

        if base_model_result and base_model_result.record:
            for key, value in base_model_result.record.items():
                if key == "all_hashes":
                    continue
                setattr(annotation_model_instance, key, value)

        logger.debug(
            "Running model '%s' main method with options: %s",
            model_name,
            options or "None",
        )

        raw_model_output = (
            annotation_model_instance.main(**options) if options else annotation_model_instance.main()
        )

        result_data["source"]["variant"] = annotation_model_instance.variant

        if raw_model_output is None:
            error_msg_from_instance = getattr(annotation_model_instance, "error", None)
            error_msg = (
                error_msg_from_instance
                if error_msg_from_instance
                else f"Model '{model_name}' returned None without specific error."
            )
            logger.warning(
                "Model '%s' returned None for file '%s'. Detail: %s",
                model_name,
                file_path,
                error_msg,
            )
            result_data["error"] = error_msg
            return RunModelResult(**result_data)

        validator_display_name = "None"
        if validation_model is not None:
            if inspect.isclass(validation_model):
                validator_display_name = validation_model.__name__
            else:
                validator_display_name = type(validation_model).__name__

        logger.debug(
            "Attempting to validate output of model '%s' with validator '%s'...",
            model_name,
            validator_display_name,
        )

        if validation_model is None:
            logger.debug(
                "No validator provided for model '%s', file '%s'. Skipping output validation.",
                model_name,
                file_path,
            )
            validated_data = raw_model_output
        elif isinstance(validation_model, JsonSchemaValidator):
            result_data["schema_version"] = validation_model.schema.get("version")
            validated_data, validation_error_payload = self._json_schema_validate_raw_annotation_model_output(
                raw_model_output=raw_model_output,
                schema_validator_instance=validation_model,
                annotator_model_name=model_name,
                file_path=file_path,
            )
        elif inspect.isclass(validation_model) and issubclass(validation_model, BaseModel):
            validated_data, validation_error_payload = self._pydantic_validate_raw_annotation_model_output(
                raw_model_output=raw_model_output,
                pydantic_class=validation_model,
                annotator_model_name=model_name,
                file_path=file_path,
            )
        else:
            config_err_msg = (
                f"Unsupported validator type '{type(validation_model).__name__}' provided for model '{model_name}'."
            )
            logger.error(config_err_msg + " This indicates a misconfiguration in the pipeline step.")
            raise ModelRunnerConfigError(config_err_msg)

        if validation_error_payload is not None:
            val_error = ModelOutputValidationError(
                model_name=model_name,
                validator_name=validator_display_name,
                errors=cast(list[Any], validation_error_payload),
                original_exception=None,
            )

            first_error_msg = ""
            if isinstance(validation_error_payload, list) and validation_error_payload:
                first_error = validation_error_payload[0]
                if isinstance(first_error, dict):
                    if "error_message" in first_error:
                        first_error_msg = first_error["error_message"]
                    elif "msg" in first_error:
                        first_error_msg = first_error["msg"]

            base_error_str = str(val_error)
            if first_error_msg:
                result_data["error"] = f"{base_error_str}: {first_error_msg}"
            else:
                result_data["error"] = base_error_str
        elif validated_data is None and validation_model is not None:
            internal_err_msg = (
                f"Validation failed for model '{model_name}' with validator '{validator_display_name}', "
                "but no specific error details were captured by helpers."
            )
            logger.error(internal_err_msg + " This may indicate an issue in the validation helper logic.")
            result_data["error"] = str(
                ModelExecutionError(
                    model_name=model_name,
                    file_path=file_path,
                    original_exception=Exception(internal_err_msg),
                )
            )
        else:
            if validated_data is not None:
                try:
                    raise_on_linter_error = not ignore_linter_errors

                    apply_linter(schema_id=schema_id, record=validated_data, raise_on_error=raise_on_linter_error)

                except DataQualityError as e:
                    logger.warning(
                        "Model '%s' output for file '%s' passed schema validation "
                        "but FAILED data quality linting. Error: %s",
                        model_name,
                        file_path,
                        e,
                    )
                    result_data["error"] = str(e)
                    return RunModelResult(**result_data)

            result_data["record"] = validated_data
            logger.debug(
                "Model '%s' successfully executed for file '%s'. Output %s.",
                model_name,
                file_path,
                ("validated" if validation_model is not None else "accepted (no validation)"),
            )

    except ModelRunnerConfigError as err:
        logger.debug("Propagating ModelRunnerConfigError from run_single_model: %s", err)
        raise
    except PydanticValidationError as err:
        error_payload = err.errors()
        logger.exception(
            "PydanticValidationError encountered, likely during RunModelResult creation for model '%s'.",
            model_name,
        )
        val_name_for_err = "RunModelResultInternal"
        if "validator_display_name" in locals() and validation_model:
            val_name_for_err = validator_display_name
        elif validation_model:
            val_name_for_err = type(validation_model).__name__

        val_error = ModelOutputValidationError(
            model_name=model_name,
            validator_name=val_name_for_err,
            errors=error_payload,
            original_exception=err,
        )
        result_data["error"] = str(val_error)

    except Exception as err:
        logger.exception(
            "Unexpected error during execution or instantiation of model '%s' for file '%s'.",
            model_name,
            file_path,
        )
        exec_error = ModelExecutionError(model_name=model_name, file_path=file_path, original_exception=err)
        result_data["error"] = str(exec_error)

    return RunModelResult(**result_data)

dorsal.file.model_runner.model_debug_timer

model_debug_timer(method)

Decorates a method to measure its execution time.

  • Stores the execution time of each model in the time_taken instance mapping.
  • Requires self.debug = True on the ModelRunner instance.
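As a minimal sketch (the models executed beforehand are assumed), the timings collected by this decorator can be read back from the runner:

from dorsal.file.model_runner import ModelRunner

runner = ModelRunner(debug=True)  # timing is only recorded when debug is True

# ... execute models via run_single_model or the pipeline ...

# Durations in seconds, keyed by the model class name. Each RunModelResult
# also carries its own time_taken when debug is enabled.
for model_name, seconds in runner.time_taken.items():
    print(f"{model_name}: {seconds:.3f}s")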

Source code in venv/lib/python3.13/site-packages/dorsal/file/model_runner.py
def model_debug_timer(method):
    """Decorates method - provides execution time.
    - Stores execution time of each model in `time_taken` instance mapping
    - Requires: `self.debug = True` on ModelRunner instance

    """

    @wraps(method)
    def _impl(instance, annotation_model: AnnotationModel, *method_args, **method_kwargs):
        if instance.debug:
            model_name = getattr(annotation_model, "__name__", "unknown")
            start_time = time.perf_counter()
            result: RunModelResult = method(instance, annotation_model, *method_args, **method_kwargs)
            end_time = time.perf_counter()
            instance.time_taken[model_name] = end_time - start_time
            result.time_taken = instance.time_taken.get(model_name)
            return result
        return method(instance, annotation_model, *method_args, **method_kwargs)

    return _impl