diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/PekkoCompatKryoInitializer.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/PekkoCompatKryoInitializer.scala new file mode 100644 index 0000000..2d6b3c5 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/PekkoCompatKryoInitializer.scala @@ -0,0 +1,20 @@ +package io.altoo.akka.serialization.kryo.compat + +import akka.actor.ExtendedActorSystem +import io.altoo.akka.serialization.kryo.DefaultKryoInitializer +import io.altoo.akka.serialization.kryo.compat.serializer.CompatActorRefSerializer +import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer +import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo + +class PekkoCompatKryoInitializer extends DefaultKryoInitializer { + + override def initAkkaSerializer(kryo: ScalaKryo, system: ExtendedActorSystem): Unit = { + super.initAkkaSerializer(kryo, system) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(system)) + kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(system)) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer]) + } +} diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/TypedPekkoCompatKryoInitializer.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/TypedPekkoCompatKryoInitializer.scala new file mode 100644 index 0000000..279c366 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/TypedPekkoCompatKryoInitializer.scala @@ -0,0 +1,23 @@ +package io.altoo.akka.serialization.kryo.compat + +import akka.actor.{ExtendedActorSystem, typed} +import io.altoo.akka.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer} +import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer +import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo +import io.altoo.akka.serialization.kryo.typed.TypedKryoInitializer + +class TypedPekkoCompatKryoInitializer extends TypedKryoInitializer { + + override def initAkkaSerializer(kryo: ScalaKryo, typedSystem: typed.ActorSystem[Nothing]): Unit = { + super.initAkkaSerializer(kryo, typedSystem) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem])) + kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem])) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer]) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem)) + } +} diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala new file mode 100644 index 0000000..7bf187d --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala @@ -0,0 +1,42 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.akka.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import akka.actor.{ActorRef, ExtendedActorSystem} +import akka.serialization.Serialization + +/** + * Specialized serializer for actor refs. + * + * @author Roman Levenstein + */ +class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] { + + override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = { + val path = input.readString() + val newPath = path.replace("pekko://", "akka://") + system.provider.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = { + output.writeAscii(Serialization.serializedActorPath(obj)) + } +} diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala new file mode 100644 index 0000000..722c6d7 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/io/altoo/akka/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala @@ -0,0 +1,43 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.akka.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import akka.actor.typed.{ActorRef, ActorRefResolver, ActorSystem} + +/** + * Specialized serializer for typed actor refs. + * + * @author Arman Bilge + */ +class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] { + + private val resolver = ActorRefResolver(system) + + override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = { + val path = input.readString() + val newPath = path.replace("akka://", "pekko://") + resolver.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = { + output.writeAscii(resolver.toSerializationFormat(obj)) + } +} diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/ActorRef.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/ActorRef.scala new file mode 100644 index 0000000..d00c332 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/ActorRef.scala @@ -0,0 +1,7 @@ +package org.apache.pekko.actor + +/** + * Dummy class to register a serializer for akka.actor.ActorRef on Pekko system + */ +class ActorRef +class RepointableActorRef diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/typed/ActorRef.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/typed/ActorRef.scala new file mode 100644 index 0000000..49561e1 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/actor/typed/ActorRef.scala @@ -0,0 +1,6 @@ +package org.apache.pekko.actor.typed + +/** + * Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system + */ +class ActorRef[-T] diff --git a/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/util/ByteString.scala b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/util/ByteString.scala new file mode 100644 index 0000000..5e1b78c --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/main/scala/org/apache/pekko/util/ByteString.scala @@ -0,0 +1,7 @@ +package org.apache.pekko.util + + +/** + * Dummy class to register a serializer for akka.util.ByteString on Pekko system + */ +class ByteString diff --git a/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/PekkoCompatSerializerTest.scala b/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/PekkoCompatSerializerTest.scala new file mode 100644 index 0000000..da3ff0b --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/PekkoCompatSerializerTest.scala @@ -0,0 +1,66 @@ +package io.altoo.pekko.serialization.kryo.compat + +import com.typesafe.config.ConfigFactory +import akka.actor.{Actor, ActorSystem, Props} +import akka.serialization.SerializationExtension +import akka.testkit.TestKit +import io.altoo.akka.serialization.kryo.KryoSerializer +import io.altoo.testing.SampleMessage +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, Inside} + +object PekkoCompatSerializerTest { + private val testConfig = + """ + |akka { + | actor { + | serializers { + | kryo = "io.altoo.akka.serialization.kryo.KryoSerializer" + | } + | serialization-bindings { + | "org.apache.pekko.actor.ActorRef" = kryo + | "akka.actor.ActorRef" = kryo + | "io.altoo.testing.SampleMessage" = kryo + | } + | } + |} + |akka-kryo-serialization { + | trace = true + | id-strategy = "default" + | implicit-registration-logging = true + | post-serialization-transformations = off + | + | kryo-initializer = "io.altoo.akka.serialization.kryo.compat.PekkoCompatKryoInitializer" + |} + |""".stripMargin + + // serialized io.altoo.testing.SampleMessage(actorRef: org.apache.pekko.actor.ActorRef) with pekko-kryo-serialization + private val pekkoActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97, + 103, -27, 1, 1, 1, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 112, 101, 107, 107, 111, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, + 116, 111, 114, 82, 101, -26, 1, 112, 101, 107, 107, 111, 58, 47, 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, + 111, 114, 35, 56, 48, 52, 54, 54, 57, 49, 52, -79) +} + +class PekkoCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(PekkoCompatSerializerTest.testConfig).withFallback(ConfigFactory.load()))) + with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll { + + private val serialization = SerializationExtension(system) + + override protected def afterAll(): Unit = shutdown(system) + + behavior of "ActorRefSerializer" + + it should "serialize and deserialize actorRef" in { + // create actor with path to not get deadLetter ref + system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor") + + val serializer = serialization.serializerFor(classOf[SampleMessage]) + serializer shouldBe a[KryoSerializer] + + // deserialize + val deserialized = serializer.fromBinary(PekkoCompatSerializerTest.pekkoActorRefSerialized) + deserialized shouldBe a[SampleMessage] + deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "akka://testSystem/user/sampleActor" + } +} diff --git a/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/testing/SampleMessage.scala b/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/testing/SampleMessage.scala new file mode 100644 index 0000000..cf4a942 --- /dev/null +++ b/akka-kryo-serialization-pekko-compat/src/test/scala/io/altoo/testing/SampleMessage.scala @@ -0,0 +1,6 @@ +package io.altoo.testing + +import akka.actor.ActorRef + +// Mirror class using Pekko ActorRef instead of Akka ActorRef +case class SampleMessage(actorRef: ActorRef) extends Serializable diff --git a/build.sbt b/build.sbt index f7bfc0a..4de6c30 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,7 @@ lazy val root: Project = project.in(file(".")) .settings(publish / skip := true) .settings(OsgiKeys.privatePackage := Nil) .settings(OsgiKeys.exportPackage := Seq("io.altoo.*")) - .aggregate(core, typed) + .aggregate(core, typed, pekkoCompat) lazy val core: Project = Project("akka-kryo-serialization", file("akka-kryo-serialization")) .settings(moduleSettings) @@ -59,6 +59,12 @@ lazy val typed: Project = Project("akka-kryo-serialization-typed", file("akka-kr .settings(libraryDependencies ++= typedDeps ++ testingDeps) .dependsOn(core) +lazy val pekkoCompat: Project = Project("akka-kryo-serialization-pekko-compat", file("akka-kryo-serialization-pekko-compat")) + .settings(moduleSettings) + .settings(description := "akka-serialization implementation using kryo - extension for improved wire compatibility with Pekko") + .settings(libraryDependencies ++= testingDeps) + .dependsOn(core, typed) + // Dependencies lazy val coreDeps = Seq(