Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix protobuf serde errors #425

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.twitter.elephantbird.hive.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Descriptors.Descriptor;
import com.twitter.elephantbird.mapreduce.io.ProtobufConverter;
import com.twitter.elephantbird.util.Protobufs;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -31,7 +31,7 @@
*/
public class ProtobufDeserializer implements Deserializer {

private ProtobufConverter<? extends Message> protobufConverter = null;
private Message.Builder msgBuilder;
private ObjectInspector objectInspector;

@Override
Expand All @@ -42,8 +42,7 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException

Class<? extends Message> protobufClass = job.getClassByName(protoClassName)
.asSubclass(Message.class);
protobufConverter = ProtobufConverter.newInstance(protobufClass);

msgBuilder = Protobufs.getMessageBuilder(protobufClass);
Descriptor descriptor = Protobufs.getMessageDescriptor(protobufClass);
objectInspector = new ProtobufStructObjectInspector(descriptor);
} catch (Exception e) {
Expand All @@ -54,7 +53,11 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException
@Override
public Object deserialize(Writable blob) throws SerDeException {
BytesWritable bytes = (BytesWritable) blob;
return protobufConverter.fromBytes(bytes.getBytes(), 0, bytes.getLength());
try {
return msgBuilder.clear().mergeFrom(bytes.getBytes(), 0, bytes.getLength());
} catch (InvalidProtocolBufferException e) {
throw new SerDeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
Expand All @@ -30,6 +31,21 @@ public static class ProtobufStructField implements StructField {
public ProtobufStructField(FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
oi = this.createOIForField();
comment = fieldDescriptor.getFullName();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ProtobufStructField) {
ProtobufStructField other = (ProtobufStructField)obj;
return fieldDescriptor.equals(other.fieldDescriptor);
}
return false;
}

@Override
public int hashCode() {
return fieldDescriptor.hashCode();
}

@Override
Expand Down Expand Up @@ -108,6 +124,21 @@ private ObjectInspector createOIForField() {
}
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ProtobufStructObjectInspector) {
ProtobufStructObjectInspector other = (ProtobufStructObjectInspector)obj;
return this.descriptor.equals(other.descriptor) &&
this.structFields.equals(other.structFields);
}
return false;
}

@Override
public int hashCode() {
return descriptor.hashCode();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this mix in structFields.hashCode() too? This is still correct, but will have more hash collisions in the case of comparing things with the same descriptor but different structFields (idk if that's common or not)

}

@Override
public Category getCategory() {
return Category.STRUCT;
Expand All @@ -132,15 +163,32 @@ public String getTypeName() {

@Override
public Object create() {
return descriptor.toProto().toBuilder().build();
return DynamicMessage.newBuilder(descriptor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this return a new builder, or a new record? .build() was being called previously.

}

@Override
public Object setStructFieldData(Object data, StructField field, Object fieldValue) {
return ((Message) data)
.toBuilder()
.setField(descriptor.findFieldByName(field.getFieldName()), fieldValue)
.build();
DynamicMessage.Builder builder = (DynamicMessage.Builder)data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see, is the contract that data here will be the result of create()?
(using Object for return types does not lead to an easily understood API :p)

ProtobufStructField psf = (ProtobufStructField)field;
FieldDescriptor fd = psf.getFieldDescriptor();
if (fd.isRepeated()) {
return builder.setField(fd, fieldValue);
}
switch (fd.getType()) {
case ENUM:
builder.setField(fd, fd.getEnumType().findValueByName((String) fieldValue));
break;
case BYTES:
builder.setField(fd, ByteString.copyFrom((byte[])fieldValue));
break;
case MESSAGE:
builder.setField(fd, ((Message.Builder)fieldValue).build());
break;
default:
builder.setField(fd, fieldValue);
break;
}
return builder;
}

@Override
Expand All @@ -153,16 +201,32 @@ public Object getStructFieldData(Object data, StructField structField) {
if (data == null) {
return null;
}
Message m = (Message) data;
Message.Builder builder;
if (data instanceof Message.Builder) {
builder = (Message.Builder)data;
} else if (data instanceof Message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this fix uses only the builder, do you ever expect Message?

builder = ((Message)data).toBuilder();
} else {
throw new RuntimeException("Type Message or Message.Builder expected: " +
data.getClass().getCanonicalName());
}
ProtobufStructField psf = (ProtobufStructField) structField;
FieldDescriptor fieldDescriptor = psf.getFieldDescriptor();
Object result = m.getField(fieldDescriptor);
Object result = builder.getField(fieldDescriptor);

if (fieldDescriptor.isRepeated()) {
return result;
}

if (fieldDescriptor.getType() == Type.ENUM) {
return ((EnumValueDescriptor)result).getName();
}
if (fieldDescriptor.getType() == Type.BYTES && (result instanceof ByteString)) {
return ((ByteString)result).toByteArray();
}
if (fieldDescriptor.getType() == Type.MESSAGE) {
return ((Message)result).toBuilder();
}
return result;
}

Expand All @@ -177,9 +241,8 @@ public List<Object> getStructFieldsDataAsList(Object data) {
return null;
}
List<Object> result = Lists.newArrayList();
Message m = (Message) data;
for (FieldDescriptor fd : descriptor.getFields()) {
result.add(m.getField(fd));
for (StructField field : structFields) {
result.add(getStructFieldData(data, field));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.elephantbird.hive.serde;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -61,7 +62,7 @@ public void setUp() throws SerDeException {
@Test
public final void testDeserializer() throws SerDeException {
BytesWritable serialized = new BytesWritable(test_ab.toByteArray());
AddressBook ab2 = (AddressBook) deserializer.deserialize(serialized);
AddressBook ab2 = ((AddressBook.Builder) deserializer.deserialize(serialized)).build();
assertTrue(test_ab.equals(ab2));
}

Expand All @@ -71,12 +72,13 @@ public final void testObjectInspector() throws SerDeException {
assertEquals(oi.getCategory(), Category.STRUCT);

ProtobufStructObjectInspector protobufOI = (ProtobufStructObjectInspector) oi;
List<Object> readData = protobufOI.getStructFieldsDataAsList(test_ab);

List<Object> readData = protobufOI.getStructFieldsDataAsList(test_ab.toBuilder());

assertEquals(readData.size(), 2);
@SuppressWarnings("unchecked")
ByteString byteStr = (ByteString)readData.get(1);
assertEquals(byteStr, ByteString.copyFrom(new byte[] {16,32,64,(byte) 128}));
byte[] byteStr = (byte[])readData.get(1);
assertArrayEquals(new byte[] {16,32,64,(byte) 128}, byteStr);
List<Person> persons = (List<Person>) readData.get(0);
assertEquals(persons.size(), 3);
assertEquals(persons.get(0).getPhoneCount(), 3);
Expand All @@ -101,7 +103,7 @@ public final void testObjectInspectorGetStructFieldData() throws SerDeException
private void checkFields(List<FieldDescriptor> fields, Message message) {
for (FieldDescriptor fieldDescriptor : fields) {
ProtobufStructField psf = new ProtobufStructField(fieldDescriptor);
Object data = protobufOI.getStructFieldData(message, psf);
Object data = protobufOI.getStructFieldData(message.toBuilder(), psf);
Object target = message.getField(fieldDescriptor);
if (fieldDescriptor.getType() == Type.ENUM) {
assertEquals(String.class, data.getClass());
Expand Down