4 changes: 3 additions & 1 deletion application_sdk/activities/common/models.py
@@ -4,7 +4,7 @@
needed by activities, such as statistics and configuration.
"""

from typing import List, Optional
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

@@ -33,6 +33,8 @@ class ActivityStatistics(BaseModel):
"""

total_record_count: int = 0
total_file_size_bytes: int = 0
chunk_count: int = 0
partitions: Optional[List[int]] = []
typename: Optional[str] = None
chunks_info: Optional[List[Dict[str, Any]]] = []
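
As a point of reference, here is a minimal sketch of populating the extended `ActivityStatistics` model; the field names come from the diff above, the import path is inferred from the file path, and all values are illustrative:

```python
from application_sdk.activities.common.models import ActivityStatistics

# Illustrative values only; chunks_info entries are free-form dicts per the
# Optional[List[Dict[str, Any]]] annotation added in this change.
stats = ActivityStatistics(
    total_record_count=250_000,
    total_file_size_bytes=104_857_600,
    chunk_count=3,
    typename="table",
    chunks_info=[
        {"chunk_start": 0, "record_count": 100_000},
        {"chunk_start": 100_000, "record_count": 100_000},
        {"chunk_start": 200_000, "record_count": 50_000},
    ],
)
print(stats.model_dump())  # .dict() on pydantic v1
```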
1 change: 1 addition & 0 deletions application_sdk/activities/metadata_extraction/sql.py
@@ -871,6 +871,7 @@ async def transform_data(
output_prefix=output_prefix,
typename=typename,
chunk_start=workflow_args.get("chunk_start"),
chunk_size=int(os.getenv("EXP_CHUNK_SIZE", 100000)),
)
if state.transformer:
workflow_args["connection_name"] = workflow_args.get("connection", {}).get(
14 changes: 7 additions & 7 deletions application_sdk/activities/query_extraction/sql.py
@@ -17,8 +17,8 @@
from application_sdk.handlers import HandlerInterface
from application_sdk.handlers.sql import BaseSQLHandler
from application_sdk.inputs.sql_query import SQLQueryInput
from application_sdk.io.parquet import ParquetWriter
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.outputs.parquet import ParquetOutput
from application_sdk.services.objectstore import ObjectStore
from application_sdk.services.secretstore import SecretStore
from application_sdk.transformers import TransformerInterface
@@ -209,18 +209,18 @@ async def fetch_queries(
)
sql_input = await sql_input.get_dataframe()

raw_output = ParquetOutput(
output_prefix=workflow_args["output_prefix"],
output_path=workflow_args["output_path"],
output_suffix="raw/query",
raw_output = ParquetWriter(
output_path=os.path.join(workflow_args["output_path"], "raw/query"),
chunk_size=workflow_args["miner_args"].get("chunk_size", 100000),
start_marker=workflow_args["start_marker"],
end_marker=workflow_args["end_marker"],
typename="query",
)
await raw_output.write_dataframe(sql_input)
await raw_output.write(sql_input)
await raw_output.close()

logger.info(
f"Query fetch completed, {raw_output.total_record_count} records processed",
f"Query fetch completed, {raw_output.total_record_count} records processed"
)

except Exception as e:
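
To see the new writer API in isolation, here is a hedged sketch of the write/close pattern this hunk switches to; the constructor arguments mirror the diff, while the dataframe, markers, and paths are placeholders:

```python
import os

import pandas as pd

from application_sdk.io.parquet import ParquetWriter


async def write_raw_queries(df: pd.DataFrame, output_path: str) -> None:
    # Same call pattern as fetch_queries above: construct the writer, write
    # the dataframe, then close it.
    writer = ParquetWriter(
        output_path=os.path.join(output_path, "raw/query"),
        chunk_size=100_000,          # placeholder for miner_args["chunk_size"]
        start_marker="2024-01-01",   # placeholder marker values
        end_marker="2024-01-02",
        typename="query",
    )
    await writer.write(df)
    await writer.close()
    print(f"{writer.total_record_count} records written")
```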
78 changes: 63 additions & 15 deletions application_sdk/inputs/parquet.py
@@ -40,6 +40,7 @@ def __init__(
"""
self.path = path
self.chunk_size = chunk_size
self.buffer_size = 5000
self.input_prefix = input_prefix
self.file_names = file_names

@@ -92,6 +93,10 @@ async def get_dataframe(self) -> "pd.DataFrame":
path = self.path
if self.input_prefix and self.path:
path = await self.download_files(self.path)

if not path:
raise ValueError("No path specified for parquet input")

# Use pandas native read_parquet which can handle both single files and directories
return pd.read_parquet(path)
except Exception as e:
@@ -115,6 +120,10 @@ async def get_batched_dataframe(
path = self.path
if self.input_prefix and self.path:
path = await self.download_files(self.path)

if not path:
raise ValueError("No path specified for parquet input")

df = pd.read_parquet(path)
if self.chunk_size:
for i in range(0, len(df), self.chunk_size):
@@ -144,6 +153,7 @@ async def get_daft_dataframe(self) -> "daft.DataFrame":  # noqa: F821
path = self.path
if self.input_prefix and path:
await self.download_files(path)

return daft.read_parquet(f"{path}/*.parquet")
except Exception as e:
logger.error(
@@ -152,30 +162,68 @@ async def get_daft_dataframe(self) -> "daft.DataFrame":  # noqa: F821
# Re-raise to match IcebergInput behavior
raise

async def get_batched_daft_dataframe(self) -> AsyncIterator["daft.DataFrame"]: # type: ignore
async def get_batched_daft_dataframe(
self,
) -> Union[AsyncIterator["daft.DataFrame"], Iterator["daft.DataFrame"]]:
"""
Get batched daft dataframe from parquet file(s)
Get batched daft dataframe with lazy loading for memory efficiency.

Returns:
AsyncIterator[daft.DataFrame]: An async iterator of daft DataFrames, each containing
a batch of data from the parquet file(s).
This method uses lazy loading to process chunks without loading the entire
dataframe into memory first, making it suitable for large datasets.

Yields:
daft.DataFrame: Individual chunks of the dataframe, each containing
up to buffer_size rows.
"""
try:
import daft # type: ignore

if self.file_names:
for file_name in self.file_names:
path = f"{self.path}/{file_name}"
if self.input_prefix and path:
await self.download_files(path)
yield daft.read_parquet(path)
else:
if self.path and self.input_prefix:
await self.download_files(self.path)
yield daft.read_parquet(f"{self.path}/*.parquet")
# Prepare the file path(s) for reading
file_paths = await self._prepare_file_paths()

# Create a lazy dataframe without loading data into memory
lazy_df = daft.read_parquet(file_paths)

# Get total count efficiently
total_rows = lazy_df.count_rows()

# Yield chunks without loading everything into memory
for offset in range(0, total_rows, self.buffer_size):
chunk = lazy_df.offset(offset).limit(self.buffer_size)
yield chunk

del lazy_df
except Exception as error:
logger.error(
f"Error reading data from parquet file(s) in batches using daft: {error}"
)
raise

async def _prepare_file_paths(self) -> Union[str, List[str]]:
"""
Helper method to prepare file paths for reading.

Handles both single files and multiple files, with optional object store downloads.

Returns:
Union[str, List[str]]: File path(s) ready for daft.read_parquet()

Raises:
ValueError: If no valid file paths can be prepared
"""
if self.file_names:
all_files: List[str] = []
for file_name in self.file_names:
path = f"{self.path}/{file_name}"
if self.input_prefix:
await self.download_files(path)
all_files.append(path)
return all_files
else:
if not self.path:
raise ValueError("No path specified for parquet input")

if self.input_prefix:
await self.download_files(self.path)

return f"{self.path}/*.parquet"
4 changes: 2 additions & 2 deletions application_sdk/inputs/sql_query.py
@@ -34,15 +34,15 @@ def __init__(
self,
query: str,
engine: Union["Engine", str],
chunk_size: Optional[int] = 100000,
chunk_size: Optional[int] = 5000,
):
"""Initialize the async SQL query input handler.

Args:
engine (Union[Engine, str]): SQLAlchemy engine or connection string.
query (str): The SQL query to execute.
chunk_size (Optional[int], optional): Number of rows per batch.
Defaults to 100000.
Defaults to 5000.
"""
self.query = query
self.engine = engine
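
For completeness, a small sketch of overriding the lowered default when the old batch size is still needed; the constructor signature comes from the hunk above and the connection string is a placeholder:

```python
from application_sdk.inputs.sql_query import SQLQueryInput

# chunk_size now defaults to 5000; callers that depended on the previous
# 100000-row batches must pass it explicitly.
sql_input = SQLQueryInput(
    query="SELECT * FROM information_schema.tables",
    engine="postgresql://user:pass@localhost:5432/db",  # placeholder
    chunk_size=100_000,
)
```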
1 change: 0 additions & 1 deletion application_sdk/interceptors/cleanup.py
@@ -114,7 +114,6 @@ async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
retry_policy=RetryPolicy(
maximum_attempts=3,
),
summary="This activity is used to cleanup the local artifacts and the activity state after the workflow is completed.",
)

logger.info("Cleanup completed successfully")