Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compat module for wire compatibility with Akka #292

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.altoo.akka.serialization.kryo.compat

import akka.actor.ExtendedActorSystem
import io.altoo.akka.serialization.kryo.DefaultKryoInitializer
import io.altoo.akka.serialization.kryo.compat.serializer.CompatActorRefSerializer
import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer
import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo

class PekkoCompatKryoInitializer extends DefaultKryoInitializer {

override def initAkkaSerializer(kryo: ScalaKryo, system: ExtendedActorSystem): Unit = {
super.initAkkaSerializer(kryo, system)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.altoo.akka.serialization.kryo.compat

import akka.actor.{ExtendedActorSystem, typed}
import io.altoo.akka.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer}
import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer
import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo
import io.altoo.akka.serialization.kryo.typed.TypedKryoInitializer

class TypedPekkoCompatKryoInitializer extends TypedKryoInitializer {

override def initAkkaSerializer(kryo: ScalaKryo, typedSystem: typed.ActorSystem[Nothing]): Unit = {
super.initAkkaSerializer(kryo, typedSystem)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem]))
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem]))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer])

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.akka.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import akka.actor.{ActorRef, ExtendedActorSystem}
import akka.serialization.Serialization

/**
* Specialized serializer for actor refs.
*
* @author Roman Levenstein
*/
class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
val newPath = path.replace("pekko://", "akka://")
system.provider.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = {
output.writeAscii(Serialization.serializedActorPath(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.akka.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import akka.actor.typed.{ActorRef, ActorRefResolver, ActorSystem}

/**
* Specialized serializer for typed actor refs.
*
* @author Arman Bilge
*/
class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] {

private val resolver = ActorRefResolver(system)

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
resolver.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = {
output.writeAscii(resolver.toSerializationFormat(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.pekko.actor

/**
* Dummy class to register a serializer for akka.actor.ActorRef on Pekko system
*/
class ActorRef
class RepointableActorRef
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.pekko.actor.typed

/**
* Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system
*/
class ActorRef[-T]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.pekko.util


/**
* Dummy class to register a serializer for akka.util.ByteString on Pekko system
*/
class ByteString
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.altoo.pekko.serialization.kryo.compat

import com.typesafe.config.ConfigFactory
import akka.actor.{Actor, ActorSystem, Props}
import akka.serialization.SerializationExtension
import akka.testkit.TestKit
import io.altoo.akka.serialization.kryo.KryoSerializer
import io.altoo.testing.SampleMessage
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inside}

object PekkoCompatSerializerTest {
private val testConfig =
"""
|akka {
| actor {
| serializers {
| kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
| }
| serialization-bindings {
| "org.apache.pekko.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "io.altoo.testing.SampleMessage" = kryo
| }
| }
|}
|akka-kryo-serialization {
| trace = true
| id-strategy = "default"
| implicit-registration-logging = true
| post-serialization-transformations = off
|
| kryo-initializer = "io.altoo.akka.serialization.kryo.compat.PekkoCompatKryoInitializer"
|}
|""".stripMargin

// serialized io.altoo.testing.SampleMessage(actorRef: org.apache.pekko.actor.ActorRef) with pekko-kryo-serialization
private val pekkoActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97,
103, -27, 1, 1, 1, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 112, 101, 107, 107, 111, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99,
116, 111, 114, 82, 101, -26, 1, 112, 101, 107, 107, 111, 58, 47, 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116,
111, 114, 35, 56, 48, 52, 54, 54, 57, 49, 52, -79)
}

class PekkoCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(PekkoCompatSerializerTest.testConfig).withFallback(ConfigFactory.load())))
with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll {

private val serialization = SerializationExtension(system)

override protected def afterAll(): Unit = shutdown(system)

behavior of "ActorRefSerializer"

it should "deserialize actorRef from Pekko" in {
// create actor with path to not get deadLetter ref
system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor")

val serializer = serialization.serializerFor(classOf[SampleMessage])
serializer shouldBe a[KryoSerializer]

// deserialize
val deserialized = serializer.fromBinary(PekkoCompatSerializerTest.pekkoActorRefSerialized)
deserialized shouldBe a[SampleMessage]
deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "akka://testSystem/user/sampleActor"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.altoo.testing

import akka.actor.ActorRef

// Mirror class using Pekko ActorRef instead of Akka ActorRef
case class SampleMessage(actorRef: ActorRef) extends Serializable
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ lazy val root: Project = project.in(file("."))
.settings(publish / skip := true)
.settings(OsgiKeys.privatePackage := Nil)
.settings(OsgiKeys.exportPackage := Seq("io.altoo.*"))
.aggregate(core, typed)
.aggregate(core, typed, pekkoCompat)

lazy val core: Project = Project("akka-kryo-serialization", file("akka-kryo-serialization"))
.settings(moduleSettings)
Expand All @@ -59,6 +59,12 @@ lazy val typed: Project = Project("akka-kryo-serialization-typed", file("akka-kr
.settings(libraryDependencies ++= typedDeps ++ testingDeps)
.dependsOn(core)

lazy val pekkoCompat: Project = Project("akka-kryo-serialization-pekko-compat", file("akka-kryo-serialization-pekko-compat"))
.settings(moduleSettings)
.settings(description := "akka-serialization implementation using kryo - extension for improved wire compatibility with Pekko")
.settings(libraryDependencies ++= testingDeps)
.dependsOn(core, typed)


// Dependencies
lazy val coreDeps = Seq(
Expand Down
20 changes: 20 additions & 0 deletions migration-guide.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
akka-kryo-serialization - migration guide
=========================================

Migration from akka-kryo-serialization to pekko-kryo-serialization
-----------------------------
* You should upgrade to akka-kryo-serialization 2.5.x before migrating to pekko-kryo-serialization.
* To support efforts for live migration from Akka to Pekko, compat modules are available in both Akka and Pekko Kryo Serialization to help with wire compatibility of custom messages containing ActorRefs and ByteStrings.
```
# on Pekko
libraryDependencies += "io.altoo" %% "pekko-kryo-serialization-akka-compat" % "1.0.1"

# on Akka
libraryDependencies += "io.altoo" %% "pekko-kryo-serialization-akka-compat" % "2.5.2"
```
Then configure (or derive from if using a custom initializer) `AkkaCompatKryoInitializer` on Pekko, and `PekkoCompatKryoInitializer` on Akka.
```
# on Pekko
pekko-kryo-serialization.kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer"

# on Akka
kka-kryo-serialization.kryo-initializer = "io.altoo.akka.serialization.kryo.compat.PekkoCompatKryoInitializer"
```

Migration from 2.4.x to 2.5.x
-----------------------------
* `EnumerationSerializer` has been deprecated with 2.4.2, with 2.5.0 default serializer for `scala.Enumeration` has been switched to `EnumerationNameSerializer`, which is not backwards compatible.
Expand Down
Loading