diff --git a/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/Transformer.scala b/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/Transformer.scala index e98d05f..8628822 100644 --- a/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/Transformer.scala +++ b/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/Transformer.scala @@ -7,7 +7,7 @@ import akka.annotation.InternalApi import javax.crypto.Cipher import javax.crypto.spec.{GCMParameterSpec, SecretKeySpec} -import net.jpountz.lz4.LZ4Factory +import net.jpountz.lz4.{LZ4Exception, LZ4Factory} import scala.collection.mutable @@ -90,10 +90,14 @@ class LZ4KryoCompressor extends Transformer { override def toBinary(inputBuff: Array[Byte], outputBuff: ByteBuffer): Unit = { val inputSize = inputBuff.length val lz4 = lz4factory.fastCompressor - lz4.maxCompressedLength(inputSize) // encode 32 bit length in the first bytes outputBuff.order(ByteOrder.LITTLE_ENDIAN).putInt(inputSize) - lz4.compress(ByteBuffer.wrap(inputBuff), outputBuff) + try { + lz4.compress(ByteBuffer.wrap(inputBuff), outputBuff) + } catch { + case e: LZ4Exception => + throw new RuntimeException(s"Compression failed for input buffer size: ${inputBuff.length} and output buffer size: ${outputBuff.capacity()}", e) + } } override def fromBinary(inputBuff: Array[Byte]): Array[Byte] = { diff --git a/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/serializer/akka/ActorRefSerializer.scala b/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/serializer/akka/ActorRefSerializer.scala index 37ec2aa..c11fdcb 100644 --- a/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/serializer/akka/ActorRefSerializer.scala +++ b/akka-kryo-serialization/src/main/scala/io/altoo/akka/serialization/kryo/serializer/akka/ActorRefSerializer.scala @@ -32,10 +32,12 @@ class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[Act override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = { val path = input.readString() + println("deserializing actor ref " + typ + "for path: " + path) system.provider.resolveActorRef(path) } override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = { + println("serializing actor ref " + obj) output.writeAscii(Serialization.serializedActorPath(obj)) } } diff --git a/akka-kryo-serialization/src/test/scala/io/altoo/akka/serialization/kryo/LZ4KryoCompressorTest.scala b/akka-kryo-serialization/src/test/scala/io/altoo/akka/serialization/kryo/LZ4KryoCompressorTest.scala new file mode 100644 index 0000000..e247829 --- /dev/null +++ b/akka-kryo-serialization/src/test/scala/io/altoo/akka/serialization/kryo/LZ4KryoCompressorTest.scala @@ -0,0 +1,19 @@ +package io.altoo.akka.serialization.kryo + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.nio.ByteBuffer + +class LZ4KryoCompressorTest extends AnyFlatSpec with Matchers { + + it should "serialize short message" in { + // arrange + val compressor = new LZ4KryoCompressor() + val data = Seq[Byte](0, 1, 2, 3, 4, 5, 6, 7, 8, 9).toArray + val buffer = ByteBuffer.allocateDirect(2* data.length) + + // act & assert + compressor.toBinary(data, buffer) + } +} diff --git a/akka-kryo-serialization/src/test/scala/io/altoo/testing/SampleMessage.scala b/akka-kryo-serialization/src/test/scala/io/altoo/testing/SampleMessage.scala new file mode 100644 index 0000000..c84fa98 --- /dev/null +++ b/akka-kryo-serialization/src/test/scala/io/altoo/testing/SampleMessage.scala @@ -0,0 +1,5 @@ +package io.altoo.testing + +import akka.actor.ActorRef + +case class SampleMessage(actorRef: ActorRef) extends Serializable