Skip to content

Commit 4978e85

Browse files
committed
removed kafka.test.version definition and added consistent implementation of enqueueNewPartitions method
1 parent ba3c021 commit 4978e85

File tree

2 files changed

+36
-21
lines changed

2 files changed

+36
-21
lines changed

itests/qtest-druid/pom.xml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
<druid.derby.version>10.11.1.1</druid.derby.version>
3737
<druid.guava.version>16.0.1</druid.guava.version>
3838
<druid.guice.version>4.1.0</druid.guice.version>
39-
<kafka.test.version>3.9.1</kafka.test.version>
4039
<druid.guice.version>4.1.0</druid.guice.version>
4140
<slf4j.version>1.7.30</slf4j.version>
4241
</properties>
@@ -219,17 +218,17 @@
219218
<dependency>
220219
<groupId>org.apache.kafka</groupId>
221220
<artifactId>kafka_2.12</artifactId>
222-
<version>${kafka.test.version}</version>
221+
<version>${kafka.version}</version>
223222
</dependency>
224223
<dependency>
225224
<groupId>org.apache.kafka</groupId>
226225
<artifactId>kafka-clients</artifactId>
227-
<version>${kafka.test.version}</version>
226+
<version>${kafka.version}</version>
228227
</dependency>
229228
<dependency>
230229
<groupId>org.apache.kafka</groupId>
231230
<artifactId>kafka-server</artifactId>
232-
<version>${kafka.test.version}</version>
231+
<version>${kafka.version}</version>
233232
</dependency>
234233
<dependency>
235234
<groupId>org.slf4j</groupId>

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,27 +188,43 @@ short getEpoch() {
188188
*/
189189
private void flushNewPartitions() {
190190
LOG.info("Flushing new partitions");
191-
Object transactionManager = getValue(kafkaProducer, "transactionManager");
192-
Set<TopicPartition> newPartitionsInTransaction =
193-
(Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
194-
if (!newPartitionsInTransaction.isEmpty()) {
195-
TransactionalRequestResult result = enqueueNewPartitions();
196-
Object sender = getValue(kafkaProducer, "sender");
197-
invoke(sender, "wakeup");
198-
result.await();
199-
}
191+
TransactionalRequestResult result = enqueueNewPartitions();
192+
Object sender = getValue(kafkaProducer, "sender");
193+
invoke(sender, "wakeup");
194+
result.await();
200195
}
201196

202197
private synchronized TransactionalRequestResult enqueueNewPartitions() {
203198
Object transactionManager = getValue(kafkaProducer, "transactionManager");
204-
Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
205-
invoke(transactionManager,
206-
"enqueueRequest",
207-
new Class[] {txnRequestHandler.getClass().getSuperclass()},
208-
new Object[] {txnRequestHandler});
209-
return (TransactionalRequestResult) getValue(txnRequestHandler,
210-
txnRequestHandler.getClass().getSuperclass(),
211-
"result");
199+
synchronized (transactionManager) {
200+
Object newPartitionsInTransaction =
201+
getValue(transactionManager, "newPartitionsInTransaction");
202+
Object newPartitionsInTransactionIsEmpty =
203+
invoke(newPartitionsInTransaction, "isEmpty");
204+
TransactionalRequestResult result;
205+
if (newPartitionsInTransactionIsEmpty instanceof Boolean
206+
&& !((Boolean) newPartitionsInTransactionIsEmpty)) {
207+
Object txnRequestHandler =
208+
invoke(transactionManager, "addPartitionsToTransactionHandler");
209+
invoke(
210+
transactionManager,
211+
"enqueueRequest",
212+
new Class[]{txnRequestHandler.getClass().getSuperclass()},
213+
new Object[]{txnRequestHandler});
214+
215+
result = (TransactionalRequestResult)
216+
getValue(
217+
txnRequestHandler,
218+
txnRequestHandler.getClass().getSuperclass(),
219+
"result");
220+
} else {
221+
// we don't have an operation but this operation string is also used in
222+
// addPartitionsToTransactionHandler.
223+
result = new TransactionalRequestResult("AddPartitionsToTxn");
224+
result.done();
225+
}
226+
return result;
227+
}
212228
}
213229

214230
@SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {

0 commit comments

Comments
 (0)