CustomSerializerDeserializer

rollno748 edited this page Sep 30, 2024 · 1 revision

Custom Serializers and Deserializers in Kafka

DI-Kafkameter provides serializers and deserializers for the default types, such as:

1. StringSerializer
2. ByteArraySerializer
3. IntegerSerializer
4. LongSerializer
5. FloatSerializer
6. DoubleSerializer

At times, you need to handle more complex data structures or custom objects. That’s where custom serializers and deserializers come in.
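As a rough illustration of how a custom class takes the place of a built-in one, the sketch below builds plain Kafka client settings with `java.util.Properties`. The `key.serializer` and `value.serializer` keys are standard Kafka producer settings; the broker address is a placeholder, the `producerProps` helper is hypothetical, and how DI-Kafkameter exposes these settings in its sampler is not covered here.

```java
import java.util.Properties;

class SerializerConfigSketch {
    // Hypothetical helper: builds producer settings whose value serializer
    // is whatever fully qualified class name the caller passes in.
    public static Properties producerProps(String valueSerializerClass) {
        Properties props = new Properties();
        // Placeholder broker address, assumed for illustration only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Keys keep using a built-in serializer.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are handed to the custom class instead of a built-in one.
        props.setProperty("value.serializer", valueSerializerClass);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("io.perfwise.kafka.converters.CustomSerializer");
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

The serializer is referenced by its fully qualified class name, so the class has to be on the classpath of whatever process creates the producer.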

Below is the sample custom serializer and deserializer code that was used to evaluate the plugin.

MessageDto.java

import com.fasterxml.jackson.annotation.JsonProperty;

public class MessageDto {
    @JsonProperty
    public String message;

    @JsonProperty
    public String version;

    // Default constructor
    public MessageDto() {}

    // Constructor with parameters
    public MessageDto(String message, String version) {
        this.message = message;
        this.version = version;
    }

    // Getters and setters
    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }
}

CustomSerializer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null) {
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            return this.objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            System.err.println("Error when serializing MessageDto to byte[]: " + e.getMessage());
            throw new SerializationException("Error when serializing MessageDto to byte[]", e);
        }
    }

    @Override
    public void close() {}
}

CustomDeserializer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomDeserializer implements Deserializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null){
                System.out.println("Null received at deserializing");
                return null;
            }
            System.out.println("Deserializing...");
            return objectMapper.readValue(new String(data, StandardCharsets.UTF_8), MessageDto.class);
        } catch (Exception e) {
            // Pass the original exception along so the root cause is not lost
            throw new SerializationException("Error when deserializing byte[] to MessageDto", e);
        }
    }

    @Override
    public void close() {
    }
}

The architectural flow for achieving custom serialization and deserialization

[Image: architectural flow diagram]

Steps to achieve serialization

  1. Add a JSR223 Pre-processor to the Producer sampler
  2. Add the code to serialize the message to the required format
  3. Store it as a variable
  4. Pass the variable to the Producer Sampler

Steps to achieve deserialization

  1. Add the Consumer sampler
  2. Add the JSR223 Post-Processor element to the Consumer Sampler
  3. Add the code to deserialize the message from the required format to a string
  4. View the output in View Results Tree

Example code

JSR223 Serialization code

Set the custom Serializer package name

[Two screenshots accompany this step]

import io.perfwise.kafka.converters.CustomSerializer;
import io.perfwise.kafka.converters.MessageDto;

MessageDto mDto = new MessageDto();
mDto.message = "New message";
mDto.version = "1";

CustomSerializer serializer = new CustomSerializer();
byte[] serializedData = serializer.serialize("key1", mDto);

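// vars.put stores Strings, so decode the JSON bytes instead of calling toString() on the array
vars.put("data", new String(serializedData, "UTF-8"))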
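
JMeter variables set through `vars.put` hold strings, so a `byte[]` payload has to be turned into text explicitly. Calling `toString()` on a Java array returns an identity string such as `[B@1b6d3586`, not the content. The sketch below, using only the JDK, shows two ways to carry the bytes as a string: UTF-8 decoding for text payloads such as JSON, and Base64 for payloads that are not valid text. The class and method names are hypothetical, and the JSON literal is a stand-in for whatever the serializer produces.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

class BytesToVariableSketch {
    // Suitable when the payload is text (for example JSON).
    public static String asText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Suitable for arbitrary binary payloads; reversible without data loss.
    public static String asBase64(byte[] payload) {
        return Base64.getEncoder().encodeToString(payload);
    }

    public static byte[] fromBase64(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes returned by a JSON serializer.
        byte[] payload = "{\"message\":\"New message\",\"version\":\"1\"}"
                .getBytes(StandardCharsets.UTF_8);

        // Array identity string, not the JSON content.
        System.out.println(payload.toString());
        // The JSON text itself.
        System.out.println(asText(payload));
        // Base64 survives a round trip byte for byte.
        System.out.println(Arrays.equals(payload, fromBase64(asBase64(payload))));
    }
}
```

If the Base64 form is used, whatever reads the variable has to decode it again before the bytes are sent.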

JSR223 Deserialization code

Set the custom Deserializer package name

[A screenshot accompanies this step]

import io.perfwise.kafka.converters.CustomDeserializer;
import io.perfwise.kafka.converters.MessageDto;

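CustomDeserializer deserializer = new CustomDeserializer();
// prev is the SampleResult of the Consumer sampler; getResponseData() returns its payload as byte[]
MessageDto mDto = deserializer.deserialize("key1", prev.getResponseData());

// MessageDto does not override toString(), so build the readable text from its getters
prev.setResponseData("message=" + mDto.getMessage() + ", version=" + mDto.getVersion(), "UTF-8")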
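
A class that does not override `toString()` prints as its class name plus a hash code, which is of little use in View Results Tree. One option is to give the DTO its own `toString()`. The sketch below uses a hypothetical stand-in class, `Message`, with the same two fields as `MessageDto`; the output format is an arbitrary choice and not something the plugin requires.

```java
class DtoToStringSketch {
    // Hypothetical stand-in for MessageDto, reduced to its two fields.
    static class Message {
        final String message;
        final String version;

        Message(String message, String version) {
            this.message = message;
            this.version = version;
        }

        // Readable representation used when the object is converted to text.
        @Override
        public String toString() {
            return "MessageDto{message='" + message + "', version='" + version + "'}";
        }
    }

    // Convenience wrapper so the formatted text can be obtained in one call.
    public static String describe(String message, String version) {
        return new Message(message, version).toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("New message", "1"));
        // prints MessageDto{message='New message', version='1'}
    }
}
```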