
Streaming transformer: parquet files should have identical schema within a batch #1197

Open
istreeter opened this issue Mar 1, 2023 · 0 comments


Currently, a single batch can contain parquet files with different columns. For example, if there are 1000 events per file, but context_1 is not seen until the 1001st event, then the first parquet file is missing the context_1 column.
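To make the failure mode concrete, here is a small sketch of how per-file schemas diverge within a batch when the transformer cuts a new file every N events. All names (`file_schemas`, the event shape) are illustrative, not the actual transformer code:

```python
EVENTS_PER_FILE = 1000

def file_schemas(events, events_per_file=EVENTS_PER_FILE):
    """Return the set of columns each output parquet file would contain."""
    schemas = []
    for start in range(0, len(events), events_per_file):
        chunk = events[start:start + events_per_file]
        columns = set()
        for event in chunk:
            columns.update(event.keys())
        schemas.append(columns)
    return schemas

# 1000 events without context_1, then one event that introduces it:
events = [{"event_id": i} for i in range(1000)]
events.append({"event_id": 1000, "context_1": {"foo": "bar"}})

schemas = file_schemas(events)
# schemas[0] == {"event_id"}                # first file lacks context_1
# schemas[1] == {"event_id", "context_1"}   # second file has it
```

The two files land in the same batch directory, yet a loader that trusts any single file's schema will miss the other file's columns.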

For Databricks loading, we account for this by setting the MERGESCHEMA format option. This works, but it is not ideal: merging schemas on every load adds a small amount of overhead.

When we add BigQuery support to RDB loader, this is going to be a bigger problem. For BigQuery, the load statement looks something like:

```sql
LOAD DATA INTO atomic.events
FROM FILES(
  format = 'PARQUET',
  uris = ['gs://bucket/path/to/batch'],
  enable_list_inference = true
);
```

The load succeeds, but when you query the table you find it is missing data for columns that were not present in every parquet file. This is because BigQuery reads the schema from the first parquet file only.


I have two suggested implementations; I don't know yet which is better.

Option 1: The transformer could emit a batch early if it sees a new schema for the first time. For example, if there are 1000 events per file, and the 1001st event contains context_1, then it emits the first 1000 events as a single batch without context_1, even if the 5-minute window has not completed yet. See #1198, which is relevant here.
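A minimal sketch of Option 1, assuming a transformer that buffers events per window: flush the pending buffer as its own batch whenever an event introduces a column not yet seen in the current buffer. The function name and event shape are hypothetical, not the actual transformer API:

```python
def batch_by_schema(events):
    """Split events into batches so that no batch mixes schemas."""
    batches = []
    current = []
    current_columns = set()
    for event in events:
        columns = set(event.keys())
        if current and not columns <= current_columns:
            # A new column appears for the first time: emit the buffered
            # events early, before the window would normally close.
            batches.append(current)
            current = []
            current_columns = set()
        current.append(event)
        current_columns |= columns
    if current:
        batches.append(current)
    return batches

events = [{"event_id": i} for i in range(1000)]
events.append({"event_id": 1000, "context_1": {"foo": "bar"}})

batches = batch_by_schema(events)
# Two batches: the first 1000 events without context_1, then the rest.
```

Every file written from a given batch then shares the batch's column set, at the cost of occasionally producing smaller-than-usual batches.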

Option 2: Elsewhere, we have been experimenting with using a local Spark context to write the parquet files. It spills pending events to local disk, and only starts creating the output files once the window is complete. If we change the transformer to use this approach, it also solves the parquet schema problem. However, it means the transformer needs access to disk, which could add cost and complexity for some deployments.
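A rough sketch of the idea behind Option 2: events are spilled to local disk as they arrive, and output is written only when the window closes, by which point the full set of columns is known, so every file in the batch can share one schema. Plain JSON lines stand in here for the Spark/parquet machinery; the class and its methods are illustrative only:

```python
import json
import tempfile

class SpillingWindow:
    def __init__(self):
        self.spill = tempfile.TemporaryFile(mode="w+")
        self.columns = set()

    def add(self, event):
        # Spill the event to disk and track the union of columns seen.
        self.spill.write(json.dumps(event) + "\n")
        self.columns.update(event.keys())

    def close_window(self):
        # Re-read the spilled events and emit them under the final,
        # unified schema: columns missing from an event become nulls.
        self.spill.seek(0)
        rows = []
        for line in self.spill:
            event = json.loads(line)
            rows.append({col: event.get(col) for col in sorted(self.columns)})
        self.spill.close()
        return rows

window = SpillingWindow()
window.add({"event_id": 1})
window.add({"event_id": 2, "context_1": {"foo": "bar"}})
rows = window.close_window()
# Every row now carries both columns; event 1's context_1 is null.
```

The trade-off is exactly the one noted above: the window's events must fit on local disk, and output latency moves to the end of the window.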
