Merge pull request #1 from apache/master
Syncing with upstream repository
kkalanda-score authored Jun 28, 2023
2 parents b36e7c4 + 60ac414 commit 8f1d202
Showing 515 changed files with 25,481 additions and 6,520 deletions.
2 changes: 1 addition & 1 deletion .codecov.yml
@@ -25,7 +25,7 @@ coverage:
range: "50...100"
status:
project: # settings affecting project coverage
enabled: no
enabled: yes

# do not run coverage on patch nor changes
patch: no
14 changes: 13 additions & 1 deletion .github/workflows/bot.yml
@@ -71,6 +71,10 @@ jobs:
sparkProfile: "spark3.3"
sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"

steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -152,6 +156,9 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
@@ -170,6 +177,9 @@ jobs:
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.0'
sparkRuntime: 'spark3.0.2'
- flinkProfile: 'flink1.13'
sparkProfile: 'spark2.4'
sparkRuntime: 'spark2.4.8'
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -193,6 +203,7 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
if: ${{ env.SPARK_PROFILE >= 'spark3' }} # Only run validation on Spark 3
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk8
@@ -201,6 +212,7 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
if: ${{ env.SPARK_PROFILE >= 'spark3' }} # Only run validation on Spark 3
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk11
@@ -210,7 +222,7 @@ jobs:
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
if: ${{ endsWith(env.SPARK_PROFILE, '3.3') }} # Only Spark 3.3 supports Java 17 as of now
if: ${{ env.SPARK_PROFILE >= 'spark3.3' }} # Only Spark 3.3 and above support Java 17
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk17
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
name: Label the PR size
steps:
- uses: codelytv/pr-size-labeler@54ef367
- uses: codelytv/pr-size-labeler@v1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
xs_label: 'size-xs'
14 changes: 10 additions & 4 deletions README.md
@@ -85,7 +85,7 @@ mvn clean javadoc:aggregate -Pjavadocs
### Build with different Spark versions

The default Spark 2.x version supported is 2.4.4. The default Spark 3.x version, corresponding to `spark3` profile is
3.3.1. The default Scala version is 2.12. Refer to the table below for building with different Spark and Scala versions.
3.4.0. The default Scala version is 2.12. Refer to the table below for building with different Spark and Scala versions.

| Maven build options | Expected Spark bundle jar name | Notes |
|:--------------------------|:---------------------------------------------|:-------------------------------------------------|
@@ -95,17 +95,18 @@ The default Spark 2.x version supported is 2.4.4. The default Spark 3.x version,
| `-Dspark3.1` | hudi-spark3.1-bundle_2.12 | For Spark 3.1.x and Scala 2.12 |
| `-Dspark3.2` | hudi-spark3.2-bundle_2.12 | For Spark 3.2.x and Scala 2.12 (same as default) |
| `-Dspark3.3` | hudi-spark3.3-bundle_2.12 | For Spark 3.3.x and Scala 2.12 |
| `-Dspark3.4` | hudi-spark3.4-bundle_2.12 | For Spark 3.4.x and Scala 2.12 |
| `-Dspark2 -Dscala-2.11` | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11 |
| `-Dspark2 -Dscala-2.12` | hudi-spark-bundle_2.12 (legacy bundle name) | For Spark 2.4.4 and Scala 2.12 |
| `-Dspark3` | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.3.x and Scala 2.12 |
| `-Dspark3` | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.4.x and Scala 2.12 |

For example,
```
# Build against Spark 3.2.x
mvn clean package -DskipTests
# Build against Spark 3.1.x
mvn clean package -DskipTests -Dspark3.1
# Build against Spark 3.4.x
mvn clean package -DskipTests -Dspark3.4
# Build against Spark 2.4.4 and Scala 2.11
mvn clean package -DskipTests -Dspark2.4 -Dscala-2.11
Expand Down Expand Up @@ -156,6 +157,11 @@ Functional tests, which are tagged with `@Tag("functional")`, can be run with ma
mvn -Pfunctional-tests test
```

Integration tests can be run with maven profile `integration-tests`.
```
mvn -Pintegration-tests verify
```

To run tests with spark event logging enabled, define the Spark event log directory. This allows visualizing test DAG and stages using Spark History Server UI.
```
mvn -Punit-tests test -DSPARK_EVLOG_DIR=/path/for/spark/event/log
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -19,15 +19,18 @@
package org.apache.hudi.aws.sync;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.GlueCatalogSyncClientConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder;
import com.amazonaws.services.glue.model.AlreadyExistsException;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
import com.amazonaws.services.glue.model.BatchCreatePartitionResult;
@@ -65,11 +68,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.containsAll;
import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
@@ -88,17 +93,24 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

private static final Logger LOG = LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private final AWSGlueAsync awsGlue;
private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
private final AWSGlue awsGlue;
/**
* athena v2/v3 table property
* see https://docs.aws.amazon.com/athena/latest/ug/querying-hudi.html
*/
private static final String ENABLE_MDT_LISTING = "hudi.metadata-listing-enabled";
private final String databaseName;

private final Boolean skipTableArchive;
private final String enableMetadataTable;

public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = AWSGlueClientBuilder.standard().build();
this.awsGlue = AWSGlueAsyncClientBuilder.standard().build();
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive = config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
this.enableMetadataTable = Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
}

@Override
@@ -140,11 +152,16 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
}).collect(Collectors.toList());

List<Future<BatchCreatePartitionResult>> futures = new ArrayList<>();

for (List<PartitionInput> batch : CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) {
BatchCreatePartitionRequest request = new BatchCreatePartitionRequest();
request.withDatabaseName(databaseName).withTableName(tableName).withPartitionInputList(batch);
futures.add(awsGlue.batchCreatePartitionAsync(request));
}

BatchCreatePartitionResult result = awsGlue.batchCreatePartition(request);
for (Future<BatchCreatePartitionResult> future : futures) {
BatchCreatePartitionResult result = future.get();
if (CollectionUtils.nonEmpty(result.getErrors())) {
if (result.getErrors().stream().allMatch((error) -> "AlreadyExistsException".equals(error.getErrorDetail().getErrorCode()))) {
LOG.warn("Partitions already exist in glue: " + result.getErrors());
@@ -153,7 +170,6 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
+ " with error(s): " + result.getErrors());
}
}
Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS);
}
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName), e);
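
The hunks above replace the synchronous per-batch `batchCreatePartition` call and its fixed `Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS)` with asynchronous submission of every batch up front, followed by a second loop that awaits each `Future` and checks its errors. A minimal sketch of that submit-then-await pattern, using the plain JDK `ExecutorService` and invented types in place of the AWS Glue async client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitThenAwaitSketch {

  // Hypothetical stand-in for a batch result carrying per-item errors.
  static class BatchResult {
    final List<String> errors;
    BatchResult(List<String> errors) {
      this.errors = errors;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3, 4), List.of(5));

    // 1) Submit every batch without blocking between requests.
    List<Future<BatchResult>> futures = new ArrayList<>();
    for (List<Integer> batch : batches) {
      Callable<BatchResult> call = () -> {
        System.out.println("sending batch " + batch); // pretend remote call
        return new BatchResult(List.of());            // no errors in this sketch
      };
      futures.add(executor.submit(call));
    }

    // 2) Await each result afterwards and surface any per-batch errors.
    for (Future<BatchResult> future : futures) {
      BatchResult result = future.get(); // blocks until that batch completes
      if (!result.errors.isEmpty()) {
        throw new IllegalStateException("Batch failed with error(s): " + result.errors);
      }
    }
    executor.shutdown();
  }
}
```

The `AWSGlueAsync` client returns such futures directly from `batchCreatePartitionAsync`, `batchUpdatePartitionAsync`, and `batchDeletePartitionAsync`, which is why the per-batch sleep is dropped in the same change.
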
@@ -179,16 +195,19 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
return new BatchUpdatePartitionRequestEntry().withPartitionInput(partitionInput).withPartitionValueList(partitionValues);
}).collect(Collectors.toList());

List<Future<BatchUpdatePartitionResult>> futures = new ArrayList<>();
for (List<BatchUpdatePartitionRequestEntry> batch : CollectionUtils.batches(updatePartitionEntries, MAX_PARTITIONS_PER_REQUEST)) {
BatchUpdatePartitionRequest request = new BatchUpdatePartitionRequest();
request.withDatabaseName(databaseName).withTableName(tableName).withEntries(batch);
futures.add(awsGlue.batchUpdatePartitionAsync(request));
}

BatchUpdatePartitionResult result = awsGlue.batchUpdatePartition(request);
for (Future<BatchUpdatePartitionResult> future : futures) {
BatchUpdatePartitionResult result = future.get();
if (CollectionUtils.nonEmpty(result.getErrors())) {
throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName)
+ " with error(s): " + result.getErrors());
+ " with error(s): " + result.getErrors());
}
Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS);
}
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName), e);
@@ -203,6 +222,7 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
}
LOG.info("Drop " + partitionsToDrop.size() + "partition(s) in table " + tableId(databaseName, tableName));
try {
List<Future<BatchDeletePartitionResult>> futures = new ArrayList<>();
for (List<String> batch : CollectionUtils.batches(partitionsToDrop, MAX_PARTITIONS_PER_REQUEST)) {

List<PartitionValueList> partitionValueLists = batch.stream().map(partition -> {
@@ -215,13 +235,15 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
.withDatabaseName(databaseName)
.withTableName(tableName)
.withPartitionsToDelete(partitionValueLists);
futures.add(awsGlue.batchDeletePartitionAsync(batchDeletePartitionRequest));
}

BatchDeletePartitionResult result = awsGlue.batchDeletePartition(batchDeletePartitionRequest);
for (Future<BatchDeletePartitionResult> future : futures) {
BatchDeletePartitionResult result = future.get();
if (CollectionUtils.nonEmpty(result.getErrors())) {
throw new HoodieGlueSyncException("Fail to drop partitions to " + tableId(databaseName, tableName)
+ " with error(s): " + result.getErrors());
+ " with error(s): " + result.getErrors());
}
Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS);
}
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to drop partitions to " + tableId(databaseName, tableName), e);
@@ -231,12 +253,81 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
@Override
public boolean updateTableProperties(String tableName, Map<String, String> tableProperties) {
try {
tableProperties.put(ENABLE_MDT_LISTING, enableMetadataTable);
return updateTableParameters(awsGlue, databaseName, tableName, tableProperties, skipTableArchive);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
}
}

private void setComments(List<Column> columns, Map<String, Option<String>> commentsMap) {
columns.forEach(column -> {
String comment = commentsMap.getOrDefault(column.getName(), Option.empty()).orElse(null);
column.setComment(comment);
});
}

private String getTableDoc() {
try {
return new TableSchemaResolver(metaClient).getTableAvroSchema(true).getDoc();
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get schema's doc from storage : ", e);
}
}

@Override
public List<FieldSchema> getStorageFieldSchemas() {
try {
return new TableSchemaResolver(metaClient).getTableAvroSchema(true)
.getFields()
.stream()
.map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), f.doc()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get field schemas from storage : ", e);
}
}
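
`getStorageFieldSchemas` (above) and `getTableDoc` derive column comments and the table description from the `doc` attributes of the table's Avro schema, which `updateTableComments` (below) then pushes into the Glue column metadata. A small, self-contained illustration of that name/type/doc mapping using plain Avro (`org.apache.avro`); the schema here is invented for the example:

```java
import org.apache.avro.Schema;

public class AvroDocSketch {
  public static void main(String[] args) {
    // Hypothetical record schema; only its structure matters for the illustration.
    String json = "{\"type\":\"record\",\"name\":\"trip\",\"doc\":\"table description\","
        + "\"fields\":[{\"name\":\"rider\",\"type\":\"string\",\"doc\":\"rider id\"},"
        + "{\"name\":\"fare\",\"type\":\"double\"}]}";
    Schema schema = new Schema.Parser().parse(json);

    // Same shape as the FieldSchema(name, type, comment) triples built above;
    // fields without a doc yield a null comment, mirroring the Option.empty handling.
    for (Schema.Field f : schema.getFields()) {
      System.out.printf("name=%s type=%s comment=%s%n",
          f.name(), f.schema().getType().getName(), f.doc());
    }
  }
}
```
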

@Override
public boolean updateTableComments(String tableName, List<FieldSchema> fromMetastore, List<FieldSchema> fromStorage) {
Table table = getTable(awsGlue, databaseName, tableName);

Map<String, Option<String>> commentsMap = fromStorage.stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getComment));

StorageDescriptor storageDescriptor = table.getStorageDescriptor();
List<Column> columns = storageDescriptor.getColumns();
setComments(columns, commentsMap);

List<Column> partitionKeys = table.getPartitionKeys();
setComments(partitionKeys, commentsMap);

String tableDescription = getTableDoc();

if (getTable(awsGlue, databaseName, tableName).getStorageDescriptor().equals(storageDescriptor)
&& getTable(awsGlue, databaseName, tableName).getPartitionKeys().equals(partitionKeys)) {
// no comments have been modified / added
return false;
} else {
final Date now = new Date();
TableInput updatedTableInput = new TableInput()
.withName(tableName)
.withDescription(tableDescription)
.withTableType(table.getTableType())
.withParameters(table.getParameters())
.withPartitionKeys(partitionKeys)
.withStorageDescriptor(storageDescriptor)
.withLastAccessTime(now)
.withLastAnalyzedTime(now);

UpdateTableRequest request = new UpdateTableRequest()
.withDatabaseName(databaseName)
.withTableInput(updatedTableInput);

awsGlue.updateTable(request);
return true;
}
}

@Override
public void updateTableSchema(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
@@ -285,6 +376,7 @@ public void createTable(String tableName,
if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
params.put("EXTERNAL", "TRUE");
}
params.put(ENABLE_MDT_LISTING, this.enableMetadataTable);
params.putAll(tableProperties);

try {