Skip to content

Commit 79cb4e2

Browse files
authored
Look up logical type for decimal values and use correct scale parquet codec (#5990)
Signed-off-by: Taylor Gray <[email protected]>
1 parent 3438936 commit 79cb4e2

File tree

2 files changed

+128
-16
lines changed

2 files changed

+128
-16
lines changed

data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/GenericRecordJsonEncoder.java

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package org.opensearch.dataprepper.plugins.codec.parquet;
66

77
import java.math.BigDecimal;
8+
import java.math.BigInteger;
89
import java.nio.ByteBuffer;
910
import java.nio.charset.StandardCharsets;
1011
import java.util.Collection;
@@ -15,6 +16,7 @@
1516
import java.util.function.Function;
1617

1718
import org.apache.avro.LogicalType;
19+
import org.apache.avro.LogicalTypes;
1820
import org.apache.avro.Schema;
1921
import org.apache.avro.generic.GenericContainer;
2022
import org.apache.avro.generic.GenericData;
@@ -50,7 +52,7 @@ public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType)
5052

5153
public String serialize(GenericRecord value) {
5254
StringBuilder buffer = new StringBuilder();
53-
serialize(value, buffer, new IdentityHashMap<>(128) );
55+
serialize(value, buffer, new IdentityHashMap<>(128), null);
5456
String result = buffer.toString();
5557
return result;
5658
}
@@ -59,7 +61,9 @@ public String serialize(GenericRecord value) {
5961
" \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";
6062

6163
/** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
62-
private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
64+
private void serialize(final Object datum, final StringBuilder buffer,
65+
final IdentityHashMap<Object, Object> seenObjects,
66+
final Integer decimalScale) {
6367
if (isRecord(datum)) {
6468
if (seenObjects.containsKey(datum)) {
6569
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
@@ -70,10 +74,32 @@ private void serialize(final Object datum, final StringBuilder buffer, final Ide
7074
int count = 0;
7175
Schema schema = getRecordSchema(datum);
7276
for (Schema.Field f : schema.getFields()) {
73-
serialize(f.name(), buffer, seenObjects);
77+
serialize(f.name(), buffer, seenObjects, null); // field name
7478
buffer.append(": ");
79+
7580
Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
76-
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
81+
82+
boolean serializedDecimal = false;
83+
Schema fieldSchema = f.schema();
84+
if (fieldSchema.getType() == Schema.Type.UNION) {
85+
for (Schema s : fieldSchema.getTypes()) {
86+
if (s.getType() != Schema.Type.NULL) {
87+
if (s.getType() == Schema.Type.BYTES &&
88+
s.getLogicalType() instanceof LogicalTypes.Decimal) {
89+
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects, ((LogicalTypes.Decimal) s.getLogicalType()).getScale());
90+
serializedDecimal = true;
91+
break;
92+
}
93+
}
94+
}
95+
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
96+
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects, ((LogicalTypes.Decimal) fieldSchema.getLogicalType()).getScale());
97+
serializedDecimal = true;
98+
}
99+
100+
if (!serializedDecimal) {
101+
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects, null);
102+
}
77103
if (++count < schema.getFields().size())
78104
buffer.append(", ");
79105
}
@@ -90,7 +116,7 @@ private void serialize(final Object datum, final StringBuilder buffer, final Ide
90116
long last = array.size()-1;
91117
int i = 0;
92118
for (Object element : array) {
93-
serialize(element, buffer, seenObjects);
119+
serialize(element, buffer, seenObjects, null);
94120
if (i++ < last)
95121
buffer.append(", ");
96122
}
@@ -107,9 +133,9 @@ private void serialize(final Object datum, final StringBuilder buffer, final Ide
107133
@SuppressWarnings(value="unchecked")
108134
Map<Object,Object> map = (Map<Object,Object>)datum;
109135
for (Map.Entry<Object,Object> entry : map.entrySet()) {
110-
serialize(entry.getKey(), buffer, seenObjects);
136+
serialize(entry.getKey(), buffer, seenObjects, null);
111137
buffer.append(": ");
112-
serialize(entry.getValue(), buffer, seenObjects);
138+
serialize(entry.getValue(), buffer, seenObjects, null);
113139
if (++count < map.size())
114140
buffer.append(", ");
115141
}
@@ -120,15 +146,24 @@ private void serialize(final Object datum, final StringBuilder buffer, final Ide
120146
writeEscapedString(datum.toString(), buffer);
121147
buffer.append("\"");
122148
} else if (isBytes(datum)) {
123-
final String bytesAsString = StandardCharsets.UTF_8.decode((ByteBuffer) datum).toString();
124-
final Optional<BigDecimal> bytesAsBigDecimal = getBigDecimal(bytesAsString);
125-
if (bytesAsBigDecimal.isPresent()) {
126-
buffer.append(bytesAsBigDecimal.get().doubleValue());
149+
if (decimalScale != null) {
150+
ByteBuffer sourceBuffer = (ByteBuffer) datum;
151+
byte[] bytesArray = new byte[sourceBuffer.remaining()];
152+
sourceBuffer.duplicate().get(bytesArray);
153+
BigInteger unscaledValue = new BigInteger(bytesArray);
154+
BigDecimal decimal = new BigDecimal(unscaledValue, decimalScale);
155+
buffer.append(decimal.doubleValue());
127156
} else {
128-
buffer.append("{\"bytes\": \"");
129-
ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
130-
writeEscapedString(new String(bytes.array(), StandardCharsets.ISO_8859_1), buffer);
131-
buffer.append("\"}");
157+
final String bytesAsString = StandardCharsets.UTF_8.decode((ByteBuffer) datum).toString();
158+
final Optional<BigDecimal> bytesAsBigDecimal = getBigDecimal(bytesAsString);
159+
if (bytesAsBigDecimal.isPresent()) {
160+
buffer.append(bytesAsBigDecimal.get().doubleValue());
161+
} else {
162+
buffer.append("{\"bytes\": \"");
163+
ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
164+
writeEscapedString(new String(bytes.array(), StandardCharsets.ISO_8859_1), buffer);
165+
buffer.append("\"}");
166+
}
132167
}
133168
} else if (((datum instanceof Float) && // quote Nan & Infinity
134169
(((Float)datum).isInfinite() || ((Float)datum).isNaN()))
@@ -143,7 +178,7 @@ private void serialize(final Object datum, final StringBuilder buffer, final Ide
143178
return;
144179
}
145180
seenObjects.put(datum, datum);
146-
serialize(datum, buffer, seenObjects);
181+
serialize(datum, buffer, seenObjects, null);
147182
seenObjects.remove(datum);
148183
} else {
149184
// This fallback is the reason why GenericRecord toString does not

data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/GenericRecordJsonEncoderTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.junit.jupiter.params.ParameterizedTest;
99
import org.junit.jupiter.params.provider.ValueSource;
1010

11+
import java.math.BigDecimal;
1112
import java.nio.ByteBuffer;
1213
import java.nio.charset.StandardCharsets;
1314
import java.time.Instant;
@@ -239,4 +240,80 @@ void registerLogicalTypeConverter_WithLogicalType_ConvertsValueUsingConverter()
239240
assertEquals(expectedJson, json);
240241
}
241242

243+
@Test
244+
void serialize_WithDecimalLogicalType_UsesScaleFromSchema() {
245+
Schema decimalSchema = new Schema.Parser().parse(
246+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
247+
"{\"name\": \"amount\", \"type\": [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}]}" +
248+
"] }"
249+
);
250+
251+
GenericRecord record = new GenericData.Record(decimalSchema);
252+
253+
BigDecimal value = new BigDecimal("12.34").setScale(2);
254+
byte[] decimalBytes = value.unscaledValue().toByteArray();
255+
record.put("amount", ByteBuffer.wrap(decimalBytes));
256+
257+
String json = encoder.serialize(record);
258+
259+
// Should output the scaled decimal number (double form) from schema
260+
assertEquals("{\"amount\": 12.34}", json);
261+
}
262+
263+
@Test
264+
void serialize_WithNonNullableDecimalLogicalType_UsesScaleFromSchema() {
265+
Schema decimalSchema = new Schema.Parser().parse(
266+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
267+
"{\"name\": \"amount\", \"type\": {\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}" +
268+
"] }"
269+
);
270+
271+
GenericRecord record = new GenericData.Record(decimalSchema);
272+
273+
BigDecimal value = new BigDecimal("12.34").setScale(2);
274+
byte[] decimalBytes = value.unscaledValue().toByteArray();
275+
record.put("amount", ByteBuffer.wrap(decimalBytes));
276+
277+
String json = encoder.serialize(record);
278+
279+
// Should output the scaled decimal number (double form) from schema
280+
assertEquals("{\"amount\": 12.34}", json);
281+
}
282+
283+
@Test
284+
void serialize_WithDecimalLogicalType_UsesScaleFromSchema_with_scale_three() {
285+
Schema decimalSchema = new Schema.Parser().parse(
286+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
287+
"{\"name\": \"amount\", \"type\": [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":3}]}" +
288+
"] }"
289+
);
290+
291+
GenericRecord record = new GenericData.Record(decimalSchema);
292+
293+
BigDecimal value = new BigDecimal("12.345").setScale(3);
294+
byte[] decimalBytes = value.unscaledValue().toByteArray();
295+
record.put("amount", ByteBuffer.wrap(decimalBytes));
296+
297+
String json = encoder.serialize(record);
298+
299+
// Should output the scaled decimal number (double form) from schema
300+
assertEquals("{\"amount\": 12.345}", json);
301+
}
302+
303+
@Test
304+
void serialize_WithNullDecimalLogicalType_ReturnsNull() {
305+
Schema decimalSchema = new Schema.Parser().parse(
306+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
307+
"{\"name\": \"amount\", \"type\": [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}]}" +
308+
"] }"
309+
);
310+
311+
GenericRecord record = new GenericData.Record(decimalSchema);
312+
record.put("amount", null);
313+
314+
String json = encoder.serialize(record);
315+
316+
assertEquals("{\"amount\": null}", json);
317+
}
318+
242319
}

0 commit comments

Comments
 (0)