Skip to content

Commit ba5963a

Browse files
committed
enables kafka_storage_handler test and some props refactoring changes
1 parent 322009c commit ba5963a

File tree

5 files changed

+19
-70
lines changed

5 files changed

+19
-70
lines changed

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
8383

8484
properties.setProperty("zookeeper.connect", zkString);
8585
properties.setProperty("broker.id", String.valueOf(1));
86-
properties.setProperty("host.name", LOCALHOST);
87-
properties.setProperty("port", Integer.toString(brokerPort));
86+
properties.setProperty("listeners", "PLAINTEXT://" + LOCALHOST + ":" + Integer.toString(brokerPort));
87+
properties.setProperty("advertised.listeners", "PLAINTEXT://" + LOCALHOST + ":" + Integer.toString(brokerPort));
8888
properties.setProperty("log.dir", logDir);
8989
// This property is very important, we are sending form records with a specific time
9090
// Thus need to make sure that they don't get DELETED

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33+
import java.time.Duration;
3334
import java.util.Iterator;
3435
import java.util.Properties;
3536

@@ -150,7 +151,7 @@ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration j
150151
LOG.trace("total read bytes [{}]", readBytes);
151152
if (consumer != null) {
152153
consumer.wakeup();
153-
consumer.close();
154+
consumer.close(Duration.ZERO);
154155
}
155156
}
156157

kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.slf4j.LoggerFactory;
3838

3939
import java.io.IOException;
40+
import java.time.Duration;
4041
import java.util.Iterator;
4142
import java.util.Properties;
4243

@@ -150,7 +151,7 @@ private void cleanRowBoat() {
150151
LOG.trace("total read bytes [{}]", readBytes);
151152
if (consumer != null) {
152153
consumer.wakeup();
153-
consumer.close();
154+
consumer.close(Duration.ZERO);
154155
}
155156
}
156157

ql/src/test/queries/clientpositive/kafka_storage_handler.q

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
--! qt:disabled:HIVE-23985
21

32
SET hive.vectorized.execution.enabled=true;
43

ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out

Lines changed: 13 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,10 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
225225
10
226226
PREHOOK: query: Drop table kafka_table_offsets
227227
PREHOOK: type: DROPTABLE
228+
PREHOOK: Output: database:default
228229
POSTHOOK: query: Drop table kafka_table_offsets
229230
POSTHOOK: type: DROPTABLE
231+
POSTHOOK: Output: database:default
230232
PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp)
231233
PREHOOK: type: CREATETABLE
232234
PREHOOK: Output: database:default
@@ -257,8 +259,10 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
257259
0 -1
258260
PREHOOK: query: Drop table orc_kafka_table
259261
PREHOOK: type: DROPTABLE
262+
PREHOOK: Output: database:default
260263
POSTHOOK: query: Drop table orc_kafka_table
261264
POSTHOOK: type: DROPTABLE
265+
POSTHOOK: Output: database:default
262266
PREHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint,
263267
`__time` timestamp , `page` string, `user` string, `language` string,
264268
`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
@@ -435,10 +439,12 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
435439
PREHOOK: query: Drop table kafka_table_offsets
436440
PREHOOK: type: DROPTABLE
437441
PREHOOK: Input: default@kafka_table_offsets
442+
PREHOOK: Output: database:default
438443
PREHOOK: Output: default@kafka_table_offsets
439444
POSTHOOK: query: Drop table kafka_table_offsets
440445
POSTHOOK: type: DROPTABLE
441446
POSTHOOK: Input: default@kafka_table_offsets
447+
POSTHOOK: Output: database:default
442448
POSTHOOK: Output: default@kafka_table_offsets
443449
PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp)
444450
PREHOOK: type: CREATETABLE
@@ -451,10 +457,12 @@ POSTHOOK: Output: default@kafka_table_offsets
451457
PREHOOK: query: Drop table orc_kafka_table
452458
PREHOOK: type: DROPTABLE
453459
PREHOOK: Input: default@orc_kafka_table
460+
PREHOOK: Output: database:default
454461
PREHOOK: Output: default@orc_kafka_table
455462
POSTHOOK: query: Drop table orc_kafka_table
456463
POSTHOOK: type: DROPTABLE
457464
POSTHOOK: Input: default@orc_kafka_table
465+
POSTHOOK: Output: database:default
458466
POSTHOOK: Output: default@orc_kafka_table
459467
PREHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint,
460468
`__time` timestamp , `page` string, `user` string, `language` string,
@@ -1077,7 +1085,7 @@ POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp`
10771085
POSTHOOK: type: QUERY
10781086
POSTHOOK: Input: default@wiki_kafka_avro_table
10791087
POSTHOOK: Output: hdfs://### HDFS PATH ###
1080-
OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP) AS `$f1`, `__key` AS `$f2`
1088+
OPTIMIZED SQL: SELECT `__offset`, CAST(`__timestamp` AS TIMESTAMP) AS `_c1`, `__key`
10811089
FROM `default`.`wiki_kafka_avro_table`
10821090
GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key`
10831091
STAGE DEPENDENCIES:
@@ -1408,7 +1416,7 @@ POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp`
14081416
POSTHOOK: type: QUERY
14091417
POSTHOOK: Input: default@wiki_kafka_avro_table
14101418
POSTHOOK: Output: hdfs://### HDFS PATH ###
1411-
OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP) AS `$f1`, `__key` AS `$f2`
1419+
OPTIMIZED SQL: SELECT `__offset`, CAST(`__timestamp` AS TIMESTAMP) AS `_c1`, `__key`
14121420
FROM `default`.`wiki_kafka_avro_table`
14131421
GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP), `__key`
14141422
STAGE DEPENDENCIES:
@@ -1569,72 +1577,12 @@ STAGE PLANS:
15691577
output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat
15701578
properties:
15711579
EXTERNAL TRUE
1572-
avro.schema.literal {
1573-
"type" : "record",
1574-
"name" : "Wikipedia",
1575-
"namespace" : "org.apache.hive.kafka",
1576-
"version": "1",
1577-
"fields" : [ {
1578-
"name" : "isrobot",
1579-
"type" : "boolean"
1580-
}, {
1581-
"name" : "channel",
1582-
"type" : "string"
1583-
}, {
1584-
"name" : "timestamp",
1585-
"type" : "string"
1586-
}, {
1587-
"name" : "flags",
1588-
"type" : "string"
1589-
}, {
1590-
"name" : "isunpatrolled",
1591-
"type" : "boolean"
1592-
}, {
1593-
"name" : "page",
1594-
"type" : "string"
1595-
}, {
1596-
"name" : "diffurl",
1597-
"type" : "string"
1598-
}, {
1599-
"name" : "added",
1600-
"type" : "long"
1601-
}, {
1602-
"name" : "comment",
1603-
"type" : "string"
1604-
}, {
1605-
"name" : "commentlength",
1606-
"type" : "long"
1607-
}, {
1608-
"name" : "isnew",
1609-
"type" : "boolean"
1610-
}, {
1611-
"name" : "isminor",
1612-
"type" : "boolean"
1613-
}, {
1614-
"name" : "delta",
1615-
"type" : "long"
1616-
}, {
1617-
"name" : "isanonymous",
1618-
"type" : "boolean"
1619-
}, {
1620-
"name" : "user",
1621-
"type" : "string"
1622-
}, {
1623-
"name" : "deltabucket",
1624-
"type" : "double"
1625-
}, {
1626-
"name" : "deleted",
1627-
"type" : "long"
1628-
}, {
1629-
"name" : "namespace",
1630-
"type" : "string"
1631-
} ]
1632-
}
1580+
avro.schema.literal {"type":"record","name":"Wikipedia","namespace":"org.apache.hive.kafka","fields":[{"name":"isrobot","type":"boolean"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isunpatrolled","type":"boolean"},{"name":"page","type":"string"},{"name":"diffurl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentlength","type":"long"},{"name":"isnew","type":"boolean"},{"name":"isminor","type":"boolean"},{"name":"delta","type":"long"},{"name":"isanonymous","type":"boolean"},{"name":"user","type":"string"},{"name":"deltabucket","type":"double"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"}],"version":"1"}
16331581
bucketing_version 2
16341582
column.name.delimiter ,
1635-
columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp
1583+
columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace
16361584
columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer'
1637-
columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint
1585+
columns.types boolean,string,string,string,boolean,string,string,bigint,string,bigint,boolean,boolean,bigint,boolean,string,double,bigint,string
16381586
#### A masked pattern was here ####
16391587
hive.kafka.max.retries 6
16401588
hive.kafka.metadata.poll.timeout.ms 30000

0 commit comments

Comments
 (0)