Merged
9 changes: 9 additions & 0 deletions src/datapilot/clients/altimate/constants.py
@@ -0,0 +1,9 @@
"""Constants for the Altimate API client."""

# Supported dbt artifact file types for onboarding
SUPPORTED_ARTIFACT_TYPES = {
"manifest",
"catalog",
"run_results",
"sources",
Collaborator: Also add semantic_manifest; we can already collect it, and we should not validate it.

}
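A sketch of what that suggestion could look like once adopted; the exact key name semantic_manifest is an assumption taken from the comment, and the diff as shown does not include it:

# Hypothetical follow-up to the review comment above:
SUPPORTED_ARTIFACT_TYPES = {
    "manifest",
    "catalog",
    "run_results",
    "sources",
    "semantic_manifest",  # assumed name; collected as-is, skipped during local validation
}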
22 changes: 22 additions & 0 deletions src/datapilot/clients/altimate/utils.py
@@ -7,6 +7,7 @@
from requests import Response

from datapilot.clients.altimate.client import APIClient
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES


def check_token_and_instance(
@@ -56,6 +57,27 @@ def validate_permissions(


def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, file_type, file_path, backend_url) -> Dict:
"""
Upload a dbt artifact file to the Altimate backend.

Args:
api_token: API authentication token
tenant: Tenant/instance name
dbt_core_integration_id: ID of the dbt integration
dbt_core_integration_environment: Environment type (e.g., PROD)
file_type: Type of artifact - one of: manifest, catalog, run_results, sources, semantic_manifest
file_path: Path to the artifact file
backend_url: URL of the Altimate backend

Returns:
Dict with 'ok' boolean and optional 'message' on failure
"""
Copilot AI (Jan 6, 2026), on lines +60 to +74: The documentation added for onboard_file is helpful and clear, but it should note that validation is the caller's responsibility; semantic_manifest files are currently uploaded without validation, unlike the other artifact types.

if file_type not in SUPPORTED_ARTIFACT_TYPES:
return {
"ok": False,
"message": f"Unsupported file type: {file_type}. Supported types: {', '.join(sorted(SUPPORTED_ARTIFACT_TYPES))}",
}

api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

params = {
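For illustration, a caller can branch on the returned dict. A minimal sketch with placeholder values (token, tenant, integration ID, and backend URL are not real):

result = onboard_file(
    api_token="<token>",                      # placeholder
    tenant="<tenant>",                        # placeholder
    dbt_core_integration_id="123",            # placeholder
    dbt_core_integration_environment="PROD",
    file_type="run_results",
    file_path="target/run_results.json",
    backend_url="<backend-url>",              # placeholder
)
if not result["ok"]:
    print(result["message"])  # e.g. "Unsupported file type: ..."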
71 changes: 62 additions & 9 deletions src/datapilot/core/platforms/dbt/cli/cli.py
@@ -18,6 +18,8 @@
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_catalog
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_sources
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import map_url_to_instance

@@ -155,6 +157,8 @@ def project_health(
)
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--catalog-path", required=False, prompt=False, help="Path to the catalog file.")
@click.option("--run-results-path", required=False, prompt=False, help="Path to the run_results.json file.")
@click.option("--sources-path", required=False, prompt=False, help="Path to the sources.json file (source freshness results).")
def onboard(
token,
instance_name,
@@ -164,6 +168,8 @@
dbt_integration_environment,
manifest_path,
catalog_path,
run_results_path,
sources_path,
):
"""Onboard a manifest file to DBT. You can specify either --dbt_integration_id or --dbt_integration_name."""

@@ -198,34 +204,81 @@
elif dbt_integration_name and dbt_integration_id:
click.echo("Warning: Both integration ID and name provided. Using ID and ignoring name.")

# Validate manifest (required)
try:
load_manifest(manifest_path)
except Exception as e:
click.echo(f"Error: {e}")
return

# Validate optional artifacts if provided
if catalog_path:
try:
load_catalog(catalog_path)
except Exception as e:
click.echo(f"Error validating catalog: {e}")
return

if run_results_path:
try:
load_run_results(run_results_path)
except Exception as e:
click.echo(f"Error validating run_results: {e}")
return

if sources_path:
try:
load_sources(sources_path)
except Exception as e:
click.echo(f"Error validating sources: {e}")
return

# Onboard manifest (required)
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "manifest", manifest_path, backend_url)
if response["ok"]:
click.echo("Manifest onboarded successfully!")
else:
click.echo(f"{response['message']}")

if not catalog_path:
return

response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
else:
click.echo(f"{response['message']}")
# Onboard optional artifacts
artifacts_uploaded = ["manifest"]

if catalog_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
artifacts_uploaded.append("catalog")
else:
click.echo(f"{response['message']}")

if run_results_path:
response = onboard_file(
token, instance_name, dbt_integration_id, dbt_integration_environment, "run_results", run_results_path, backend_url
)
if response["ok"]:
click.echo("Run results onboarded successfully!")
artifacts_uploaded.append("run_results")
else:
click.echo(f"{response['message']}")

if sources_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "sources", sources_path, backend_url)
if response["ok"]:
click.echo("Sources onboarded successfully!")
artifacts_uploaded.append("sources")
else:
click.echo(f"{response['message']}")

# Start ingestion
response = start_dbt_ingestion(token, instance_name, dbt_integration_id, dbt_integration_environment, backend_url)
if response["ok"]:
url = map_url_to_instance(backend_url, instance_name)
artifacts_str = ", ".join(artifacts_uploaded)
if not url:
click.echo("Manifest and catalog ingestion has started.")
click.echo(f"Ingestion has started for: {artifacts_str}")
else:
url = f"{url}/settings/integrations/{dbt_integration_id}/{dbt_integration_environment}"
click.echo(f"Manifest and catalog ingestion has started. You can check the status at {url}")
click.echo(f"Ingestion has started for: {artifacts_str}. You can check the status at {url}")
else:
click.echo(f"{response['message']}")
44 changes: 44 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/run_results.py
@@ -0,0 +1,44 @@
from typing import Union

from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v1 import RunResultsV1 as BaseRunResultsV1
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v2 import RunResultsV2 as BaseRunResultsV2
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v3 import RunResultsV3 as BaseRunResultsV3
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v4 import RunResultsV4 as BaseRunResultsV4
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v5 import RunResultsV5 as BaseRunResultsV5
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6 as BaseRunResultsV6


class RunResultsV1(BaseRunResultsV1):
model_config = ConfigDict(extra="allow")


class RunResultsV2(BaseRunResultsV2):
model_config = ConfigDict(extra="allow")


class RunResultsV3(BaseRunResultsV3):
model_config = ConfigDict(extra="allow")


class RunResultsV4(BaseRunResultsV4):
model_config = ConfigDict(extra="allow")


class RunResultsV5(BaseRunResultsV5):
model_config = ConfigDict(extra="allow")


class RunResultsV6(BaseRunResultsV6):
model_config = ConfigDict(extra="allow")
Copilot AI (Jan 6, 2026): model_config = ConfigDict(extra="allow") is redundant here, since the base classes (BaseRunResultsV1 through BaseRunResultsV6) already set this configuration. It causes no issues, but it adds unnecessary repetition; consider removing it unless there is a specific need to override.



RunResults = Union[
RunResultsV6,
RunResultsV5,
RunResultsV4,
RunResultsV3,
RunResultsV2,
RunResultsV1,
]
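Whether or not the override is redundant, the point of extra="allow" is forward compatibility: a newer dbt release can add fields the pinned schema does not know about, and parsing should keep them rather than fail. A self-contained pydantic v2 sketch of that behavior; the same reasoning applies to the Sources subclasses in the next file:

from pydantic import BaseModel, ConfigDict

class LenientResult(BaseModel):
    model_config = ConfigDict(extra="allow")
    unique_id: str

r = LenientResult(unique_id="model.jaffle_shop.stg_customers", new_dbt_field=42)
print(r.model_extra)  # {'new_dbt_field': 42}; kept instead of raising a ValidationError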
26 changes: 26 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/sources.py
@@ -0,0 +1,26 @@
from typing import Union

from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1 as BaseSourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2 as BaseSourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3 as BaseSourcesV3


class SourcesV1(BaseSourcesV1):
model_config = ConfigDict(extra="allow")


class SourcesV2(BaseSourcesV2):
model_config = ConfigDict(extra="allow")


class SourcesV3(BaseSourcesV3):
model_config = ConfigDict(extra="allow")
Copilot AI (Jan 6, 2026): model_config = ConfigDict(extra="allow") is redundant here, since the base classes (BaseSourcesV1 through BaseSourcesV3) already set this configuration. It causes no issues, but it adds unnecessary repetition; consider removing it unless there is a specific need to override.

Suggested change:
-from pydantic import ConfigDict
-
 from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1 as BaseSourcesV1
 from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2 as BaseSourcesV2
 from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3 as BaseSourcesV3

 class SourcesV1(BaseSourcesV1):
-    model_config = ConfigDict(extra="allow")
+    ...

 class SourcesV2(BaseSourcesV2):
-    model_config = ConfigDict(extra="allow")
+    ...

 class SourcesV3(BaseSourcesV3):
-    model_config = ConfigDict(extra="allow")
+    ...



Sources = Union[
SourcesV3,
SourcesV2,
SourcesV1,
]
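For context, parse_sources presumably selects one member of this Union by inspecting metadata.dbt_schema_version, as the upstream dbt-artifacts-parser does. A toy version of that dispatch; the vendored parser's actual logic may differ:

def pick_sources_class(artifact: dict):
    # Toy dispatch on the schema URL embedded in the artifact metadata.
    version = artifact.get("metadata", {}).get("dbt_schema_version", "")
    by_version = {
        "https://schemas.getdbt.com/dbt/sources/v1.json": SourcesV1,
        "https://schemas.getdbt.com/dbt/sources/v2.json": SourcesV2,
        "https://schemas.getdbt.com/dbt/sources/v3.json": SourcesV3,
    }
    if version not in by_version:
        raise ValueError(f"Unsupported sources schema: {version!r}")
    return by_version[version]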
36 changes: 34 additions & 2 deletions src/datapilot/core/platforms/dbt/utils.py
@@ -22,13 +22,17 @@
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestSourceNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestTestNode
from datapilot.core.platforms.dbt.schemas.manifest import Manifest
from datapilot.core.platforms.dbt.schemas.run_results import RunResults
from datapilot.core.platforms.dbt.schemas.sources import Sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError
from datapilot.exceptions.exceptions import AltimateInvalidJSONError
from datapilot.utils.utils import extract_dir_name_from_file_path
from datapilot.utils.utils import extract_folders_in_path
from datapilot.utils.utils import is_superset_path
from datapilot.utils.utils import load_json
from vendor.dbt_artifacts_parser.parser import parse_manifest
from vendor.dbt_artifacts_parser.parser import parse_run_results
from vendor.dbt_artifacts_parser.parser import parse_sources

MODEL_TYPE_PATTERNS = {
STAGING: r"^stg_.*", # Example: models starting with 'stg_'
@@ -94,8 +98,36 @@ def load_catalog(catalog_path: str) -> Catalog:
return catalog


def load_run_results(run_results_path: str) -> Manifest:
raise NotImplementedError
def load_run_results(run_results_path: str) -> RunResults:
try:
run_results_dict = load_json(run_results_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Run results file not found: {run_results_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {run_results_path}. Error: {e}") from e

try:
run_results: RunResults = parse_run_results(run_results_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid run results file: {run_results_path}. Error: {e}") from e
Copilot AI (Jan 6, 2026), on lines +110 to +112: The exception type AltimateInvalidManifestError is misleading when used for non-manifest files like run_results. Consider a more generic exception such as AltimateInvalidArtifactError, or a specific exception per artifact type.


return run_results


def load_sources(sources_path: str) -> Sources:
try:
sources_dict = load_json(sources_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Sources file not found: {sources_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {sources_path}. Error: {e}") from e

try:
sources: Sources = parse_sources(sources_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid sources file: {sources_path}. Error: {e}") from e
Copilot AI (Jan 6, 2026): The exception type AltimateInvalidManifestError is misleading when used for non-manifest files like sources. Consider a more generic exception such as AltimateInvalidArtifactError, or a specific exception per artifact type.


return sources


# TODO: Add tests!
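If the maintainers take up the reviewers' suggestion, a minimal sketch of the more generic exception; the name and module are hypothetical, and this PR does not implement it:

# Hypothetical, per the review comments; would live in datapilot/exceptions/exceptions.py:
class AltimateInvalidArtifactError(Exception):
    """Raised when a dbt artifact (run_results, sources, ...) fails schema validation."""

# load_run_results and load_sources would then raise it in place of AltimateInvalidManifestError:
#     raise AltimateInvalidArtifactError(f"Invalid sources file: {sources_path}. Error: {e}") from e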
Empty file added tests/clients/__init__.py
Empty file added tests/clients/altimate/__init__.py
26 changes: 26 additions & 0 deletions tests/clients/altimate/test_utils.py
@@ -0,0 +1,26 @@
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES
from datapilot.clients.altimate.utils import onboard_file


class TestOnboardFile:
def test_supported_artifact_types(self):
"""Test that all expected artifact types are supported."""
expected_types = {"manifest", "catalog", "run_results", "sources"}
assert SUPPORTED_ARTIFACT_TYPES == expected_types

def test_unsupported_file_type_returns_error(self):
"""Test that unsupported file types return an error without making API calls."""
test_token = "test_token" # noqa: S105
result = onboard_file(
api_token=test_token,
tenant="test_tenant",
dbt_core_integration_id="test_id",
dbt_core_integration_environment="PROD",
file_type="unsupported_type",
file_path="test_path.json",
backend_url="http://localhost",
)

assert result["ok"] is False
assert "Unsupported file type" in result["message"]
assert "unsupported_type" in result["message"]
36 changes: 36 additions & 0 deletions tests/core/platform/dbt/test_artifact_loaders.py
@@ -0,0 +1,36 @@
import pytest

from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError


class TestLoadRunResults:
def test_load_run_results_v6(self):
run_results_path = "tests/data/run_results_v6.json"
run_results = load_run_results(run_results_path)

assert run_results is not None
assert run_results.metadata.dbt_schema_version == "https://schemas.getdbt.com/dbt/run-results/v6.json"
assert len(run_results.results) == 1
assert run_results.results[0].status.value == "success"
assert run_results.results[0].unique_id == "model.jaffle_shop.stg_customers"

def test_load_run_results_file_not_found(self):
with pytest.raises(AltimateFileNotFoundError):
load_run_results("nonexistent_file.json")


class TestLoadSources:
def test_load_sources_v3(self):
sources_path = "tests/data/sources_v3.json"
sources = load_sources(sources_path)

assert sources is not None
assert sources.metadata.dbt_schema_version == "https://schemas.getdbt.com/dbt/sources/v3.json"
assert len(sources.results) == 1
assert sources.results[0].unique_id == "source.jaffle_shop.raw.customers"

def test_load_sources_file_not_found(self):
with pytest.raises(AltimateFileNotFoundError):
load_sources("nonexistent_file.json")
13 changes: 13 additions & 0 deletions tests/core/platform/dbt/test_cli.py
@@ -113,3 +113,16 @@ def test_project_health_with_required_and_optional_args_v12():
# Add more assertions here to validate the behavior of your command,
# for example, checking that the output contains expected text.
assert "-----------" in result.output


def test_onboard_help_shows_all_artifact_options():
"""Test that the onboard command shows all artifact options in help."""
runner = CliRunner()

result = runner.invoke(datapilot, ["dbt", "onboard", "--help"])

assert result.exit_code == 0
assert "--manifest-path" in result.output
assert "--catalog-path" in result.output
assert "--run-results-path" in result.output
assert "--sources-path" in result.output