Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion application_sdk/activities/metadata_extraction/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ async def transform_data(
dataframe=dataframe, **workflow_args
)
await transformed_output.write_daft_dataframe(transform_metadata)
return await transformed_output.get_statistics()
return await transformed_output.get_statistics(typename=typename)

@activity.defn
@auto_heartbeater
Expand Down
1 change: 1 addition & 0 deletions application_sdk/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

42 changes: 35 additions & 7 deletions application_sdk/outputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from temporalio import activity

from application_sdk.activities.common.models import ActivityStatistics
from application_sdk.activities.common.utils import get_object_store_prefix
from application_sdk.activities.common.utils import get_object_store_prefix, build_output_path
from application_sdk.common.dataframe_utils import is_empty_dataframe
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import MetricType
Expand All @@ -34,6 +34,7 @@
logger = get_logger(__name__)
activity.logger = logger


if TYPE_CHECKING:
import daft # type: ignore
import pandas as pd
Expand Down Expand Up @@ -71,6 +72,19 @@ class Output(ABC):
current_buffer_size_bytes: int
partitions: List[int]

def _infer_phase_from_path(self) -> Optional[str]:
"""Infer phase from output path by checking for raw/transformed directories.

Returns:
Optional[str]: "Extract" for raw, "Transform" for transformed, else None.
"""
path_parts = str(self.output_path).split("/")
if "raw" in path_parts:
return "Extract"
if "transformed" in path_parts:
return "Transform"
return None

def estimate_dataframe_record_size(self, dataframe: "pd.DataFrame") -> int:
"""Estimate File size of a DataFrame by sampling a few records."""
if len(dataframe) == 0:
Expand Down Expand Up @@ -330,7 +344,7 @@ async def get_statistics(
Exception: If there's an error writing the statistics
"""
try:
statistics = await self.write_statistics()
statistics = await self.write_statistics(typename)
if not statistics:
raise ValueError("No statistics data available")
statistics = ActivityStatistics.model_validate(statistics)
Expand Down Expand Up @@ -390,7 +404,7 @@ async def _flush_buffer(self, chunk: "pd.DataFrame", chunk_part: int):
logger.error(f"Error flushing buffer to files: {str(e)}")
raise e

async def write_statistics(self) -> Optional[Dict[str, Any]]:
async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Write statistics about the output to a JSON file.

This method writes statistics including total record count and chunk count
Expand All @@ -407,17 +421,31 @@ async def write_statistics(self) -> Optional[Dict[str, Any]]:
"partitions": self.partitions,
}

# Write the statistics to a json file
output_file_name = f"{self.output_path}/statistics.json.ignore"
with open(output_file_name, "w") as f:
f.write(orjson.dumps(statistics).decode("utf-8"))
# Ensure typename is included in the statistics payload (if provided)
if typename:
statistics["typename"] = typename

# Write the statistics to a json file inside a dedicated statistics/ folder
statistics_dir = os.path.join(self.output_path, "statistics")
os.makedirs(statistics_dir, exist_ok=True)
output_file_name = f"{statistics_dir}/statistics.json.ignore"
# If chunk_start is provided, include it in the statistics filename
try:
cs = getattr(self, "chunk_start", None)
if cs is not None:
output_file_name = f"{statistics_dir}/statistics-chunk-{cs}.json.ignore"
except Exception:
# If accessing chunk_start fails, fallback to default filename
pass

destination_file_path = get_object_store_prefix(output_file_name)
# Push the file to the object store
await ObjectStore.upload_file(
source=output_file_name,
destination=destination_file_path,
)

return statistics
except Exception as e:
logger.error(f"Error writing statistics: {str(e)}")

2 changes: 2 additions & 0 deletions tests/unit/outputs/test_output.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Unit tests for output interface."""

from typing import Any
import json
from unittest.mock import AsyncMock, mock_open, patch

import pandas as pd
Expand Down Expand Up @@ -161,3 +162,4 @@ async def test_write_statistics_error(self):
assert result is None
mock_logger.assert_called_once()
assert "Error writing statistics" in mock_logger.call_args[0][0]

Loading