Race condition in awswrangler.athena.to_iceberg(): Athena query execution may fail due to incomplete S3 Parquet file upload #3026

Open
sikyeong opened this issue Nov 22, 2024 · 0 comments
Labels
bug Something isn't working

sikyeong commented Nov 22, 2024

Describe the bug

Hi!

awswrangler.athena.to_iceberg() might have a race condition between s3.to_parquet() and _merge_iceberg().

The logical sequence of calling s3.to_parquet() followed by _merge_iceberg() seems correct. However, there's a potential race condition where the Athena query execution request might proceed before the Parquet file upload to S3 is complete. This could lead to errors in the Athena engine, such as failing to find the file or attempting to read an incomplete upload.

(In fact, in my dev environment, when using to_iceberg() with large datasets, I frequently encounter failures with the error "HIVE_BAD_DATA: Not valid Parquet file", which I suspect is due to the query attempting to access files before they're fully uploaded.)

For this, I propose two potential solutions:

  1. Implement S3 and Glue verification (more robust but requires additional API calls)
  2. Add a callback mechanism (enables asynchronous processing but may increase code complexity)
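To make option 1 more concrete, here is a minimal sketch of the S3 verification step. It assumes the list of written files is available (wr.s3.to_parquet() returns it under the "paths" key) and that a boto3 S3 client is passed in. The helper name wait_for_s3_objects and its timeout and poll_interval parameters are hypothetical and not part of awswrangler. Note that a successful HeadObject only shows the object is visible; it does not validate the Parquet content.

```python
import time
from typing import Any, Iterable
from urllib.parse import urlparse


def wait_for_s3_objects(
    s3_client: Any,
    paths: Iterable[str],
    timeout: float = 60.0,
    poll_interval: float = 1.0,
) -> None:
    """Block until every s3:// path answers a HeadObject call.

    Hypothetical helper, not part of awswrangler. Raises TimeoutError if
    some objects are still not visible once `timeout` seconds have passed.
    """
    deadline = time.monotonic() + timeout
    pending = list(paths)
    while pending:
        still_missing = []
        for path in pending:
            parsed = urlparse(path)  # s3://bucket/key -> netloc=bucket, path=/key
            try:
                s3_client.head_object(
                    Bucket=parsed.netloc, Key=parsed.path.lstrip("/")
                )
            except Exception:
                # Caught broadly to keep the sketch dependency-free; real code
                # would narrow this to botocore.exceptions.ClientError.
                still_missing.append(path)
        pending = still_missing
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Objects not visible in S3: {pending}")
            time.sleep(poll_interval)
```

Inside to_iceberg() this could sit between the s3.to_parquet() call and _merge_iceberg(), at the cost of one HeadObject request per written file.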

If development time is a concern for those two options, I propose a simpler alternative: add a delay_time parameter:

to_iceberg(delay_time: int | float | None)

And implement a simple delay:

# aws-sdk-pandas/awswrangler/athena/_write_iceberg.py

def to_iceberg():
        ...

        # Create temporary external table, write the results
        s3.to_parquet(
            df=df,
            ...
        )

        # Optional pause so the uploaded files are visible before Athena reads them
        if isinstance(delay_time, (int, float)):
            time.sleep(delay_time)

        _merge_iceberg(
            df=df,
            ...
        )

This would provide a basic way to reduce these failures by allowing a configurable delay between the upload and the query execution. While not ideal, it could serve as a quick interim solution to improve reliability.
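Until something like this is available in the library, a similar effect can be approximated from the caller's side by retrying the whole call after a pause. This is only a sketch of a workaround, not a fix for the underlying issue: retry_with_delay is a hypothetical helper, and each retry repeats the upload as well as the Athena query.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_delay(
    func: Callable[[], T],
    attempts: int = 3,
    delay_time: float = 5.0,
) -> T:
    """Call `func`, sleeping `delay_time` seconds between failed attempts.

    Hypothetical caller-side helper, not part of awswrangler. The last
    failure is re-raised unchanged once `attempts` calls have failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay_time)
    raise AssertionError("unreachable")


# Intended usage (needs AWS access, so left as a comment):
# retry_with_delay(lambda: wr.athena.to_iceberg(**kwargs))
```

Catching Exception is deliberately broad here; narrowing it to the specific awswrangler query-failure exception would avoid retrying on unrelated errors.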

Best regards,

How to Reproduce

import awswrangler as wr

wr.athena.to_iceberg(**kwargs)

OS

Amazon Linux 2, JupyterLab 3 (notebook-al2-v2)

Python version

3.10.8

AWS SDK for pandas version

3.10.0
