
WIP Iceberg integration #47

Open
wants to merge 25 commits into main
Conversation

@margon8 (Contributor) commented Sep 9, 2024

Add Iceberg support to Metabolic

@margon8 margon8 added the wip/donotmerge Do not merge label Sep 9, 2024
@braislchao braislchao self-assigned this Sep 10, 2024
@braislchao (Collaborator)

Generic Table source integration

  • A Table source can be in Iceberg or Delta format
  • We need to know its catalog and its name
  • With the catalog and the name we can locate the table in the corresponding implementation and get a generic DataFrame in both cases (see the reader sketch below)

sources: [
    {
        catalog: "data_lake.silver_events_v3"
        name: "silver_events_v3"
        format: TABLE
    },
]

@braislchao (Collaborator) commented Sep 11, 2024

Test catalog-based readStream in Iceberg reads:

data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")

Reading from local.data_lake.letters currently fails with:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
local.data_lake.letters
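A minimal sketch of the matching read side, with local.data_lake.letters as the source and a hypothetical target table and checkpoint path; toTable starts the streaming query, and the AnalysisException above is what Spark raises when a batch-style action (for example .show()) is applied to the streaming DataFrame instead of starting a query:

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().getOrCreate()

// Catalog-based streaming read from the Iceberg table.
val data = spark.readStream
  .format("iceberg")
  .load("local.data_lake.letters")

// toTable(...) starts the query; without starting it, any batch action on
// `data` fails with "Queries with streaming sources must be executed with
// writeStream.start();".
val query = data.writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("checkpointLocation", "/tmp/checkpoints/letters") // hypothetical path
  .toTable("local.data_lake.letters_sink")                   // hypothetical target table

query.awaitTermination()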

@braislchao (Collaborator) commented Sep 11, 2024

For catalog compatibility we can use Iceberg's SparkSessionCatalog to remain compatible with Delta:

Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's SparkSessionCatalog as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.
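A minimal sketch of that wrapper configuration, following the Iceberg documentation; the type = hive setting is the documented Hive Metastore variant, while the Glue-backed variant used later replaces it with catalog-impl:

import org.apache.spark.sql.SparkSession

// Wrap Spark's built-in session catalog with Iceberg's SparkSessionCatalog:
// Iceberg tables are handled by Iceberg, and non-Iceberg tables (e.g. Delta)
// fall back to the built-in catalog.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .getOrCreate()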

@braislchao (Collaborator) commented Sep 17, 2024

We have to find a way to standardize tests in the local environment between these two options:

  • Using local directories and catalog
  • Referencing a Glue catalog and S3 dev environments

For the moment, we are going to test it directly in the Glue dev environment with the following Spark configuration:

    implicit val spark = sparkBuilder
      .appName(s"Metabolic Mapper - $configPath")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      // Enable both the Iceberg and Delta SQL extensions
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
      // Wrap the session catalog with Iceberg's SparkSessionCatalog so non-Iceberg (Delta) tables fall back to the built-in catalog
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
      .config("spark.databricks.delta.optimize.repartition.enabled", "true")
      .config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
      // Back the Iceberg session catalog with AWS Glue and S3
      .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.catalog.spark_catalog.client.region", "eu-central-1")
      .config("spark.sql.defaultCatalog", "spark_catalog")
      .getOrCreate()

Since we are using Iceberg 1.6.1, it's important to take these configuration options into account and not include iceberg in the --datalake-formats property:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html#aws-glue-programming-etl-format-iceberg-enable

@braislchao braislchao added the enhancement New feature or request label Sep 18, 2024
@braislchao (Collaborator) commented Sep 19, 2024

To make the Iceberg integration work, we needed to add the Iceberg libraries as --extra-jars.
This worked while keeping the --datalake-formats property set to delta:

"--extra-jars" = "s3://factorial-metabolic/extra-libs/metabolic-core-iceberg-assembly.jar,s3://factorial-metabolic/extra-libs/iceberg-aws-bundle-1.6.1.jar,s3://factorial-metabolic/extra-libs/iceberg-spark-runtime-3.3_2.12-1.6.1.jar"
"--datalake-formats" = "delta"

With the previous configuration we received an error about a missing warehouse location. It seems the warehouse is mandatory for Glue to be able to write to an S3 folder as the main catalog:

org.apache.iceberg.exceptions.ValidationException: Cannot derive default warehouse location, warehouse path must not be null or empty at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)

This property must be configured in the Metabolic Spark configuration:

.config("spark.sql.catalog.spark_catalog.warehouse", "s3://factorial-metabolic/data-lake/dev/feature_test_iceberg_write/")

@braislchao (Collaborator)

Added two changes not related to this PR but needed for test integrity:

  • Change the log4j2.properties configuration in the test environment
  • Ignore the Secret Manager test (we probably need to solve the token issues, but first I need to understand what this test covers)
