fix: set default page-size (#1081)
* max-buffer-size and max-result-size-query were not used
* set the driver page size
patriknw authored Jan 18, 2024
1 parent 54ba1de, commit adc960e
Showing 12 changed files with 21 additions and 23 deletions.
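
The two removed plugin settings had no effect; the fetch size is now controlled by the driver's execution profile. Below is a minimal sketch of how a downstream application could override the new default, assuming the Typesafe Config API and the profile path introduced in this commit. The value 250, the object name, and reading the value back with println are illustrative only, not part of the commit.

    import com.typesafe.config.ConfigFactory

    // Sketch of an application-level override (hypothetical value 250): the unused
    // plugin keys are gone, so the fetch size is tuned via the driver profile instead.
    object PageSizeOverrideSketch extends App {
      val config = ConfigFactory
        .parseString("""
          datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 250
        """)
        // With the plugin on the classpath this falls back to the new reference.conf default of 500.
        .withFallback(ConfigFactory.load())

      println(config.getInt(
        "datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size"))
    }
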
core/src/main/resources/reference.conf (6 additions, 4 deletions)
@@ -223,10 +223,6 @@ akka.persistence.cassandra {
# time to find the in-order sequence number before failing the stream
events-by-persistence-id-gap-timeout = 10s

# How many events to fetch in one query (replay) and keep buffered until they
# are delivered downstreams.
max-buffer-size = 500

# Deserialization of events is performed in an Akka streams mapAsync operator and this is the
# parallelism for that. Increasing this means that deserialization is pipelined, which can
# be an advantage for machines with many CPU cores but otherwise it might be slower because
@@ -594,6 +590,12 @@ datastax-java-driver {
consistency = QUORUM
# the journal does not use any counters or collections
default-idempotence = true

# The page size. This controls how many rows will be retrieved simultaneously
# in a single network roundtrip (the goal being to avoid loading too many
# results in memory at the same time). If there are more results, additional
# requests will be used to retrieve them.
page-size = 500
}
}
akka-persistence-cassandra-snapshot-profile {
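
The page-size comment added above describes paging at the driver level rather than in the plugin. A minimal sketch of what that means when executing a query, assuming the DataStax Java driver 4.x API; the keyspace/table, the per-statement override and the object name are placeholders for illustration, not something this commit adds.

    import com.datastax.oss.driver.api.core.CqlSession
    import com.datastax.oss.driver.api.core.cql.SimpleStatement
    import scala.jdk.CollectionConverters._

    // Paging sketch: the driver fetches page-size rows per network roundtrip and
    // transparently issues follow-up requests as the result set is iterated.
    object DriverPagingSketch extends App {
      val session = CqlSession.builder().build() // contact points/datacenter come from configuration

      val stmt = SimpleStatement
        .newInstance("SELECT persistence_id FROM akka.messages") // placeholder keyspace/table
        .setExecutionProfileName("akka-persistence-cassandra-profile") // profile default above: 500
        .setPageSize(50) // optional per-statement override

      session.execute(stmt).asScala.foreach(row => println(row.getString("persistence_id")))
      session.close()
    }
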
@@ -32,8 +32,6 @@ import com.typesafe.config.Config

val gapFreeSequenceNumbers: Boolean = queryConfig.getBoolean("gap-free-sequence-numbers")

val maxBufferSize: Int = queryConfig.getInt("max-buffer-size")

val deserializationParallelism: Int = queryConfig.getInt("deserialization-parallelism")

val pluginDispatcher: String = queryConfig.getString("plugin-dispatcher")
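
With maxBufferSize removed from the plugin's query settings, the effective fetch size lives in the driver's execution profile. A hedged sketch of how it could be read back at runtime, assuming the driver 4.x configuration API; the helper object is hypothetical and not part of this commit.

    import com.datastax.oss.driver.api.core.CqlSession
    import com.datastax.oss.driver.api.core.config.DefaultDriverOption

    // Hypothetical helper: the removed max-buffer-size no longer exists, so the
    // effective value is the driver profile's basic.request.page-size.
    object EffectivePageSize {
      def apply(session: CqlSession): Int =
        session.getContext.getConfig
          .getProfile("akka-persistence-cassandra-profile")
          .getInt(DefaultDriverOption.REQUEST_PAGE_SIZE)
    }
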
@@ -23,11 +23,10 @@ object AllPersistenceIdsSpec {
akka.persistence.cassandra {
journal.target-partition-size = 15
query {
max-buffer-size = 10
refresh-interval = 0.5s
max-result-size-query = 10
}
}
}
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10
""").withFallback(CassandraLifecycle.config)
}

@@ -77,7 +76,7 @@ class AllPersistenceIdsSpec extends CassandraSpec(AllPersistenceIdsSpec.config)
src.runWith(TestSink.probe[Any]).request(10).expectNext("d").expectComplete()
}

"find existing persistence ids in batches if there is more of them than max-result-size-query" in {
"find existing persistence ids in batches if there is more of them than page-size" in {
for (_ <- 1 to 1000) {
setup(UUID.randomUUID().toString, 1)
}
@@ -26,14 +26,13 @@ object EventAdaptersReadSpec {
}
}
query {
max-buffer-size = 50
refresh-interval = 500ms
max-result-size-query = 2
}
events-by-tag {
flush-interval = 0ms
}
}
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2
""").withFallback(CassandraLifecycle.config)
}

@@ -19,8 +19,8 @@ object EventsByPersistenceIdFastForwardSpec {
val config = ConfigFactory.parseString(s"""
akka.persistence.cassandra.journal.keyspace=EventsByPersistenceIdFastForwardSpec
akka.persistence.cassandra.query.refresh-interval = 250ms
akka.persistence.cassandra.query.max-result-size-query = 2
akka.persistence.cassandra.journal.target-partition-size = 15
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2
""").withFallback(CassandraLifecycle.config)
}

@@ -16,9 +16,9 @@ object EventsByPersistenceIdMultiPartitionGapSpec {
akka.loglevel = INFO
akka.persistence.cassandra.journal.target-partition-size = 15
akka.persistence.cassandra.query.refresh-interval = 0.5s
akka.persistence.cassandra.query.max-result-size-query = 2
akka.persistence.cassandra.query.events-by-persistence-id-gap-timeout = 4 seconds
akka.persistence.cassandra.query.gap-free-sequence-numbers = off
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2
akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary
""").withFallback(CassandraLifecycle.config)
}
@@ -23,8 +23,8 @@ object EventsByPersistenceIdSpec {
val config = ConfigFactory.parseString(s"""
akka.persistence.cassandra.journal.target-partition-size = 15
akka.persistence.cassandra.query.refresh-interval = 0.5s
akka.persistence.cassandra.query.max-result-size-query = 2
akka.persistence.cassandra.query.events-by-persistence-id-gap-timeout = 4 seconds
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2
akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary
""").withFallback(CassandraLifecycle.config)
}
@@ -19,7 +19,7 @@ object EventsByPersistenceIdWithControlSpec {
akka.persistence.cassandra.journal.keyspace=EventsByPersistenceIdWithControlSpec
akka.persistence.cassandra.journal.target-partition-size = 15
akka.persistence.cassandra.query.refresh-interval = 120s # effectively disabled
akka.persistence.cassandra.query.max-result-size-query = 20
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 20
akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary
""").withFallback(CassandraLifecycle.config)
}
@@ -60,7 +60,6 @@ object EventsByTagSpec {
query {
refresh-interval = 500ms
max-buffer-size = 50
}

events-by-tag {
@@ -73,6 +72,7 @@

# coordinated-shutdown-on-error = on
}
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 50
""").withFallback(CassandraLifecycle.config)

val strictConfig = ConfigFactory.parseString(s"""
@@ -922,7 +922,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
val w2 = UUID.randomUUID().toString
val w3 = UUID.randomUUID().toString

// max-buffer-size = 50
// page-size = 50
// create 120 events per day in total, 60 from each one of the two persistenceId
var lastT = t1
for {
@@ -986,7 +986,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
val w2 = UUID.randomUUID().toString
val w3 = UUID.randomUUID().toString

// max-buffer-size = 50
// page-size = 50
(1L to 100L).foreach { n =>
val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
val t = t1.plus(3 * n, ChronoUnit.MILLIS)
@@ -1040,7 +1040,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
val t1 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5).minusDays(5)
val w1 = UUID.randomUUID().toString

// max-buffer-size = 50
// page-size = 50
// start at seqNr 1 here to trigger the backtracking mode
(101L to 430L).foreach { n =>
val eventA = PersistentRepr(s"B$n", n, "b", "", writerUuid = w1)
@@ -39,7 +39,6 @@ object EventsByTagStageSpec {
log-queries = off

query {
max-result-size-query = $fetchSize
log-queries = on
refresh-interval = 200ms
}
@@ -54,6 +53,7 @@
new-persistence-id-scan-timeout = ${newPersistenceIdTimeout.toMillis}ms
}
}
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = $fetchSize
""").withFallback(CassandraLifecycle.config)

}
@@ -16,14 +16,14 @@ import scala.concurrent.duration._
object CassandraReadJournalSpec {
val config = ConfigFactory.parseString(s"""
akka.actor.serialize-messages=off
akka.persistence.cassandra.query.max-buffer-size = 10
akka.persistence.cassandra.query.refresh-interval = 0.5s
akka.persistence.cassandra.journal.event-adapters {
test-tagger = akka.persistence.cassandra.query.javadsl.TestTagger
}
akka.persistence.cassandra.journal.event-adapter-bindings = {
"java.lang.String" = test-tagger
}
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10
""").withFallback(CassandraLifecycle.config)
}

@@ -17,7 +17,6 @@ import scala.concurrent.duration._
object CassandraReadJournalSpec {
val config = ConfigFactory.parseString(s"""
akka.actor.serialize-messages=off
akka.persistence.cassandra.query.max-buffer-size = 10
akka.persistence.cassandra.query.refresh-interval = 0.5s
akka.persistence.cassandra.journal.event-adapters {
test-tagger = akka.persistence.cassandra.query.scaladsl.TestTagger
@@ -26,6 +25,7 @@ object CassandraReadJournalSpec {
"java.lang.String" = test-tagger
}
akka.persistence.cassandra.log-queries = off
datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10
""").withFallback(CassandraLifecycle.config)
}
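
All of the spec changes above follow the same pattern: drop the unused max-buffer-size / max-result-size-query keys and set the driver profile's page-size instead. For context, here is a sketch of the kind of read journal query those specs exercise, whose row fetching is now governed by the driver page-size rather than a plugin-level buffer. It assumes the standard Akka Persistence Query API, a running Cassandra and the plugin on the classpath; the persistence id and object name are placeholders.

    import akka.actor.ActorSystem
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.persistence.query.PersistenceQuery
    import akka.stream.scaladsl.Sink

    // Usage sketch: rows for this query are fetched in driver pages of `page-size`,
    // not in plugin-level max-buffer-size batches.
    object ReadJournalPagingSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      import system.dispatcher

      val queries = PersistenceQuery(system)
        .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

      queries
        .currentEventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue) // placeholder id
        .runWith(Sink.foreach(env => println(env.event)))
        .onComplete(_ => system.terminate())
    }
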
