Skip to content

Commit

Permalink
chore: bump to akka 2.10.0-M1, alpakka 9.0.0-M1
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers committed Oct 2, 2024
1 parent b493202 commit eb4bf9e
Show file tree
Hide file tree
Showing 24 changed files with 59 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package akka.persistence.cassandra

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.util.OptionVal
import com.datastax.oss.driver.api.core.cql.PreparedStatement

Expand All @@ -26,7 +26,7 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement
ps.foreach { p =>
// only cache successful futures, ok to overwrite
preparedStatement = OptionVal.Some(Future.successful(p))
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
ps
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.cassandra
import com.datastax.oss.driver.api.core.cql.Row
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

import akka.persistence.PersistentRepr
import akka.persistence.cassandra.journal._
Expand All @@ -16,7 +17,6 @@ import java.{ util => ju }

import akka.util.OptionVal
import akka.serialization.Serialization
import akka.util.ccompat.JavaConverters._
import java.nio.ByteBuffer

import com.datastax.oss.protocol.internal.util.Bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.cassandra

import scala.collection.immutable
import scala.jdk.CollectionConverters._

import akka.actor.ClassicActorSystemProvider

Expand Down Expand Up @@ -48,10 +49,7 @@ class KeyspaceAndTableStatements(
* This can be queried in for example a startup script without accessing the actual
* Cassandra plugin actor.
*/
def getCreateJournalTablesStatements: java.util.List[String] = {
import akka.util.ccompat.JavaConverters._
createJournalTablesStatements.asJava
}
def getCreateJournalTablesStatements: java.util.List[String] = createJournalTablesStatements.asJava

/**
* The Cassandra Statement that can be used to create the configured keyspace.
Expand All @@ -77,9 +75,6 @@ class KeyspaceAndTableStatements(
* This can be queried in for example a startup script without accessing the actual
* Cassandra plugin actor.
*/
def getCreateSnapshotTablesStatements: java.util.List[String] = {
import akka.util.ccompat.JavaConverters._
createSnapshotTablesStatements.asJava
}
def getCreateSnapshotTablesStatements: java.util.List[String] = createSnapshotTablesStatements.asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.persistence.cassandra.compaction

import com.typesafe.config.{ Config, ConfigFactory }

import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._

/*
* Based upon https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import akka.event.LoggingAdapter
import akka.persistence.cassandra.PluginSettings
import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, TagPidSequenceNr }
import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row, Statement }
import akka.util.ccompat.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.CollectionConverters._
import java.lang.{ Long => JLong }

import akka.annotation.InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ import akka.persistence.cassandra.journal.TagWriter.TagProgress
import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtension }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.Sink
import akka.dispatch.ExecutionContexts
import akka.util.{ OptionVal, Timeout }
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.uuid.Uuids
import com.datastax.oss.protocol.internal.util.Bytes

import scala.annotation.tailrec
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.compat.java8.FutureConverters._
import akka.annotation.DoNotInherit
import akka.annotation.InternalStableApi
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -217,11 +216,11 @@ import akka.stream.scaladsl.Source
result
.flatMap(_ => deleteDeletedToSeqNr(persistenceId))
.flatMap(_ => deleteFromAllPersistenceIds(persistenceId))
else result.map(_ => Done)(ExecutionContexts.parasitic)
else result.map(_ => Done)(ExecutionContext.parasitic)
result2.pipeTo(sender())

case HealthCheckQuery =>
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContexts.parasitic).pipeTo(sender())
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContext.parasitic).pipeTo(sender())
}

override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
Expand Down Expand Up @@ -287,7 +286,7 @@ import akka.stream.scaladsl.Source
tagWrites match {
case Some(t) =>
implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContext.parasitic)
case None => Future.successful(Nil)
}
}
Expand Down Expand Up @@ -547,7 +546,7 @@ import akka.stream.scaladsl.Source
e.getClass.getName,
e.getMessage)
}
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
deleteResult.map(_ => Done)(ExecutionContext.parasitic)
}
}
}
Expand Down Expand Up @@ -587,7 +586,7 @@ import akka.stream.scaladsl.Source
}
})
})))
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
deleteResult.map(_ => Done)(ExecutionContext.parasitic)
}

// Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows.
Expand Down Expand Up @@ -696,7 +695,7 @@ import akka.stream.scaladsl.Source
var batch =
new BatchStatementBuilder(BatchType.UNLOGGED).build().setExecutionProfileName(journalSettings.writeProfile)
batch = body(batch)
session.underlying().flatMap(_.executeAsync(batch).toScala).map(_ => ())
session.underlying().flatMap(_.executeAsync(batch).asScala).map(_ => ())
}

private def selectOne[T <: Statement[T]](stmt: Statement[T]): Future[Option[Row]] = {
Expand Down Expand Up @@ -920,7 +919,7 @@ import akka.stream.scaladsl.Source

if (async) Future(deserializedEvent)
else Future.successful(deserializedEvent)
}).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic)
}).map(event => DeserializedEvent(event, meta))(ExecutionContext.parasitic)

} catch {
case NonFatal(e) => Future.failed(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package akka.persistence.cassandra.journal

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.FutureConverters._

import akka.Done
import akka.annotation.InternalApi
Expand Down Expand Up @@ -335,27 +335,27 @@ import akka.persistence.cassandra.FutureDone
def tagStatements: Future[Done] =
if (eventsByTagSettings.eventsByTagEnabled) {
for {
_ <- session.executeAsync(createTagsTable).toScala
_ <- session.executeAsync(createTagsProgressTable).toScala
_ <- session.executeAsync(createTagScanningTable).toScala
_ <- session.executeAsync(createTagsTable).asScala
_ <- session.executeAsync(createTagsProgressTable).asScala
_ <- session.executeAsync(createTagScanningTable).asScala
} yield Done
} else FutureDone

def keyspace: Future[Done] =
if (journalSettings.keyspaceAutoCreate)
session.executeAsync(createKeyspace).toScala.map(_ => Done)
session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone

val done = if (journalSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
_ <- session.executeAsync(createMetadataTable).toScala
_ <- session.executeAsync(createTable).asScala
_ <- session.executeAsync(createMetadataTable).asScala
_ <- {
if (settings.journalSettings.supportAllPersistenceIds)
session.executeAsync(createAllPersistenceIdsTable).toScala
session.executeAsync(createAllPersistenceIdsTable).asScala
else
FutureDone
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.Timers
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TagWriter._
Expand Down Expand Up @@ -222,7 +221,7 @@ import scala.util.Try
case BulkTagWrite(tws, withoutTags) =>
val replyTo = sender()
val forwards = tws.map(forwardTagWrite)
Future.sequence(forwards).map(_ => Done)(ExecutionContexts.parasitic).pipeTo(replyTo)
Future.sequence(forwards).map(_ => Done)(ExecutionContext.parasitic).pipeTo(replyTo)
updatePendingScanning(withoutTags)
case WriteTagScanningTick =>
writeTagScanning()
Expand Down Expand Up @@ -351,7 +350,7 @@ import scala.util.Try
Future.successful(Done)
} else {
updatePendingScanning(tw.serialised)
askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContexts.parasitic)
askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContext.parasitic)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, Seriali
import akka.serialization.Serialization

import scala.concurrent._
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import akka.util.ccompat.JavaConverters._
import com.typesafe.config.{ Config, ConfigValueType }
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import scala.util.{ Failure, Success, Try }
import com.datastax.oss.driver.api.core.CqlSession

import scala.annotation.nowarn
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

import akka.persistence.cassandra.PluginSettings

/**
Expand Down Expand Up @@ -70,15 +71,15 @@ import akka.persistence.cassandra.PluginSettings

def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: ExecutionContext): Future[Option[Row]] = {
val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr: JLong).setExecutionProfileName(profile)
session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one()))
session.executeAsync(boundStatement).asScala.map(rs => Option(rs.one()))
}

def highestDeletedSequenceNumber(persistenceId: String)(implicit ec: ExecutionContext): Future[Long] =
executeStatement(selectDeletedToQuery.bind(persistenceId).setExecutionProfileName(profile)).map(r =>
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] =
session.executeAsync(statement).toScala
session.executeAsync(statement).asScala

}

Expand Down Expand Up @@ -401,7 +402,7 @@ import akka.persistence.cassandra.PluginSettings
} else if (rs.remaining() == 0) {
log.debug("EventsByPersistenceId [{}] Fetch more from seqNr [{}]", persistenceId, expectedNextSeqNr)
queryState = QueryInProgress(switchPartition, fetchMore = true, System.nanoTime())
val rsFut = rs.fetchNextPage().toScala
val rsFut = rs.fetchNextPage().asScala
rsFut.onComplete(newResultSetCb.invoke)
} else {
val row = rs.one()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.core.cql.Row
import com.datastax.oss.driver.api.core.uuid.Uuids

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

/**
* INTERNAL API
Expand Down Expand Up @@ -99,7 +99,7 @@ import scala.compat.java8.FutureConverters._
Retries.retry({ () =>
val bound =
statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
session.executeAsync(bound).toScala
session.executeAsync(bound).asScala
}, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
}
}
Expand Down Expand Up @@ -953,7 +953,7 @@ import scala.compat.java8.FutureConverters._

private def fetchMore(rs: AsyncResultSet): Unit = {
log.debug("[{}] No more results without paging. Requesting more.", stageUuid)
val moreResults = rs.fetchNextPage().toScala
val moreResults = rs.fetchNextPage().asScala
updateQueryState(QueryInProgress(abortForMissingSearch = false))
moreResults.onComplete(newResultSetCb.invoke)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.persistence.query.javadsl._
import akka.stream.alpakka.cassandra.javadsl.CassandraSession
import akka.stream.javadsl.Source

import scala.compat.java8.FutureConverters
import scala.jdk.FutureConverters._

object CassandraReadJournal {

Expand Down Expand Up @@ -67,8 +67,7 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
* It is also not required to wait until this CompletionStage is complete to start
* using the read journal.
*/
def initialize(): CompletionStage[Done] =
FutureConverters.toJava(scaladslReadJournal.initialize())
def initialize(): CompletionStage[Done] = scaladslReadJournal.initialize().asJava

/**
* Use this as the UUID offset in `eventsByTag` queries when you want all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package akka.persistence.cassandra.snapshot

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.FutureConverters._

import akka.Done
import akka.annotation.InternalApi
Expand Down Expand Up @@ -112,15 +112,15 @@ import akka.persistence.cassandra.FutureDone
implicit ec: ExecutionContext): Future[Done] = {
def keyspace: Future[Done] =
if (snapshotSettings.keyspaceAutoCreate)
session.executeAsync(createKeyspace).toScala.map(_ => Done)
session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone

if (snapshotSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
_ <- session.executeAsync(createTable).asScala
} yield {
session.setSchemaMetadataEnabled(null)
Done
Expand Down
Loading

0 comments on commit eb4bf9e

Please sign in to comment.