CC-7648: add ORC format with Hive (#484)
* Support ORC Format w/Hive

Signed-off-by: Lev Zemlyanov <[email protected]>

Co-authored-by: buom <[email protected]>
Lev Zemlyanov and buom authored Feb 11, 2020
1 parent 5e0d21b commit 02d82fe
Showing 18 changed files with 1,248 additions and 17 deletions.
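For context, the new format plugs into the connector's existing configuration surface. Below is a minimal sketch of an HDFS sink configuration that would exercise ORC output with Hive integration; the topic name, host names, and flush size are illustrative placeholders, and the keys shown (format.class, hive.integration, and so on) are assumed to be the connector's standard settings.

import java.util.HashMap;
import java.util.Map;

public class OrcSinkConfigSketch {
  public static void main(String[] args) {
    // Illustrative HDFS sink connector settings; hosts, topic, and flush.size are placeholders.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
    props.put("topics", "orders");
    props.put("hdfs.url", "hdfs://namenode:8020");
    props.put("flush.size", "1000");
    // New in this commit: write files as ORC and register them with Hive.
    props.put("format.class", "io.confluent.connect.hdfs.orc.OrcFormat");
    props.put("hive.integration", "true");
    props.put("hive.metastore.uris", "thrift://hive-metastore:9083");
    props.put("schema.compatibility", "BACKWARD");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}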
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -14,7 +14,7 @@

<suppress
checks="ClassDataAbstractionCoupling"
files="(DataWriter|WALFile).java"
files="(DataWriter|WALFile|OrcUtil).java"
/>

<suppress
src/main/java/io/confluent/connect/hdfs/avro/AvroFileReader.java
@@ -15,6 +15,7 @@

package io.confluent.connect.hdfs.avro;

import javax.annotation.Nonnull;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
@@ -53,6 +54,7 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
}
}

@Override
public boolean hasNext() {
throw new UnsupportedOperationException();
}
@@ -65,6 +67,8 @@ public void remove() {
throw new UnsupportedOperationException();
}

@Override
@Nonnull
public Iterator<Object> iterator() {
throw new UnsupportedOperationException();
}
src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -16,6 +16,7 @@
package io.confluent.connect.hdfs.avro;

import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.format.RecordWriter;
import java.io.OutputStream;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -51,13 +52,10 @@ public String getExtension() {
}

@Override
public io.confluent.connect.storage.format.RecordWriter getRecordWriter(
final HdfsSinkConnectorConfig conf,
final String filename
) {
return new io.confluent.connect.storage.format.RecordWriter() {
public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
return new RecordWriter() {
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
Schema schema = null;
Schema schema;

@Override
public void write(SinkRecord record) {
src/main/java/io/confluent/connect/hdfs/json/JsonFileReader.java
@@ -15,6 +15,7 @@

package io.confluent.connect.hdfs.json;

import javax.annotation.Nonnull;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Schema;

@@ -37,6 +38,7 @@ public Collection<Object> readData(HdfsSinkConnectorConfig conf, Path path) {
throw new UnsupportedOperationException();
}

@Override
public boolean hasNext() {
throw new UnsupportedOperationException();
}
@@ -49,6 +51,8 @@ public void remove() {
throw new UnsupportedOperationException();
}

@Override
@Nonnull
public Iterator<Object> iterator() {
throw new UnsupportedOperationException();
}
src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java
@@ -65,7 +65,7 @@ public String getExtension() {
}

@Override
public RecordWriter getRecordWriter(final HdfsSinkConnectorConfig conf, final String filename) {
public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
try {
return new RecordWriter() {
final OutputStream out = storage.create(filename, true);
122 changes: 122 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java
@@ -0,0 +1,122 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.orc;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.storage.format.SchemaFileReader;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

public class OrcFileReader implements SchemaFileReader<HdfsSinkConnectorConfig, Path> {

private static final Logger log = LoggerFactory.getLogger(OrcFileReader.class);

@Override
public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
try {
log.debug("Opening ORC record reader for: {}", path);

ReaderOptions readerOptions = new ReaderOptions(conf.getHadoopConfiguration());
Reader reader = OrcFile.createReader(path, readerOptions);

if (reader.getObjectInspector().getCategory() == ObjectInspector.Category.STRUCT) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("record").version(1);
StructObjectInspector objectInspector = (StructObjectInspector) reader.getObjectInspector();

for (StructField schema : objectInspector.getAllStructFieldRefs()) {
ObjectInspector fieldObjectInspector = schema.getFieldObjectInspector();
String typeName = fieldObjectInspector.getTypeName();
Schema.Type schemaType;

switch (fieldObjectInspector.getCategory()) {
case PRIMITIVE:
PrimitiveTypeEntry typeEntry = PrimitiveObjectInspectorUtils
.getTypeEntryFromTypeName(typeName);
if (java.sql.Date.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
schemaType = Date.SCHEMA.type();
} else if (java.sql.Timestamp.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
schemaType = Timestamp.SCHEMA.type();
} else {
schemaType = ConnectSchema.schemaType(typeEntry.primitiveJavaClass);
}
break;
case LIST:
schemaType = Schema.Type.ARRAY;
break;
case MAP:
schemaType = Schema.Type.MAP;
break;
default:
throw new DataException("Unknown type " + fieldObjectInspector.getCategory().name());
}

schemaBuilder.field(schema.getFieldName(), SchemaBuilder.type(schemaType).build());
}

return schemaBuilder.build();
} else {
throw new ConnectException(
"Top level type must be of type STRUCT, but was "
+ reader.getObjectInspector().getCategory().name()
);
}
} catch (IOException e) {
throw new ConnectException("Failed to get schema for file " + path, e);
}
}

@Override
public boolean hasNext() {
throw new UnsupportedOperationException();
}

public Object next() {
throw new UnsupportedOperationException();
}

public void remove() {
throw new UnsupportedOperationException();
}

@Override
@Nonnull
public Iterator<Object> iterator() {
throw new UnsupportedOperationException();
}

public void close() {
}
}
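As a usage sketch that is not part of the commit, the new reader recovers a Connect schema from an ORC file the connector has already written. The property map, the assumption that HdfsSinkConnectorConfig is built directly from a map of settings, and the file path are all illustrative.

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.orc.OrcFileReader;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Schema;

import java.util.HashMap;
import java.util.Map;

public class OrcSchemaReadSketch {
  public static void main(String[] args) {
    // Assumed: HdfsSinkConnectorConfig accepts the connector's property map directly,
    // with at least the required hdfs.url and flush.size set.
    Map<String, String> props = new HashMap<>();
    props.put("hdfs.url", "hdfs://namenode:8020");
    props.put("flush.size", "1000");
    HdfsSinkConnectorConfig conf = new HdfsSinkConnectorConfig(props);

    // Illustrative path; the connector's usual <topic>+<partition>+<start>+<end> naming is assumed.
    Path path = new Path("/topics/orders/partition=0/orders+0+0000000000+0000000099.orc");
    Schema schema = new OrcFileReader().getSchema(conf, path);
    schema.fields().forEach(f -> System.out.println(f.name() + " : " + f.schema().type()));
  }
}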
45 changes: 45 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcFormat.java
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.orc;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.format.RecordWriterProvider;
import io.confluent.connect.storage.format.SchemaFileReader;
import io.confluent.connect.storage.hive.HiveFactory;
import org.apache.hadoop.fs.Path;

public class OrcFormat implements Format<HdfsSinkConnectorConfig, Path> {

// DO NOT change this signature, it is required for instantiation via reflection
public OrcFormat(HdfsStorage storage) { }

@Override
public RecordWriterProvider<HdfsSinkConnectorConfig> getRecordWriterProvider() {
return new OrcRecordWriterProvider();
}

@Override
public SchemaFileReader<HdfsSinkConnectorConfig, Path> getSchemaFileReader() {
return new OrcFileReader();
}

@Override
public HiveFactory getHiveFactory() {
return new OrcHiveFactory();
}
}
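The constructor comment is there because the connector instantiates whatever class format.class names via reflection and hands it the storage instance. A simplified sketch of that lookup follows; it assumes the loading happens roughly this way, while the connector's real code path may differ in detail.

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.format.Format;
import org.apache.hadoop.fs.Path;

public class FormatLoadingSketch {
  @SuppressWarnings("unchecked")
  static Format<HdfsSinkConnectorConfig, Path> loadFormat(String className, HdfsStorage storage)
      throws ReflectiveOperationException {
    // The format class must keep a public (HdfsStorage) constructor, which is why the
    // class above warns "DO NOT change this signature": it is resolved by name here.
    return (Format<HdfsSinkConnectorConfig, Path>) Class.forName(className)
        .asSubclass(Format.class)
        .getConstructor(HdfsStorage.class)
        .newInstance(storage);
  }
}

OrcFormat keeps that contract and simply wires together the ORC record writer provider, the schema file reader above, and the Hive factory below.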
30 changes: 30 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcHiveFactory.java
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.orc;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.storage.hive.HiveFactory;
import io.confluent.connect.storage.hive.HiveMetaStore;
import io.confluent.connect.storage.hive.HiveUtil;
import org.apache.kafka.common.config.AbstractConfig;

public class OrcHiveFactory implements HiveFactory {

@Override
public HiveUtil createHiveUtil(AbstractConfig conf, HiveMetaStore hiveMetaStore) {
return new OrcHiveUtil((HdfsSinkConnectorConfig) conf, hiveMetaStore);
}
}
105 changes: 105 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.orc;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.storage.errors.HiveMetaStoreException;
import io.confluent.connect.storage.hive.HiveMetaStore;
import io.confluent.connect.storage.hive.HiveSchemaConverter;
import io.confluent.connect.storage.hive.HiveUtil;
import io.confluent.connect.storage.partitioner.Partitioner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.kafka.connect.data.Schema;

import java.util.List;

public class OrcHiveUtil extends HiveUtil {
private final HdfsSinkConnectorConfig config;

public OrcHiveUtil(HdfsSinkConnectorConfig config, HiveMetaStore hiveMetaStore) {
super(config, hiveMetaStore);
this.config = config;
}

@Override
public void alterSchema(String database, String tableName, Schema schema) {
Table table = hiveMetaStore.getTable(database, tableName);
List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
table.setFields(columns);
hiveMetaStore.alterTable(table);
}

@Override
public void createTable(
String database,
String tableName,
Schema schema,
Partitioner<FieldSchema> partitioner
) throws HiveMetaStoreException {
Table table = constructOrcTable(database, tableName, schema, partitioner);
hiveMetaStore.createTable(table);
}

private Table constructOrcTable(
String database,
String tableName,
Schema schema,
Partitioner<FieldSchema> partitioner
) throws HiveMetaStoreException {

Table table = newTable(database, tableName);
table.setTableType(TableType.EXTERNAL_TABLE);
table.getParameters().put("EXTERNAL", "TRUE");

// tableName is always the topic name
String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(tableName), tableName);
table.setDataLocation(new Path(tablePath));
table.setSerializationLib(getHiveOrcSerde());

try {
table.setInputFormatClass(getHiveOrcInputFormat());
table.setOutputFormatClass(getHiveOrcOutputFormat());
} catch (HiveException e) {
throw new HiveMetaStoreException("Cannot find input/output format:", e);
}

// convert the Connect schema to Hive columns
List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
table.setFields(columns);
table.setPartCols(partitioner.partitionFields());
return table;
}

private String getHiveOrcInputFormat() {
return OrcInputFormat.class.getName();
}

private String getHiveOrcOutputFormat() {
return OrcOutputFormat.class.getName();
}

private String getHiveOrcSerde() {
return OrcSerde.class.getName();
}

}
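Because the columns handed to the metastore come straight from HiveSchemaConverter, the external table mirrors the Connect value schema. A small sketch of that conversion follows; the expected Hive type names assume the converter's usual primitive mapping (for example INT64 to bigint), which is not shown in this diff.

import io.confluent.connect.storage.hive.HiveSchemaConverter;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.List;

public class HiveColumnSketch {
  public static void main(String[] args) {
    Schema valueSchema = SchemaBuilder.struct().name("order")
        .field("id", Schema.INT64_SCHEMA)
        .field("customer", Schema.STRING_SCHEMA)
        .field("expedited", Schema.BOOLEAN_SCHEMA)
        .build();
    // Same call constructOrcTable() makes; expected result under the assumed mapping:
    // id bigint, customer string, expedited boolean
    List<FieldSchema> columns = HiveSchemaConverter.convertSchema(valueSchema);
    columns.forEach(c -> System.out.println(c.getName() + " " + c.getType()));
  }
}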
Diffs for the remaining changed files are not shown.