
Commit da80b8c

[HUDI-5673] Support multi writer for bucket index with guarded lock (apache#7860)

1 parent 25f6927 · commit da80b8c
23 files changed: +521 -100 lines

Diff for: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java (+7 -1)

@@ -49,6 +49,7 @@
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
+import static org.apache.hudi.index.HoodieIndex.IndexType.FLINK_STATE;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
 import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
@@ -73,7 +74,7 @@ public class HoodieIndexConfig extends HoodieConfig {
       // Builder#getDefaultIndexType has already set it according to engine type
       .noDefaultValue()
       .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
-          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
+          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name(), FLINK_STATE.name())
       .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, "
           + "and INMEMORY on Flink and Java engines. "
          + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
@@ -651,6 +652,11 @@ public Builder withIndexKeyField(String keyField) {
       return this;
     }

+    public Builder withRecordKeyField(String keyField) {
+      hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType));
       hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName());
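As a side note, a minimal sketch of how the new builder hook could be used when assembling an index config; the "uuid" field and the surrounding setup are illustrative, not part of the commit:

HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
    .withIndexType(HoodieIndex.IndexType.BUCKET) // bucket index, as targeted by this commit
    .withIndexKeyField("uuid")                   // field used for bucket hashing
    .withRecordKeyField("uuid")                  // new hook: propagates the record key into the index config
    .build();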

Diff for: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java (+1)

@@ -37,6 +37,7 @@ public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
       WriteOperationType.UPSERT,
       WriteOperationType.UPSERT_PREPPED,
       WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.INSERT_OVERWRITE_TABLE,
       WriteOperationType.DELETE,
       WriteOperationType.COMPACT,
       WriteOperationType.DELETE_PARTITION,

Diff for: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java (+1)

@@ -37,6 +37,7 @@ public class HoodieSimpleBucketLayout extends HoodieStorageLayout {
       WriteOperationType.UPSERT,
       WriteOperationType.UPSERT_PREPPED,
       WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.INSERT_OVERWRITE_TABLE,
       WriteOperationType.DELETE,
       WriteOperationType.COMPACT,
       WriteOperationType.DELETE_PARTITION

Diff for: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java (+8)

@@ -182,6 +182,14 @@ public void writeTableMetadata(HoodieTable table, String instantTime, String act
     }
   }

+  @Override
+  protected void preCommit(HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+  }
+
   /**
    * Initialize the table metadata writer, for e.g, bootstrap the metadata table
    * from the filesystem if it does not exist.

Diff for: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java (+20 -24)

@@ -19,6 +19,7 @@
 package org.apache.hudi.client;

 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -32,6 +33,7 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -44,7 +46,6 @@
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.util.WriteStatMerger;
@@ -274,6 +275,19 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
     // remove the async cleaning
   }

+  /**
+   * Refresh the last transaction metadata,
+   * should be called before the Driver starts a new transaction.
+   */
+  public void preTxn(HoodieTableMetaClient metaClient) {
+    if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+      // refresh the meta client which is reused
+      metaClient.reloadActiveTimeline();
+      this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
+      this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
+    }
+  }
+
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
@@ -322,30 +336,12 @@ protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> res
     return result.getWriteStatuses();
   }

-  /**
-   * Post commit is rewrite to be invoked after a successful commit.
-   *
-   * <p>The Flink write client is designed to write data set as buckets
-   * but cleaning action should trigger after all the write actions within a
-   * checkpoint finish.
-   *
-   * @param table Table to commit on
-   * @param metadata Commit Metadata corresponding to committed instant
-   * @param instantTime Instant Time
-   * @param extraMetadata Additional Metadata passed by user
-   */
   @Override
-  protected void postCommit(HoodieTable table,
-                            HoodieCommitMetadata metadata,
-                            String instantTime,
-                            Option<Map<String, String>> extraMetadata) {
-    try {
-      // Delete the marker directory for the instant.
-      WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
-          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    } finally {
-      this.heartbeatClient.stop(instantTime);
-    }
+  protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+    // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
+    HoodieTable table = createTable(config, hadoopConf);
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
   }

   @Override
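Putting the two hooks together, the driver-side transaction flow looks roughly like the following sketch; the variable names and the final commit call are assumed from the surrounding Hudi write-client API rather than taken verbatim from this commit:

writeClient.preTxn(metaClient);                             // refresh last-txn metadata; no-op unless OCC is enabled
String instant = writeClient.startCommit(commitAction, metaClient);
// ... write tasks flush their buckets for the checkpoint ...
writeClient.commit(instant, writeStatuses);                 // preCommit() resolves write conflicts under the guarded lock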

Diff for: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java (+3 -1)

@@ -46,8 +46,10 @@ public static HoodieIndex createIndex(HoodieFlinkEngineContext context, HoodieWr
       return (HoodieIndex) instance;
     }

-    // TODO more indexes to be added
     switch (config.getIndexType()) {
+      case FLINK_STATE:
+        // Flink state index stores the index mappings with a state-backend,
+        // instantiates an in-memory HoodieIndex component as a placeholder.
       case INMEMORY:
         return new FlinkInMemoryStateIndex(context, config);
       case BLOOM:

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java (+8)

@@ -197,6 +197,7 @@ private FlinkOptions() {
       .key("index.type")
       .stringType()
       .defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
+      .withFallbackKeys(HoodieIndexConfig.INDEX_TYPE.key())
       .withDescription("Index type of Flink write job, default is using state backed index.");

   public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
@@ -562,6 +563,13 @@ private FlinkOptions() {
       .defaultValue(128)
       .withDescription("Sort memory in MB, default 128MB");

+  // this is only for internal use
+  public static final ConfigOption<String> WRITE_CLIENT_ID = ConfigOptions
+      .key("write.client.id")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Unique identifier used to distinguish different writer pipelines for concurrent mode");
+
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java (+37)

@@ -18,11 +18,16 @@

 package org.apache.hudi.configuration;

+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -227,6 +232,38 @@ public static boolean isIncrementalQuery(Configuration conf) {
     return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
   }

+  /**
+   * Returns whether the writer txn should be guarded by lock.
+   */
+  public static boolean needsGuardByLock(Configuration conf) {
+    return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
+        || conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+            .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+  }
+
+  /**
+   * Returns the index type.
+   */
+  public static HoodieIndex.IndexType getIndexType(Configuration conf) {
+    return HoodieIndex.IndexType.valueOf(conf.getString(FlinkOptions.INDEX_TYPE));
+  }
+
+  /**
+   * Returns the index key field.
+   */
+  public static String getIndexKeyField(Configuration conf) {
+    return conf.getString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+  }
+
+  /**
+   * Returns the conflict resolution strategy.
+   */
+  public static ConflictResolutionStrategy getConflictResolutionStrategy(Configuration conf) {
+    return isBucketIndexType(conf)
+        ? new BucketIndexConcurrentFileWritesConflictResolutionStrategy()
+        : new SimpleConcurrentFileWritesConflictResolutionStrategy();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
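A rough usage sketch of the new helpers, assuming a Flink Configuration wired as below: turning on optimistic concurrency control makes needsGuardByLock return true, and a BUCKET index type selects the bucket-index conflict strategy:

Configuration conf = new Configuration();
conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
    WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());

OptionsResolver.needsGuardByLock(conf);              // true, OCC is enabled
OptionsResolver.getConflictResolutionStrategy(conf); // BucketIndexConcurrentFileWritesConflictResolutionStrategy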

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java (+6 -6)

@@ -41,7 +41,6 @@
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.StreamerUtil;

 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -180,7 +179,7 @@ public void start() throws Exception {
     this.gateways = new SubtaskGateway[this.parallelism];
     // init table, create if not exists.
     this.metaClient = initTableIfNotExists(this.conf);
-    this.ckpMetadata = initCkpMetadata(this.metaClient);
+    this.ckpMetadata = initCkpMetadata(this.metaClient, this.conf);
     // the write client must create after the table creation
     this.writeClient = FlinkWriteClients.createWriteClient(conf);
     initMetadataTable(this.writeClient);
@@ -342,8 +341,8 @@ private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
     writeClient.initMetadataTable();
   }

-  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
+  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient, Configuration conf) throws IOException {
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
     ckpMetadata.bootstrap();
     return ckpMetadata;
   }
@@ -372,6 +371,8 @@ private void addEventToBuffer(WriteMetadataEvent event) {
   }

   private void startInstant() {
+    // refresh the last txn metadata
+    this.writeClient.preTxn(this.metaClient);
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
     this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
@@ -391,8 +392,7 @@ private void startInstant() {
    * until it finds a new inflight instant on the timeline.
    */
   private void initInstant(String instant) {
-    HoodieTimeline completedTimeline =
-        StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+    HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
     executor.execute(() -> {
       if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
         // the last instant committed successfully

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java (+3 -6)

@@ -127,7 +127,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
-    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
+    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient(), this.conf.getString(FlinkOptions.WRITE_CLIENT_ID));
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();

     preLoadIndexRecords();
@@ -229,17 +229,14 @@ protected void loadRecords(String partitionPath) throws Exception {
           .filter(logFile -> isValidFile(logFile.getFileStatus()))
           .map(logFile -> logFile.getPath().toString())
           .collect(toList());
-      HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
-          writeConfig, hadoopConf);

-      try {
+      try (HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
+          writeConfig, hadoopConf)) {
         for (String recordKey : scanner.getRecords().keySet()) {
           output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
         }
       } catch (Exception e) {
         throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
-      } finally {
-        scanner.close();
       }
     }
   }

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java (+1 -1)

@@ -148,7 +148,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
         TypeInformation.of(WriteMetadataEvent.class)
     ));

-    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
+    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient, this.config.getString(FlinkOptions.WRITE_CLIENT_ID));
     this.currentInstant = lastPendingInstant();
     if (context.isRestored()) {
       restoreWriteMetadata();

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java (+14 -7)

@@ -20,6 +20,7 @@

 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -75,12 +76,13 @@ public class CkpMetadata implements Serializable {
   private List<String> instantCache;

   private CkpMetadata(Configuration config) {
-    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
+    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)),
+        config.getString(FlinkOptions.PATH), config.getString(FlinkOptions.WRITE_CLIENT_ID));
   }

-  private CkpMetadata(FileSystem fs, String basePath) {
+  private CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
     this.fs = fs;
-    this.path = new Path(ckpMetaPath(basePath));
+    this.path = new Path(ckpMetaPath(basePath, uniqueId));
   }

   public void close() {
@@ -208,12 +210,17 @@ public static CkpMetadata getInstance(Configuration config) {
     return new CkpMetadata(config);
   }

-  public static CkpMetadata getInstance(FileSystem fs, String basePath) {
-    return new CkpMetadata(fs, basePath);
+  public static CkpMetadata getInstance(HoodieTableMetaClient metaClient, String uniqueId) {
+    return new CkpMetadata(metaClient.getFs(), metaClient.getBasePath(), uniqueId);
   }

-  protected static String ckpMetaPath(String basePath) {
-    return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+  public static CkpMetadata getInstance(FileSystem fs, String basePath, String uniqueId) {
+    return new CkpMetadata(fs, basePath, uniqueId);
+  }
+
+  protected static String ckpMetaPath(String basePath, String uniqueId) {
+    String metaPath = basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+    return StringUtils.isNullOrEmpty(uniqueId) ? metaPath : metaPath + "_" + uniqueId;
   }

   private Path fullPath(String fileName) {
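With the uniqueId suffix, each writer pipeline gets an isolated checkpoint-metadata folder under the auxiliary path. For a hypothetical base path of /tmp/hudi_table, ckpMetaPath resolves as follows:

// ckpMetaPath("/tmp/hudi_table", "")  -> /tmp/hudi_table/.hoodie/.aux/ckp_meta    (no client id, legacy layout)
// ckpMetaPath("/tmp/hudi_table", "2") -> /tmp/hudi_table/.hoodie/.aux/ckp_meta_2  (writer with client id "2")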

Diff for: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java (+13 -9)

@@ -216,6 +216,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
             .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
             .build())
+        .withIndexConfig(StreamerUtil.getIndexConfig(conf))
        .withPayloadConfig(getPayloadConfig(conf))
        .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
        .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
@@ -226,15 +227,18 @@ public static HoodieWriteConfig getHoodieClientConfig(

     if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
       builder.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
-      if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
-        builder.withLockConfig(HoodieLockConfig.newBuilder()
-            .withLockProvider(FileSystemBasedLockProvider.class)
-            .withLockWaitTimeInMillis(2000L) // 2s
-            .withFileSystemLockExpire(1) // 1 minute
-            .withClientNumRetries(30)
-            .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
-            .build());
-      }
+    }
+
+    if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()) && OptionsResolver.needsGuardByLock(conf)) {
+      // configure the fs lock provider by default
+      builder.withLockConfig(HoodieLockConfig.newBuilder()
+          .withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))
+          .withLockProvider(FileSystemBasedLockProvider.class)
+          .withLockWaitTimeInMillis(2000L) // 2s
+          .withFileSystemLockExpire(1) // 1 minute
+          .withClientNumRetries(30)
+          .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+          .build());
     }

     // do not configure cleaning strategy as LAZY until multi-writers is supported.
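Taken together, an end-to-end sketch of configuring a second writer against the same bucket-index table; the path and id values are illustrative assumptions, and createWriteClient then installs the FileSystemBasedLockProvider defaults shown above:

Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, "/tmp/hudi_table");             // hypothetical table path
conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
    WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); // triggers the lock guard
conf.setString(FlinkOptions.WRITE_CLIENT_ID, "2");                // isolates this pipeline's ckp_meta

HoodieFlinkWriteClient<?> client = FlinkWriteClients.createWriteClient(conf);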
