Skip to content

Commit ff590c6

Browse files
authored Jan 28, 2023
[HUDI-5023] Switching default Write Executor type to SIMPLE (apache#7476)
This change switches the default Write Executor to SIMPLE, i.e. one that bypasses reliance on any kind of queue (either BoundedInMemory's or Disruptor's). This should considerably trim down on both runtime (compared to BIMQ) and wasted compute (compared to BIMQ and Disruptor), since it eliminates the unnecessary intermediary "staging" of records in the queue (for example, in Spark such in-memory enqueueing occurs at the ingress points, i.e. shuffling) and allows records to be written in one pass (even avoiding making copies of the records in the future).
1 parent 1ecc040 commit ff590c6

File tree

17 files changed

+200
-201
lines changed

17 files changed

+200
-201
lines changed
 

‎hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

+28-20
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import org.apache.hudi.common.util.ReflectionUtils;
5555
import org.apache.hudi.common.util.StringUtils;
5656
import org.apache.hudi.common.util.ValidationUtils;
57+
import org.apache.hudi.common.util.VisibleForTesting;
58+
import org.apache.hudi.common.util.queue.DisruptorWaitStrategyType;
5759
import org.apache.hudi.common.util.queue.ExecutorType;
5860
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
5961
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -97,8 +99,7 @@
9799
import java.util.function.Supplier;
98100
import java.util.stream.Collectors;
99101

100-
import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
101-
import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR;
102+
import static org.apache.hudi.common.util.queue.ExecutorType.SIMPLE;
102103
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
103104
import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy;
104105

@@ -158,10 +159,10 @@ public class HoodieWriteConfig extends HoodieConfig {
158159
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
159160
+ "extract a key out of incoming records.");
160161

161-
public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
162+
public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE = ConfigProperty
162163
.key("hoodie.write.executor.type")
163-
.defaultValue(BOUNDED_IN_MEMORY.name())
164-
.withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
164+
.defaultValue(SIMPLE.name())
165+
.withValidValues(Arrays.stream(ExecutorType.values()).map(Enum::name).toArray(String[]::new))
165166
.sinceVersion("0.13.0")
166167
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
167168
+ "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded in-memory queue, this queue will use extra lock to balance producers and consumer"
@@ -271,15 +272,15 @@ public class HoodieWriteConfig extends HoodieConfig {
271272
.defaultValue(String.valueOf(4 * 1024 * 1024))
272273
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");
273274

274-
public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty
275+
public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
275276
.key("hoodie.write.executor.disruptor.buffer.size")
276277
.defaultValue(String.valueOf(1024))
277278
.sinceVersion("0.13.0")
278279
.withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");
279280

280-
public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
281+
public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY = ConfigProperty
281282
.key("hoodie.write.executor.disruptor.wait.strategy")
282-
.defaultValue("BLOCKING_WAIT")
283+
.defaultValue(DisruptorWaitStrategyType.BLOCKING_WAIT.name())
283284
.sinceVersion("0.13.0")
284285
.withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are "
285286
+ "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop"
@@ -1107,7 +1108,7 @@ public String getKeyGeneratorClass() {
11071108
}
11081109

11091110
public ExecutorType getExecutorType() {
1110-
return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
1111+
return ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
11111112
}
11121113

11131114
public boolean isCDCEnabled() {
@@ -1175,12 +1176,12 @@ public int getWriteBufferLimitBytes() {
11751176
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
11761177
}
11771178

1178-
public Option<String> getWriteExecutorWaitStrategy() {
1179-
return Option.of(getString(WRITE_WAIT_STRATEGY));
1179+
public String getWriteExecutorDisruptorWaitStrategy() {
1180+
return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
11801181
}
11811182

1182-
public Option<Integer> getDisruptorWriteBufferSize() {
1183-
return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
1183+
public Integer getWriteExecutorDisruptorWriteBufferSize() {
1184+
return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
11841185
}
11851186

11861187
public boolean shouldCombineBeforeInsert() {
@@ -1987,7 +1988,7 @@ public ApiSite getDatadogApiSite() {
19871988
public String getDatadogApiKey() {
19881989
if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) {
19891990
return getString(HoodieMetricsDatadogConfig.API_KEY);
1990-
1991+
19911992
} else {
19921993
Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
19931994
getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER));
@@ -2481,7 +2482,7 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
24812482
}
24822483

24832484
public Builder withExecutorType(String executorClass) {
2484-
writeConfig.setValue(EXECUTOR_TYPE, executorClass);
2485+
writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass);
24852486
return this;
24862487
}
24872488

@@ -2536,13 +2537,13 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
25362537
return this;
25372538
}
25382539

2539-
public Builder withWriteWaitStrategy(String waitStrategy) {
2540-
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
2540+
public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) {
2541+
writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY, String.valueOf(waitStrategy));
25412542
return this;
25422543
}
25432544

2544-
public Builder withWriteBufferSize(int size) {
2545-
writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
2545+
public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
2546+
writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
25462547
return this;
25472548
}
25482549

@@ -2970,8 +2971,15 @@ private void validate() {
29702971
}
29712972

29722973
public HoodieWriteConfig build() {
2974+
return build(true);
2975+
}
2976+
2977+
@VisibleForTesting
2978+
public HoodieWriteConfig build(boolean shouldValidate) {
29732979
setDefaults();
2974-
validate();
2980+
if (shouldValidate) {
2981+
validate();
2982+
}
29752983
// Build WriteConfig at the end
29762984
return new HoodieWriteConfig(engineType, writeConfig.getProps());
29772985
}

‎hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.avro.Schema;
2222
import org.apache.hudi.client.WriteStatus;
2323
import org.apache.hudi.client.utils.LazyIterableIterator;
24-
import org.apache.hudi.common.config.TypedProperties;
2524
import org.apache.hudi.common.engine.TaskContextSupplier;
2625
import org.apache.hudi.common.model.HoodieRecord;
26+
import org.apache.hudi.common.util.queue.ExecutorType;
2727
import org.apache.hudi.config.HoodieWriteConfig;
2828
import org.apache.hudi.io.CreateHandleFactory;
2929
import org.apache.hudi.io.WriteHandleFactory;
@@ -90,23 +90,25 @@ public R getResult() {
9090
}
9191
}
9292

93-
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
94-
HoodieWriteConfig config) {
95-
return getCloningTransformerInternal(schema, config.getProps());
93+
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
94+
HoodieWriteConfig writeConfig) {
95+
return getTransformerInternal(schema, writeConfig);
9696
}
9797

98-
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
99-
return getCloningTransformerInternal(schema, new TypedProperties());
100-
}
98+
private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
99+
HoodieWriteConfig writeConfig) {
100+
// NOTE: Whether record have to be cloned here is determined based on the executor type used
101+
// for writing: executors relying on an inner queue, will be keeping references to the records
102+
// and therefore in the environments where underlying buffer holding the record could be
103+
// reused (for ex, Spark) we need to make sure that we get a clean copy of
104+
// it since these records will be subsequently buffered (w/in the in-memory queue);
105+
// Only case when we don't need to make a copy is when using [[SimpleExecutor]] which
106+
// is guaranteed to not hold on to references to any records
107+
boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE;
101108

102-
private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
103-
TypedProperties props) {
104109
return record -> {
105-
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
106-
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
107-
// it since these records will be subsequently buffered (w/in the in-memory queue)
108-
HoodieRecord<T> clonedRecord = record.copy();
109-
return new HoodieInsertValueGenResult(clonedRecord, schema, props);
110+
HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record;
111+
return new HoodieInsertValueGenResult(clonedRecord, schema, writeConfig.getProps());
110112
};
111113
}
112114

‎hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
import org.apache.hudi.common.util.Functions;
2222
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
23-
import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
2423
import org.apache.hudi.common.util.queue.DisruptorExecutor;
2524
import org.apache.hudi.common.util.queue.ExecutorType;
26-
import org.apache.hudi.common.util.queue.HoodieExecutor;
2725
import org.apache.hudi.common.util.queue.HoodieConsumer;
26+
import org.apache.hudi.common.util.queue.HoodieExecutor;
27+
import org.apache.hudi.common.util.queue.SimpleExecutor;
2828
import org.apache.hudi.config.HoodieWriteConfig;
2929
import org.apache.hudi.exception.HoodieException;
3030

@@ -34,28 +34,28 @@
3434
public class ExecutorFactory {
3535

3636
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
37-
Iterator<I> inputItr,
38-
HoodieConsumer<O, E> consumer,
39-
Function<I, O> transformFunction) {
37+
Iterator<I> inputItr,
38+
HoodieConsumer<O, E> consumer,
39+
Function<I, O> transformFunction) {
4040
return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
4141
}
4242

4343
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
44-
Iterator<I> inputItr,
45-
HoodieConsumer<O, E> consumer,
46-
Function<I, O> transformFunction,
47-
Runnable preExecuteRunnable) {
44+
Iterator<I> inputItr,
45+
HoodieConsumer<O, E> consumer,
46+
Function<I, O> transformFunction,
47+
Runnable preExecuteRunnable) {
4848
ExecutorType executorType = hoodieConfig.getExecutorType();
4949

5050
switch (executorType) {
5151
case BOUNDED_IN_MEMORY:
5252
return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
5353
transformFunction, preExecuteRunnable);
5454
case DISRUPTOR:
55-
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
56-
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
55+
return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
56+
transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
5757
case SIMPLE:
58-
return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction);
58+
return new SimpleExecutor<>(inputItr, consumer, transformFunction);
5959
default:
6060
throw new HoodieException("Unsupported Executor Type " + executorType);
6161
}

‎hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ protected List<WriteStatus> computeNext() {
6060
try {
6161
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
6262
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
63-
getCloningTransformer(schema, hoodieConfig));
63+
getTransformer(schema, hoodieConfig));
6464
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
6565
checkState(result != null && !result.isEmpty());
6666
return result;

‎hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected List<WriteStatus> computeNext() {
6464
try {
6565
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
6666
bufferedIteratorExecutor =
67-
ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
67+
ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig));
6868
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
6969
checkState(result != null && !result.isEmpty());
7070
return result;

‎hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected List<WriteStatus> computeNext() {
8787
}
8888

8989
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
90-
getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
90+
getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
9191

9292
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
9393
checkState(result != null && !result.isEmpty());

‎hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java

+13-15
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
2424
import org.apache.hudi.common.util.Option;
2525
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
26+
import org.apache.hudi.common.util.queue.ExecutorType;
2627
import org.apache.hudi.common.util.queue.HoodieConsumer;
2728
import org.apache.hudi.config.HoodieWriteConfig;
2829
import org.apache.hudi.exception.HoodieException;
@@ -41,18 +42,21 @@
4142

4243
import scala.Tuple2;
4344

44-
import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
45+
import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer;
4546
import static org.junit.jupiter.api.Assertions.assertEquals;
4647
import static org.junit.jupiter.api.Assertions.assertFalse;
4748
import static org.junit.jupiter.api.Assertions.assertThrows;
4849
import static org.junit.jupiter.api.Assertions.assertTrue;
49-
import static org.mockito.Mockito.mock;
50-
import static org.mockito.Mockito.when;
5150

5251
public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness {
5352

5453
private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
5554

55+
private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
56+
.withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
57+
.withWriteBufferLimitBytes(1024)
58+
.build(false);
59+
5660
@BeforeEach
5761
public void setUp() throws Exception {
5862
initTestDataGenerator();
@@ -74,8 +78,6 @@ public void testExecutor() {
7478
final int recordNumber = 100;
7579
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, recordNumber);
7680

77-
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
78-
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
7981
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
8082
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
8183

@@ -94,8 +96,8 @@ public Integer finish() {
9496

9597
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
9698
try {
97-
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
98-
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
99+
executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
100+
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());
99101
int result = executor.execute();
100102

101103
assertEquals(100, result);
@@ -113,8 +115,6 @@ public Integer finish() {
113115
public void testInterruptExecutor() {
114116
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
115117

116-
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
117-
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
118118
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
119119
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
120120

@@ -136,8 +136,8 @@ public Integer finish() {
136136
};
137137

138138
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
139-
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
140-
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
139+
new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
140+
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());
141141

142142
// Interrupt the current thread (therefore triggering executor to throw as soon as it
143143
// invokes [[get]] on the [[CompletableFuture]])
@@ -154,8 +154,6 @@ public Integer finish() {
154154

155155
@Test
156156
public void testExecutorTermination() {
157-
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
158-
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
159157
Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
160158
@Override
161159
public boolean hasNext() {
@@ -181,8 +179,8 @@ public Integer finish() {
181179
};
182180

183181
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
184-
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
185-
consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
182+
new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
183+
consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
186184
getPreExecuteRunnable());
187185
executor.shutdownNow();
188186
boolean terminatedGracefully = executor.awaitTermination();

0 commit comments

Comments
 (0)
Please sign in to comment.