Issue 45: Split serializers into format specific libraries #84

Merged 16 commits on Aug 19, 2020

Changes from 11 commits
build.gradle: 114 changes (109 additions, 5 deletions)

@@ -215,17 +215,117 @@ def getProjectVersion() {
return ver
}

project('serializers') {
project('serializers:shared') {
dependencies {
compile project(':common')
compile project(':client')
compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

javadoc {
title = "Serializers common"
dependsOn delombok
source = delombok.outputDir
failOnError = true
exclude "**/impl/**";
options.addBooleanOption("Xdoclint:all,-reference", true)
}
}
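One assumption worth stating: the nested project paths configured here and below (':serializers:shared', ':serializers:avro', ':serializers:protobuf', ':serializers:json') only resolve if the subprojects are registered in settings.gradle, which is not part of this excerpt. A minimal sketch of the include entries that would be expected:

// settings.gradle (sketch; the actual file is not shown in this diff)
include 'serializers'
include 'serializers:shared'
include 'serializers:avro'
include 'serializers:protobuf'
include 'serializers:json'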

project('serializers:avro') {
dependencies {
compile project(':serializers:shared')
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
testCompile project(':serializers:shared')
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

javadoc {
title = "Serializers avro"
dependsOn delombok
source = delombok.outputDir
failOnError = true
exclude "**/impl/**";
options.addBooleanOption("Xdoclint:all,-reference", true)
}

jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
}

project('serializers:protobuf') {
dependencies {
compile project(':serializers:shared')
compile group: 'com.google.protobuf', name: 'protobuf-java', version: protobufProtocVersion
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: protobufUtilVersion
compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
testCompile project(':serializers:shared')
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

javadoc {
title = "Serializers protobuf"
dependsOn delombok
source = delombok.outputDir
failOnError = true
exclude "**/impl/**";
options.addBooleanOption("Xdoclint:all,-reference", true)
}

jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
}

project('serializers:json') {
dependencies {
compile project(':serializers:shared')
compile group: 'com.github.everit-org.json-schema', name: 'org.everit.json.schema', version: everitVersion
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
testCompile project(':serializers:shared')
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

javadoc {
title = "Serializers json"
dependsOn delombok
source = delombok.outputDir
failOnError = true
exclude "**/impl/**";
options.addBooleanOption("Xdoclint:all,-reference", true)
}

jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
}

project('serializers') {
dependencies {
compile project(':serializers:avro')
compile project(':serializers:protobuf')
compile project(':serializers:json')
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
testCompile project(':serializers:shared')
testCompile project(':serializers:avro')
testCompile project(':serializers:protobuf')
testCompile project(':serializers:json')
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
@@ -241,7 +341,7 @@ project('serializers') {
}

jar {
manifest {}
zip64=true

from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
@@ -393,6 +493,10 @@ task publishAllJars() {
dependsOn ':common:publish'
dependsOn ':contract:publish'
dependsOn ':server:publish'
dependsOn ':serializers:shared:publish'
dependsOn ':serializers:avro:publish'
dependsOn ':serializers:json:publish'
dependsOn ':serializers:protobuf:publish'
dependsOn ':serializers:publish'
}
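The point of the split shows up on the consumer side: a module that only needs one format can now depend on that format-specific project instead of the aggregate serializers jar. A minimal sketch, using a hypothetical consumer module (only the ':serializers:json' path comes from this change):

// Hypothetical consumer module (illustration only): depending on ':serializers:json'
// keeps the Avro and Protobuf serializers and their transitive dependencies off the
// classpath; the aggregate ':serializers' project above still bundles all formats.
project('examples:json-only') {
    dependencies {
        compile project(':serializers:json')
    }
}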

AvroSerializerFactory.java

@@ -13,10 +13,13 @@
import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.common.Either;
import io.pravega.schemaregistry.contract.data.EncodingInfo;
import io.pravega.schemaregistry.schemas.AvroSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.stream.Collectors;

@@ -27,8 +30,21 @@
* Internal Factory class for Avro serializers and deserializers.
*/
@Slf4j
class AvroSerializerFactory {
static <T> Serializer<T> serializer(SerializerConfig config, AvroSchema<T> schema) {
public class AvroSerializerFactory {
/**
* Creates a typed Avro serializer for the schema. The serializer implementation returned from this method is
* responsible for interacting with the schema registry service and ensures that only valid, registered schemas can be used.
*
* Note: the returned serializer only implements {@link Serializer#serialize(Object)}.
* It does not implement {@link Serializer#deserialize(ByteBuffer)}.
*
* @param config Serializer Config used for instantiating a new serializer.
* @param schema Schema container that encapsulates an AvroSchema.
* @param <T> Type of event. It accepts either POJOs or Avro-generated classes and serializes them.
* @return A Serializer implementation that can be used in {@link io.pravega.client.stream.EventStreamWriter} or
* {@link io.pravega.client.stream.TransactionalEventStreamWriter}.
*/
public static <T> Serializer<T> serializer(SerializerConfig config, AvroSchema<T> schema) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(schema);
Preconditions.checkArgument(config.isWriteEncodingHeader(), "Events should be tagged with encoding ids.");
@@ -37,7 +53,20 @@ static <T> Serializer<T> serializer(SerializerConfig config, AvroSchema<T> schem
return new AvroSerializer<>(groupId, schemaRegistryClient, schema, config.getEncoder(), config.isRegisterSchema());
}

static <T> Serializer<T> deserializer(SerializerConfig config, AvroSchema<T> schema) {
/**
* Creates a typed Avro deserializer for the schema. The deserializer implementation returned from this method is
* responsible for interacting with the schema registry service and validates the writer schema before using it.
*
* Note: the returned serializer only implements {@link Serializer#deserialize(ByteBuffer)}.
* It does not implement {@link Serializer#serialize(Object)}.
*
* @param config Serializer Config used for instantiating a new serializer.
* @param schema Schema container that encapsulates an AvroSchema.
* @param <T> Type of event. The typed event should be an Avro-generated class. For the generic type, use
* {@link #genericDeserializer(SerializerConfig, AvroSchema)}.
* @return A deserializer implementation that can be used in {@link io.pravega.client.stream.EventStreamReader}.
*/
public static <T> Serializer<T> deserializer(SerializerConfig config, AvroSchema<T> schema) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(schema);
Preconditions.checkArgument(config.isWriteEncodingHeader(), "Events should be tagged with encoding ids.");
@@ -49,7 +78,19 @@ static <T> Serializer<T> deserializer(SerializerConfig config, AvroSchema<T> sch
return new AvroDeserializer<>(groupId, schemaRegistryClient, schema, config.getDecoders(), encodingCache);
}

static Serializer<Object> genericDeserializer(SerializerConfig config, @Nullable AvroSchema<Object> schema) {
/**
* Creates a generic Avro deserializer. The schema parameter is optional:
* if it is not supplied, the writer schema is used for deserialization into {@link GenericRecord}.
*
* Note: the returned serializer only implements {@link Serializer#deserialize(ByteBuffer)}.
* It does not implement {@link Serializer#serialize(Object)}.
*
* @param config Serializer Config used for instantiating a new serializer.
* @param schema Schema container that encapsulates an AvroSchema. It can be null to indicate that the writer schema
* should be used for deserialization.
* @return A deserializer implementation that can be used in {@link io.pravega.client.stream.EventStreamReader}.
*/
public static Serializer<Object> genericDeserializer(SerializerConfig config, @Nullable AvroSchema<Object> schema) {
Preconditions.checkNotNull(config);
Preconditions.checkArgument(config.isWriteEncodingHeader(), "Events should be tagged with encoding ids.");
String groupId = config.getGroupId();
@@ -59,7 +100,15 @@ static Serializer<Object> genericDeserializer(SerializerConfig config, @Nullable
return new AvroGenericDeserializer(groupId, schemaRegistryClient, schema, config.getDecoders(), encodingCache);
}

static <T> Serializer<T> multiTypeSerializer(SerializerConfig config, Map<Class<? extends T>, AvroSchema<T>> schemas) {
/**
* A multiplexed Avro serializer that takes a map of schemas and validates them individually.
*
* @param config Serializer config.
* @param schemas map of avro schemas.
* @param <T> Base Type of schemas.
* @return a Serializer which can serialize events of different types for which schemas are supplied.
*/
public static <T> Serializer<T> multiTypeSerializer(SerializerConfig config, Map<Class<? extends T>, AvroSchema<T>> schemas) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(schemas);
Preconditions.checkArgument(config.isWriteEncodingHeader(), "Events should be tagged with encoding ids.");
@@ -73,7 +122,16 @@ static <T> Serializer<T> multiTypeSerializer(SerializerConfig config, Map<Class<
return new MultiplexedSerializer<>(serializerMap);
}

static <T> Serializer<T> multiTypeDeserializer(
/**
* A multiplexed Avro deserializer that takes a map of schemas and deserializes events into typed objects based
* on the object type information in {@link EncodingInfo}.
*
* @param config Serializer config.
* @param schemas map of avro schemas.
* @param <T> Base type of schemas.
* @return a Deserializer which can deserialize events of different types in the stream into typed objects.
*/
public static <T> Serializer<T> multiTypeDeserializer(
SerializerConfig config, Map<Class<? extends T>, AvroSchema<T>> schemas) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(schemas);
@@ -91,7 +149,17 @@ static <T> Serializer<T> multiTypeDeserializer(
encodingCache);
}

static <T> Serializer<Either<T, Object>> typedOrGenericDeserializer(
/**
* A multiplexed Avro deserializer that takes a map of schemas and deserializes events into typed objects when a
* matching schema is supplied, and into a generic object otherwise, based on the type information in {@link EncodingInfo}.
*
* @param config Serializer config.
* @param schemas map of avro schemas.
* @param <T> Base type of schemas.
* @return a Deserializer which can deserialize events of different types in the stream into typed objects or a generic
* object.
*/
public static <T> Serializer<Either<T, Object>> typedOrGenericDeserializer(
SerializerConfig config, Map<Class<? extends T>, AvroSchema<T>> schemas) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(schemas);
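Usage sketch (illustration only, not code from this PR): with the factory methods above now public, they can be called directly. The package names for AvroSerializerFactory and SerializerConfig are assumed here, and the SerializerConfig (group id, registry client, encoding options) is taken as built elsewhere; Test1 and Test2 are the Avro-generated test classes exercised in SchemasTest below.

import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.common.Either;
import io.pravega.schemaregistry.schemas.AvroSchema;
import io.pravega.schemaregistry.serializers.AvroSerializerFactory; // assumed package
import io.pravega.schemaregistry.serializers.SerializerConfig;      // assumed package
import io.pravega.schemaregistry.testobjs.generated.Test1;
import io.pravega.schemaregistry.testobjs.generated.Test2;
import org.apache.avro.specific.SpecificRecordBase;

import java.util.HashMap;
import java.util.Map;

public class AvroSerializerFactoryUsage {

    // config is assumed to be built elsewhere with the group id and registry client settings.
    static void usage(SerializerConfig config) {
        // Typed serializer and deserializer for a single Avro-generated class; the returned
        // Serializer instances plug into EventStreamWriter / EventStreamReader creation.
        AvroSchema<Test1> schema = AvroSchema.of(Test1.class);
        Serializer<Test1> serializer = AvroSerializerFactory.serializer(config, schema);
        Serializer<Test1> deserializer = AvroSerializerFactory.deserializer(config, schema);

        // Generic deserializer: with a null schema the writer schema is used and events
        // are returned as Avro GenericRecord objects.
        Serializer<Object> generic = AvroSerializerFactory.genericDeserializer(config, null);

        // Multiplexed deserializer: supply the candidate types; each event is deserialized
        // into whichever supplied type matches the writer's schema.
        Map<Class<? extends SpecificRecordBase>, AvroSchema<SpecificRecordBase>> schemas = new HashMap<>();
        schemas.put(Test1.class, AvroSchema.ofSpecificRecord(Test1.class));
        schemas.put(Test2.class, AvroSchema.ofSpecificRecord(Test2.class));
        Serializer<SpecificRecordBase> multi = AvroSerializerFactory.multiTypeDeserializer(config, schemas);

        // Typed-or-generic variant: events of the supplied types come back typed, anything
        // else comes back as a generic object.
        Serializer<Either<SpecificRecordBase, Object>> either =
                AvroSerializerFactory.typedOrGenericDeserializer(config, schemas);
    }
}

The Either result is what distinguishes events that matched one of the supplied schemas from those deserialized generically.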
SchemasTest.java (new file)

@@ -0,0 +1,46 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.schemas;

import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.testobjs.SchemaDefinitions;
import io.pravega.schemaregistry.testobjs.User;
import io.pravega.schemaregistry.testobjs.generated.Test1;
import io.pravega.schemaregistry.testobjs.generated.Test2;
import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class SchemasTest {
@Test
public void testAvroSchema() {
AvroSchema<Object> schema = AvroSchema.of(SchemaDefinitions.SCHEMA1);
assertNotNull(schema.getSchema());
assertEquals(schema.getSchemaInfo().getSerializationFormat(), SerializationFormat.Avro);

AvroSchema<User> schema2 = AvroSchema.of(User.class);
assertNotNull(schema2.getSchema());
assertEquals(schema2.getSchemaInfo().getSerializationFormat(), SerializationFormat.Avro);

AvroSchema<Test1> schema3 = AvroSchema.of(Test1.class);
assertNotNull(schema3.getSchema());
assertEquals(schema3.getSchemaInfo().getSerializationFormat(), SerializationFormat.Avro);

AvroSchema<SpecificRecordBase> schemabase1 = AvroSchema.ofSpecificRecord(Test1.class);
assertNotNull(schemabase1.getSchema());
assertEquals(schemabase1.getSchemaInfo().getSerializationFormat(), SerializationFormat.Avro);

AvroSchema<SpecificRecordBase> schemabase2 = AvroSchema.ofSpecificRecord(Test2.class);
assertNotNull(schemabase2.getSchema());
assertEquals(schemabase2.getSchemaInfo().getSerializationFormat(), SerializationFormat.Avro);
}
}