From 60ba6031116eccd655d7c46a07b8408682249823 Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Fri, 26 Apr 2024 11:15:25 +0000 Subject: [PATCH 01/10] [WTCH-225] Revert optimization of findAndLockLatestActualPartition by WTCH-198, WTCH-204, WTCH-205, WTCH-211 --- .../repository/TxQueuePartitionJpaRepository.kt | 17 ++++++----------- .../V1.4.4__enqueued_tx_idx_change.sql | 5 ----- .../V1.4.5__tx_queue_partition_idx_change.sql | 4 ---- .../jpa/TxQueuePartitionJpaRepositoryTest.kt | 4 ++-- 4 files changed, 8 insertions(+), 22 deletions(-) delete mode 100644 we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.4__enqueued_tx_idx_change.sql delete mode 100644 we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.5__tx_queue_partition_idx_change.sql diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt index 13e791e..06590d5 100644 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt +++ b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt @@ -15,17 +15,12 @@ interface TxQueuePartitionJpaRepository : @Query( """ - select tqp.id - from $TX_OBSERVER_SCHEMA_NAME.tx_queue_partition tqp - where tqp.paused_on_tx_id is null - and exists( - select * from $TX_OBSERVER_SCHEMA_NAME.enqueued_tx - where partition_id = tqp.id - and status = 'NEW' - ) - order by tqp.priority desc - for update of tqp skip locked - limit 1 + select tqp.id from $TX_OBSERVER_SCHEMA_NAME.tx_queue_partition tqp + join $TX_OBSERVER_SCHEMA_NAME.enqueued_tx etx on etx.partition_id = tqp.id and etx.status = 'NEW' + where tqp.paused_on_tx_id is null + order by tqp.priority desc, etx.tx_timestamp + for update of tqp skip locked + limit 1 """, nativeQuery = true ) diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.4__enqueued_tx_idx_change.sql b/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.4__enqueued_tx_idx_change.sql deleted file mode 100644 index 9ed05cf..0000000 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.4__enqueued_tx_idx_change.sql +++ /dev/null @@ -1,5 +0,0 @@ -drop index if exists ix__enqueued_tx_partition_id_timestamp_for_new; - -create index if not exists ix__enqueued_tx_partition_id_for_new - on enqueued_tx(partition_id) - where (status = 'NEW'); \ No newline at end of file diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.5__tx_queue_partition_idx_change.sql b/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.5__tx_queue_partition_idx_change.sql deleted file mode 100644 index 548bfc3..0000000 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/resources/__tx_observer_schema/V1.4.5__tx_queue_partition_idx_change.sql +++ /dev/null @@ -1,4 +0,0 @@ -drop index if exists ix__tx_queue_partition__priority_for_paused; - -create index if not exists ix__tx_queue_partition__priority - on tx_queue_partition(priority desc); \ No newline at end of file diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt index 5e58a2d..53c8406 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt @@ -123,10 +123,10 @@ internal class TxQueuePartitionJpaRepositoryTest { fun `should find latest actual partition`() { val firstTxQueuePartition = partitionWithNewTx( id = "firstTxQueuePartitionId", - priority = 1, + priority = 0, enqueuedTxTimestamp = OffsetDateTime.of( LocalDate.now(), - LocalTime.of(12, 0, 0), + LocalTime.of(11, 0, 0), ZoneOffset.UTC ) ) From a62c718aaa17ae228f49bb8d8dde3bce415007d7 Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Sat, 27 Apr 2024 12:56:50 +0000 Subject: [PATCH 02/10] [WTCH-226] Implement a hybrid mode for the poller --- .../executor/ScheduledPartitionPoller.kt | 6 +- ...nPoller.kt => DefaultTxPartitionPoller.kt} | 24 +++++-- ...r.kt => ErrorHandlingTxPartitionPoller.kt} | 10 +-- .../partition/LatestTxPartitionPoller.kt | 5 -- .../spring/partition/TxPartitionPoller.kt | 5 ++ .../properties/PartitionPollerConfig.kt | 1 + .../TxQueuePartitionJpaRepository.kt | 20 +++++- .../observer/starter/PartitionPollerConfig.kt | 25 +++---- .../properties/PartitionPollerProperties.kt | 2 + ...est.kt => DefaultTxPartitionPollerTest.kt} | 65 ++++++++++++++----- ...ndlingTxPartitionPollerIntegrationTest.kt} | 8 +-- .../executor/ScheduledPartitionPollerTest.kt | 10 +-- .../jpa/TxQueuePartitionJpaRepositoryTest.kt | 45 +++++++++++-- 13 files changed, 168 insertions(+), 58 deletions(-) rename we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/{DefaultLatestTxPartitionPoller.kt => DefaultTxPartitionPoller.kt} (64%) rename we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/{ErrorHandlingLatestTxPartitionPoller.kt => ErrorHandlingTxPartitionPoller.kt} (54%) delete mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/LatestTxPartitionPoller.kt create mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPoller.kt rename we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/{DefaultLatestTxPartitionPollerTest.kt => DefaultTxPartitionPollerTest.kt} (57%) rename we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/{ErrorHandlingLatestTxPartitionPollerIntegrationTest.kt => ErrorHandlingTxPartitionPollerIntegrationTest.kt} (95%) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionPoller.kt index d424565..8842b38 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionPoller.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionPoller.kt @@ -1,14 +1,14 @@ package com.wavesenterprise.we.tx.observer.core.spring.executor -import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller open class ScheduledPartitionPoller( - private val errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller, + private val errorHandlingTxPartitionPoller: TxPartitionPoller, ) { open fun pollWhileHavingActivePartitions() { while (true) { - errorHandlingLatestTxPartitionPoller.pollLatestActualPartition() ?: break + errorHandlingTxPartitionPoller.pollPartition() ?: break } } } diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultLatestTxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt similarity index 64% rename from we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultLatestTxPartitionPoller.kt rename to we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt index 30770af..badaacf 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultLatestTxPartitionPoller.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt @@ -2,28 +2,39 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition import com.wavesenterprise.we.tx.observer.api.BlockListenerException import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.transaction.annotation.Transactional -open class DefaultLatestTxPartitionPoller( +open class DefaultTxPartitionPoller( val txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, + val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, val pollingTxSubscriber: PollingTxSubscriber, val partitionHandler: PartitionHandler, -) : LatestTxPartitionPoller { + val partitionPollerProperties: PartitionPollerConfig, +) : TxPartitionPoller { - private val logger: Logger = LoggerFactory.getLogger(DefaultLatestTxPartitionPoller::class.java) + private val logger: Logger = LoggerFactory.getLogger(DefaultTxPartitionPoller::class.java) @Transactional - override fun pollLatestActualPartition(): String? = - txQueuePartitionJpaRepository.findAndLockLatestActualPartition()?.also { + override fun pollPartition(): String? { + val partitionId = if (isAccelerationRequired()) { + txQueuePartitionJpaRepository.findAndLockRandomPartition() + } else { + txQueuePartitionJpaRepository.findAndLockLatestPartition() + } + return partitionId?.also { logger.debug("Started polling partition with ID = $it") tryToDequeueTxForPartition(it) logger.debug("Finished polling partition with ID = $it") }.also { it ?: logger.debug("No actual partitions found to be polled") } + } private fun tryToDequeueTxForPartition(partitionId: String) { try { @@ -39,4 +50,7 @@ open class DefaultLatestTxPartitionPoller( throw PartitionHandlingException(partitionId, e) } } + + private fun isAccelerationRequired(): Boolean = + enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize } diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingLatestTxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingTxPartitionPoller.kt similarity index 54% rename from we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingLatestTxPartitionPoller.kt rename to we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingTxPartitionPoller.kt index 355ba0f..cccf965 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingLatestTxPartitionPoller.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/ErrorHandlingTxPartitionPoller.kt @@ -2,13 +2,13 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException -class ErrorHandlingLatestTxPartitionPoller( - private val defaultLatestTxPartitionPoller: LatestTxPartitionPoller, +class ErrorHandlingTxPartitionPoller( + private val defaultTxPartitionPoller: TxPartitionPoller, private val partitionHandler: PartitionHandler, -) : LatestTxPartitionPoller { +) : TxPartitionPoller { - override fun pollLatestActualPartition(): String? = try { - defaultLatestTxPartitionPoller.pollLatestActualPartition() + override fun pollPartition(): String? = try { + defaultTxPartitionPoller.pollPartition() } catch (ex: PartitionHandlingException) { partitionHandler.handleErrorWhenReading(ex.partitionId) ex.partitionId diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/LatestTxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/LatestTxPartitionPoller.kt deleted file mode 100644 index 7b3f72b..0000000 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/LatestTxPartitionPoller.kt +++ /dev/null @@ -1,5 +0,0 @@ -package com.wavesenterprise.we.tx.observer.core.spring.partition - -interface LatestTxPartitionPoller { - fun pollLatestActualPartition(): String? -} diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPoller.kt new file mode 100644 index 0000000..9205fd4 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPoller.kt @@ -0,0 +1,5 @@ +package com.wavesenterprise.we.tx.observer.core.spring.partition + +interface TxPartitionPoller { + fun pollPartition(): String? +} diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionPollerConfig.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionPollerConfig.kt index e55012f..e7fbee3 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionPollerConfig.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionPollerConfig.kt @@ -6,4 +6,5 @@ interface PartitionPollerConfig { var enabled: Boolean var fixedDelay: Duration var threadCount: Int + var accelerateAtQueueSize: Long } diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt index 06590d5..f8b5ead 100644 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt +++ b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt @@ -24,7 +24,25 @@ interface TxQueuePartitionJpaRepository : """, nativeQuery = true ) - fun findAndLockLatestActualPartition(): String? + fun findAndLockLatestPartition(): String? + + @Query( + """ + select tqp.id + from $TX_OBSERVER_SCHEMA_NAME.tx_queue_partition tqp + where tqp.paused_on_tx_id is null + and exists( + select * from $TX_OBSERVER_SCHEMA_NAME.enqueued_tx + where partition_id = tqp.id + and status = 'NEW' + ) + order by tqp.priority desc + for update of tqp skip locked + limit 1 + """, + nativeQuery = true, + ) + fun findAndLockRandomPartition(): String? @Query( """ diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt index cdf5516..fd272a3 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt @@ -2,11 +2,11 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.core.spring.executor.AppContextPollingTxSubscriber import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultLatestTxPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingLatestTxPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository @@ -34,30 +34,33 @@ class PartitionPollerConfig { @Bean fun scheduledPartitionPoller( - errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller, + errorHandlingTxPartitionPoller: TxPartitionPoller, ): ScheduledPartitionPoller = ScheduledPartitionPoller( - errorHandlingLatestTxPartitionPoller = errorHandlingLatestTxPartitionPoller, + errorHandlingTxPartitionPoller = errorHandlingTxPartitionPoller, ) @Bean - fun errorHandlingLatestTxPartitionPoller( - defaultLatestTxPartitionPoller: LatestTxPartitionPoller, + fun errorHandlingTxPartitionPoller( + defaultTxPartitionPoller: TxPartitionPoller, partitionHandler: PartitionHandler, - ): LatestTxPartitionPoller = ErrorHandlingLatestTxPartitionPoller( - defaultLatestTxPartitionPoller = defaultLatestTxPartitionPoller, + ): TxPartitionPoller = ErrorHandlingTxPartitionPoller( + defaultTxPartitionPoller = defaultTxPartitionPoller, partitionHandler = partitionHandler, ) @Bean - fun defaultLatestTxPartitionPoller( + fun defaultTxPartitionPoller( txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, + enqueuedTxJpaRepository: EnqueuedTxJpaRepository, pollingTxSubscriber: PollingTxSubscriber, partitionHandler: PartitionHandler, partitionPollerProperties: PartitionPollerProperties, - ): LatestTxPartitionPoller = DefaultLatestTxPartitionPoller( + ): TxPartitionPoller = DefaultTxPartitionPoller( txQueuePartitionJpaRepository = txQueuePartitionJpaRepository, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, pollingTxSubscriber = pollingTxSubscriber, partitionHandler = partitionHandler, + partitionPollerProperties = partitionPollerProperties, ) @Bean diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt index 6c534f4..7cbffd1 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt @@ -17,4 +17,6 @@ data class PartitionPollerProperties( override var fixedDelay: Duration, @DefaultValue("4") override var threadCount: Int, + @DefaultValue("200") + override var accelerateAtQueueSize: Long, ) : PartitionPollerConfig diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultLatestTxPartitionPollerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt similarity index 57% rename from we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultLatestTxPartitionPollerTest.kt rename to we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt index 55dc45c..0139508 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultLatestTxPartitionPollerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt @@ -2,9 +2,12 @@ package com.wavesenterprise.we.tx.observer.starter.observer.executor import com.wavesenterprise.we.tx.observer.api.BlockListenerException import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException -import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultLatestTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import io.mockk.confirmVerified import io.mockk.every @@ -15,6 +18,7 @@ import io.mockk.junit5.MockKExtension import io.mockk.verify import io.mockk.verifyOrder import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.extension.ExtendWith @@ -24,7 +28,7 @@ import org.junit.jupiter.params.provider.MethodSource import java.lang.IllegalArgumentException @ExtendWith(MockKExtension::class) -internal class DefaultLatestTxPartitionPollerTest { +internal class DefaultTxPartitionPollerTest { @RelaxedMockK lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository @@ -35,19 +39,50 @@ internal class DefaultLatestTxPartitionPollerTest { @MockK lateinit var pollingTxSubscriber: PollingTxSubscriber + @MockK + lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + + @MockK + lateinit var partitionPollerProperties: PartitionPollerConfig + @InjectMockKs - lateinit var defaultLatestTxPartitionPoller: DefaultLatestTxPartitionPoller + lateinit var defaultTxPartitionPoller: DefaultTxPartitionPoller + + private val accelerateAtQueueSize = 200L + + @BeforeEach + fun init() { + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize - 1 + every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize + } @Test fun `should get latest actual partition and handle it with success`() { val partitionId = "partId" + every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize + every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 + every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns partitionId + + defaultTxPartitionPoller.pollPartition() + + verifyOrder { + txQueuePartitionJpaRepository.findAndLockLatestPartition() + pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) + } + confirmVerified(txQueuePartitionJpaRepository, partitionHandler) + } + + @Test + fun `should get random actual partition and handle it with success`() { + val partitionId = "partId" + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize + 1 every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 - every { txQueuePartitionJpaRepository.findAndLockLatestActualPartition() } returns partitionId + every { txQueuePartitionJpaRepository.findAndLockRandomPartition() } returns partitionId - defaultLatestTxPartitionPoller.pollLatestActualPartition() + defaultTxPartitionPoller.pollPartition() verifyOrder { - txQueuePartitionJpaRepository.findAndLockLatestActualPartition() + txQueuePartitionJpaRepository.findAndLockRandomPartition() pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) } confirmVerified(txQueuePartitionJpaRepository, partitionHandler) @@ -55,12 +90,12 @@ internal class DefaultLatestTxPartitionPollerTest { @Test fun `should do nothing when having no actual partitions`() { - every { txQueuePartitionJpaRepository.findAndLockLatestActualPartition() } returns null + every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns null - defaultLatestTxPartitionPoller.pollLatestActualPartition() + defaultTxPartitionPoller.pollPartition() verify(exactly = 1) { - txQueuePartitionJpaRepository.findAndLockLatestActualPartition() + txQueuePartitionJpaRepository.findAndLockLatestPartition() } confirmVerified(txQueuePartitionJpaRepository, pollingTxSubscriber, partitionHandler) } @@ -71,20 +106,20 @@ internal class DefaultLatestTxPartitionPollerTest { exception: Exception ) { val partitionId = "partId" - every { txQueuePartitionJpaRepository.findAndLockLatestActualPartition() } returns partitionId + every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns partitionId every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) } throws exception val partitionHandlingException = - assertThrows { defaultLatestTxPartitionPoller.pollLatestActualPartition() } + assertThrows { defaultTxPartitionPoller.pollPartition() } partitionHandlingException.also { assertEquals(partitionId, it.partitionId) assertEquals(exception, it.cause) } verifyOrder { - txQueuePartitionJpaRepository.findAndLockLatestActualPartition() + txQueuePartitionJpaRepository.findAndLockLatestPartition() pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) } confirmVerified(txQueuePartitionJpaRepository, pollingTxSubscriber, partitionHandler) @@ -93,15 +128,15 @@ internal class DefaultLatestTxPartitionPollerTest { @Test fun `should handle empty partition`() { val partitionId = "partId" - every { txQueuePartitionJpaRepository.findAndLockLatestActualPartition() } returns partitionId + every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns partitionId every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) } returns 0 - defaultLatestTxPartitionPoller.pollLatestActualPartition() + defaultTxPartitionPoller.pollPartition() verifyOrder { - txQueuePartitionJpaRepository.findAndLockLatestActualPartition() + txQueuePartitionJpaRepository.findAndLockLatestPartition() pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(partitionId) partitionHandler.handleEmptyPartition(partitionId) } diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingLatestTxPartitionPollerIntegrationTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingTxPartitionPollerIntegrationTest.kt similarity index 95% rename from we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingLatestTxPartitionPollerIntegrationTest.kt rename to we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingTxPartitionPollerIntegrationTest.kt index 6c5c180..dfc51dc 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingLatestTxPartitionPollerIntegrationTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ErrorHandlingTxPartitionPollerIntegrationTest.kt @@ -5,8 +5,8 @@ import com.wavesenterprise.sdk.node.client.http.tx.CreateContractTxDto.Companion import com.wavesenterprise.sdk.node.test.data.TestDataFactory import com.wavesenterprise.we.flyway.starter.FlywaySchemaConfiguration import com.wavesenterprise.we.tx.observer.api.BlockListenerException -import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller import com.wavesenterprise.we.tx.observer.domain.TxQueuePartition import com.wavesenterprise.we.tx.observer.jpa.TxObserverJpaAutoConfig import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig @@ -48,7 +48,7 @@ import javax.persistence.PersistenceContext ] ) @AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) -internal class ErrorHandlingLatestTxPartitionPollerIntegrationTest { +internal class ErrorHandlingTxPartitionPollerIntegrationTest { @Autowired lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository @@ -57,7 +57,7 @@ internal class ErrorHandlingLatestTxPartitionPollerIntegrationTest { lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository @Autowired - lateinit var errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller + lateinit var errorHandlingTxPartitionPoller: TxPartitionPoller @SpykBean lateinit var pollingTxSubscriber: PollingTxSubscriber @@ -104,7 +104,7 @@ internal class ErrorHandlingLatestTxPartitionPollerIntegrationTest { TestTransaction.flagForCommit() TestTransaction.end() - repeat(3) { errorHandlingLatestTxPartitionPoller.pollLatestActualPartition() } + repeat(3) { errorHandlingTxPartitionPoller.pollPartition() } verify { errorPartitions.forEach { diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionPollerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionPollerTest.kt index 85ac98d..21ef3ad 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionPollerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionPollerTest.kt @@ -1,7 +1,7 @@ package com.wavesenterprise.we.tx.observer.starter.observer.executor import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller import io.mockk.confirmVerified import io.mockk.every import io.mockk.impl.annotations.InjectMockKs @@ -15,7 +15,7 @@ import org.junit.jupiter.api.extension.ExtendWith internal class ScheduledPartitionPollerTest { @MockK - lateinit var errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller + lateinit var errorHandlingTxPartitionPoller: TxPartitionPoller @InjectMockKs lateinit var scheduledPartitionPoller: ScheduledPartitionPoller @@ -23,14 +23,14 @@ internal class ScheduledPartitionPollerTest { @Test fun `should poll latest actual partition while they are available`() { every { - errorHandlingLatestTxPartitionPoller.pollLatestActualPartition() + errorHandlingTxPartitionPoller.pollPartition() } returnsMany listOf("id1", "id2", "id3", null, "id4") scheduledPartitionPoller.pollWhileHavingActivePartitions() verify(exactly = 4) { - errorHandlingLatestTxPartitionPoller.pollLatestActualPartition() + errorHandlingTxPartitionPoller.pollPartition() } - confirmVerified(errorHandlingLatestTxPartitionPoller) + confirmVerified(errorHandlingTxPartitionPoller) } } diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt index 53c8406..88c9c82 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/jpa/TxQueuePartitionJpaRepositoryTest.kt @@ -18,7 +18,6 @@ import com.wavesenterprise.we.tx.observer.starter.observer.config.NodeBlockingSe import com.wavesenterprise.we.tx.observer.starter.observer.util.ModelFactory.enqueuedTx import org.hamcrest.CoreMatchers.`is` import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.Matchers.contains import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Test @@ -158,11 +157,49 @@ internal class TxQueuePartitionJpaRepositoryTest { } flushAndClear() - txQueuePartitionJpaRepository.findAndLockLatestActualPartition().also { + txQueuePartitionJpaRepository.findAndLockLatestPartition().also { assertEquals(firstTxQueuePartition.first.id, it) } } + @Test + fun `should find actual partition`() { + val firstTxQueuePartition = partitionWithNewTx( + id = "firstTxQueuePartitionId", + priority = 0, + enqueuedTxTimestamp = OffsetDateTime.of( + LocalDate.now(), + LocalTime.of(11, 0, 0), + ZoneOffset.UTC + ) + ) + val secondTxQueuePartition = partitionWithNewTx( + id = "secondTxQueuePartitionId", + priority = 0, + enqueuedTxTimestamp = OffsetDateTime.of( + LocalDate.now(), + LocalTime.of(12, 0, 0), + ZoneOffset.UTC + ) + ) + setOf( + firstTxQueuePartition, + secondTxQueuePartition, + ).apply { + map { it.first }.also { txQueuePartitionJpaRepository.saveAll(it) } + map { it.second }.also { enqueuedTxJpaRepository.saveAll(it) } + } + flushAndClear() + + txQueuePartitionJpaRepository.findAndLockRandomPartition().also { + if (firstTxQueuePartition.first.id == it) { + assertEquals(firstTxQueuePartition.first.id, it) + } else { + assertEquals(secondTxQueuePartition.first.id, it) + } + } + } + @ParameterizedTest @MethodSource("foundPartitions") fun `should find specified partitions`( @@ -172,7 +209,7 @@ internal class TxQueuePartitionJpaRepositoryTest { txQueuePartitionJpaRepository.saveAndFlush(partition) enqueuedTxJpaRepository.saveAndFlush(enqueuedTx) - assertEquals(partition.id, txQueuePartitionJpaRepository.findAndLockLatestActualPartition()) + assertEquals(partition.id, txQueuePartitionJpaRepository.findAndLockLatestPartition()) } @ParameterizedTest @@ -184,7 +221,7 @@ internal class TxQueuePartitionJpaRepositoryTest { txQueuePartitionJpaRepository.saveAndFlush(partition) enqueuedTx?.also { enqueuedTxJpaRepository.saveAndFlush(enqueuedTx) } - assertNull(txQueuePartitionJpaRepository.findAndLockLatestActualPartition()) + assertNull(txQueuePartitionJpaRepository.findAndLockLatestPartition()) } companion object { From e69907368cae70ca0b266e316f62341b2d4582d2 Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Thu, 2 May 2024 14:32:14 +0300 Subject: [PATCH 03/10] [WTCH-228] Do native countByStatus in EnqueuedTxJpaRepository --- .../tx/observer/jpa/repository/EnqueuedTxJpaRepository.kt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/EnqueuedTxJpaRepository.kt b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/EnqueuedTxJpaRepository.kt index 8a04a55..a042673 100644 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/EnqueuedTxJpaRepository.kt +++ b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/EnqueuedTxJpaRepository.kt @@ -19,6 +19,12 @@ import javax.persistence.LockModeType @Repository interface EnqueuedTxJpaRepository : JpaRepository, JpaSpecificationExecutor { + @Query( + value = """ + select count(*) from $TX_OBSERVER_SCHEMA_NAME.enqueued_tx tx where tx.status = :#{#enqueuedTxStatus.name()} + """, + nativeQuery = true + ) fun countByStatus(enqueuedTxStatus: EnqueuedTxStatus): Long @Query("select count(tx) from EnqueuedTx tx where status = 'NEW' and txType = 114") From e90c344a5e1982babb9c161fdbb0467359b07dde Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Fri, 3 May 2024 07:45:06 +0000 Subject: [PATCH 04/10] [WTCH-229] Add TxPartitionPollerAccelerationHelper for caching method isAccelerationRequired() --- .../we/tx/observer/api/cache/CacheConsts.kt | 3 ++ .../partition/DefaultTxPartitionPoller.kt | 9 +--- ...aultTxPartitionPollerAccelerationHelper.kt | 16 ++++++ .../TxPartitionPollerAccelerationHelper.kt | 5 ++ .../we-tx-observer-starter/build.gradle.kts | 2 + .../tx/observer/starter/JpaExecutorsConfig.kt | 2 + .../observer/starter/PartitionPollerConfig.kt | 15 +++++- .../DefaultTimeBasedCacheConfiguration.kt | 37 +++++++++++++ ...TxPartitionPollerAccelerationHelperTest.kt | 52 +++++++++++++++++++ .../executor/DefaultTxPartitionPollerTest.kt | 13 ++--- 10 files changed, 136 insertions(+), 18 deletions(-) create mode 100644 we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt create mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt create mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt create mode 100644 we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt create mode 100644 we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt diff --git a/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt b/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt new file mode 100644 index 0000000..e1cf864 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt @@ -0,0 +1,3 @@ +package com.wavesenterprise.we.tx.observer.api.cache + +const val DEFAULT_TIME_BASED_CACHE_MANAGER_NAME = "defaultTimeBasedCacheManager" diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt index badaacf..058fe35 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt @@ -2,8 +2,6 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition import com.wavesenterprise.we.tx.observer.api.BlockListenerException import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException -import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import org.slf4j.Logger @@ -15,14 +13,14 @@ open class DefaultTxPartitionPoller( val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, val pollingTxSubscriber: PollingTxSubscriber, val partitionHandler: PartitionHandler, - val partitionPollerProperties: PartitionPollerConfig, + val txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper, ) : TxPartitionPoller { private val logger: Logger = LoggerFactory.getLogger(DefaultTxPartitionPoller::class.java) @Transactional override fun pollPartition(): String? { - val partitionId = if (isAccelerationRequired()) { + val partitionId = if (txPartitionPollerAccelerationHelper.isAccelerationRequired()) { txQueuePartitionJpaRepository.findAndLockRandomPartition() } else { txQueuePartitionJpaRepository.findAndLockLatestPartition() @@ -50,7 +48,4 @@ open class DefaultTxPartitionPoller( throw PartitionHandlingException(partitionId, e) } } - - private fun isAccelerationRequired(): Boolean = - enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize } diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt new file mode 100644 index 0000000..9bac356 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt @@ -0,0 +1,16 @@ +package com.wavesenterprise.we.tx.observer.core.spring.partition + +import com.wavesenterprise.we.tx.observer.api.cache.DEFAULT_TIME_BASED_CACHE_MANAGER_NAME +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository +import org.springframework.cache.annotation.Cacheable + +open class DefaultTxPartitionPollerAccelerationHelper( + val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, + val partitionPollerProperties: PartitionPollerConfig, +) : TxPartitionPollerAccelerationHelper { + @Cacheable("isAccelerationRequired", cacheManager = DEFAULT_TIME_BASED_CACHE_MANAGER_NAME) + override fun isAccelerationRequired(): Boolean = + enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize +} diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt new file mode 100644 index 0000000..5b62f48 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt @@ -0,0 +1,5 @@ +package com.wavesenterprise.we.tx.observer.core.spring.partition + +interface TxPartitionPollerAccelerationHelper { + fun isAccelerationRequired(): Boolean +} diff --git a/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts b/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts index 3a8660d..22404e0 100644 --- a/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts +++ b/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts @@ -12,6 +12,8 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter") implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-data-jpa") + implementation("org.springframework.boot:spring-boot-starter-cache") + implementation("com.github.ben-manes.caffeine:caffeine") implementation("org.flywaydb:flyway-core") implementation("com.wavesenterprise:we-flyway-starter") diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt index 0f13d80..3c3c2b8 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt @@ -1,6 +1,7 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.common.conditional.ConditionalOnJpaMode +import com.wavesenterprise.we.tx.observer.starter.cache.DefaultTimeBasedCacheConfiguration import com.wavesenterprise.we.tx.observer.starter.lock.LockConfig import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import @@ -17,5 +18,6 @@ import org.springframework.context.annotation.Import TxObserverSchedulerConfig::class, LockConfig::class, PartitionPausedOnTxIdCleanerConfig::class, + DefaultTimeBasedCacheConfiguration::class, ) class JpaExecutorsConfig diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt index fd272a3..105829b 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt @@ -3,10 +3,12 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.core.spring.executor.AppContextPollingTxSubscriber import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPollerAccelerationHelper import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository @@ -48,19 +50,28 @@ class PartitionPollerConfig { partitionHandler = partitionHandler, ) + @Bean + fun txPartitionPollerAccelerationHelper( + enqueuedTxJpaRepository: EnqueuedTxJpaRepository, + partitionPollerProperties: PartitionPollerProperties, + ): TxPartitionPollerAccelerationHelper = DefaultTxPartitionPollerAccelerationHelper( + enqueuedTxJpaRepository = enqueuedTxJpaRepository, + partitionPollerProperties = partitionPollerProperties, + ) + @Bean fun defaultTxPartitionPoller( txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, enqueuedTxJpaRepository: EnqueuedTxJpaRepository, pollingTxSubscriber: PollingTxSubscriber, partitionHandler: PartitionHandler, - partitionPollerProperties: PartitionPollerProperties, + txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper, ): TxPartitionPoller = DefaultTxPartitionPoller( txQueuePartitionJpaRepository = txQueuePartitionJpaRepository, enqueuedTxJpaRepository = enqueuedTxJpaRepository, pollingTxSubscriber = pollingTxSubscriber, partitionHandler = partitionHandler, - partitionPollerProperties = partitionPollerProperties, + txPartitionPollerAccelerationHelper = txPartitionPollerAccelerationHelper, ) @Bean diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt new file mode 100644 index 0000000..d097c02 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt @@ -0,0 +1,37 @@ +package com.wavesenterprise.we.tx.observer.starter.cache + +import com.github.benmanes.caffeine.cache.Caffeine +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConstructorBinding +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.context.properties.bind.DefaultValue +import org.springframework.boot.convert.DurationUnit +import org.springframework.cache.CacheManager +import org.springframework.cache.annotation.EnableCaching +import org.springframework.cache.caffeine.CaffeineCacheManager +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Duration +import java.time.temporal.ChronoUnit + +@Configuration +@EnableCaching +@EnableConfigurationProperties(DefaultTimeBasedCacheProperties::class) +class DefaultTimeBasedCacheConfiguration(val defaultTimeBasedCacheProperties: DefaultTimeBasedCacheProperties) { + @Bean + fun defaultTimeBasedCacheManager(): CacheManager = + CaffeineCacheManager().apply { + setCaffeine( + Caffeine.newBuilder() + .expireAfterWrite(defaultTimeBasedCacheProperties.expireAfterWrite) + ) + } +} + +@ConfigurationProperties("cache.default-time-based") +@ConstructorBinding +data class DefaultTimeBasedCacheProperties( + @DurationUnit(ChronoUnit.SECONDS) + @DefaultValue("60s") + var expireAfterWrite: Duration +) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt new file mode 100644 index 0000000..234813a --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt @@ -0,0 +1,52 @@ +package com.wavesenterprise.we.tx.observer.starter.observer.cache + +import com.ninjasquad.springmockk.MockkBean +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository +import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository +import com.wavesenterprise.we.tx.observer.starter.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.starter.cache.DefaultTimeBasedCacheConfiguration +import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPollerProperties +import io.mockk.every +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.context.annotation.Profile +import org.springframework.test.context.ContextConfiguration + +@ContextConfiguration( + classes = [ + DefaultTimeBasedCacheConfiguration::class, + PartitionPollerConfig::class + ] +) +@SpringBootTest +@Profile("test") +class DefaultTxPartitionPollerAccelerationHelperTest { + + @MockkBean(relaxed = true) + private lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository + + @MockkBean + private lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + + @MockkBean + private lateinit var partitionPollerProperties: PartitionPollerProperties + + @Autowired + private lateinit var txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper + + @Test + fun `should use cache of query`() { + val newEnqueuedTxCount = 100L + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns newEnqueuedTxCount + every { partitionPollerProperties.accelerateAtQueueSize } returns newEnqueuedTxCount + + val firstResult = txPartitionPollerAccelerationHelper.isAccelerationRequired() + assertEquals(firstResult, txPartitionPollerAccelerationHelper.isAccelerationRequired()) + verify(exactly = 1) { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } + } +} diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt index 0139508..08bb180 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt @@ -5,8 +5,7 @@ import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber -import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import io.mockk.confirmVerified @@ -43,23 +42,19 @@ internal class DefaultTxPartitionPollerTest { lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository @MockK - lateinit var partitionPollerProperties: PartitionPollerConfig + lateinit var txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper @InjectMockKs lateinit var defaultTxPartitionPoller: DefaultTxPartitionPoller - private val accelerateAtQueueSize = 200L - @BeforeEach fun init() { - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize - 1 - every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize + every { txPartitionPollerAccelerationHelper.isAccelerationRequired() } returns false } @Test fun `should get latest actual partition and handle it with success`() { val partitionId = "partId" - every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns partitionId @@ -75,7 +70,7 @@ internal class DefaultTxPartitionPollerTest { @Test fun `should get random actual partition and handle it with success`() { val partitionId = "partId" - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize + 1 + every { txPartitionPollerAccelerationHelper.isAccelerationRequired() } returns true every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 every { txQueuePartitionJpaRepository.findAndLockRandomPartition() } returns partitionId From 31d2dbc0334f806d289af19bec61e1cf65830064 Mon Sep 17 00:00:00 2001 From: dgridnev Date: Mon, 6 May 2024 17:42:25 +0300 Subject: [PATCH 05/10] [WTCH-230] Pause sync at too big queue size --- .../executor/syncinfo/SyncInfoServiceImpl.kt | 9 ++- .../spring/properties/TxObserverConfig.kt | 1 + .../executor/syncinfo/SyncInfoConfig.kt | 6 +- .../properties/TxObserverProperties.kt | 1 + .../executor/blockinfo/SyncInfoServiceTest.kt | 60 ++++++++++++++++--- 5 files changed, 66 insertions(+), 11 deletions(-) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt index 317a196..1b01d77 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt @@ -7,7 +7,9 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockSea import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockSearchResult.NotFound import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.domain.BlockHeightInfo +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import org.slf4j.debug import org.slf4j.info import org.slf4j.lazyLogger @@ -16,6 +18,7 @@ import java.lang.Long.max class SyncInfoServiceImpl( private val blockHeightJpaRepository: BlockHeightJpaRepository, + private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, private val blockHistoryService: BlockHistoryService, private val blocksService: BlocksService, private val syncHistory: SyncHistoryProperties, @@ -26,7 +29,8 @@ class SyncInfoServiceImpl( ) : SyncInfoService { data class SyncHistoryProperties( val enabled: Boolean, - val fromHeight: Long + val fromHeight: Long, + val pauseSyncAtQueueSize: Long, ) private val log by lazyLogger(SyncInfoServiceImpl::class) @@ -49,6 +53,9 @@ class SyncInfoServiceImpl( override fun syncInfo(): SyncInfo { val nodeHeight = blocksService.blockHeight() + if (enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= syncHistory.pauseSyncAtQueueSize) { + return blockInfoSingleRecord.toSyncInfo(nodeHeight) + } val blockInfo = blockInfoSingleRecord.updated(nodeHeight) log.debug { "Current node block height = $nodeHeight; current block height in repo = ${blockInfo.currentHeight}" diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt index 8b015e9..35be3fe 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt @@ -11,6 +11,7 @@ interface TxObserverConfig { var activationHeight: Long var blockHeightWindow: Long var syncHistory: Boolean + var pauseSyncAtQueueSize: Long var blockHistoryDepth: Int var forkNotResolvedHeightDrop: Long var blockHistoryCleanDelay: Duration diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt index d65f9da..daf54ef 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt @@ -12,6 +12,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricContainerDat import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHistoryRepository +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.properties.TxObserverProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean @@ -25,6 +26,7 @@ class SyncInfoConfig( private val blocksService: BlocksService, private val txObserverProperties: TxObserverProperties, private val blockHeightJpaRepository: BlockHeightJpaRepository, + private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, private val blockHistoryRepository: BlockHistoryRepository, ) { @@ -36,11 +38,13 @@ class SyncInfoConfig( ): SyncInfoService = SyncInfoServiceImpl( blockHeightJpaRepository = blockHeightJpaRepository, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, blockHistoryService = blockHistoryService, blocksService = blocksService, syncHistory = SyncInfoServiceImpl.SyncHistoryProperties( enabled = txObserverProperties.syncHistory, - fromHeight = txObserverProperties.activationHeight + fromHeight = txObserverProperties.activationHeight, + pauseSyncAtQueueSize = txObserverProperties.pauseSyncAtQueueSize, ), autoResetHeight = txObserverProperties.autoResetHeight, forkNotResolvedHeightDrop = txObserverProperties.forkNotResolvedHeightDrop, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt index 4f9d443..4026bfb 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt @@ -24,6 +24,7 @@ data class TxObserverProperties( override var activationHeight: Long = 1, override var blockHeightWindow: Long = 99, override var syncHistory: Boolean = true, + override var pauseSyncAtQueueSize: Long = 10000, override var blockHistoryDepth: Int = 100, override var forkNotResolvedHeightDrop: Long = 10, @DurationUnit(ChronoUnit.MILLIS) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt index 81d912f..480c4a7 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt @@ -9,15 +9,20 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfo import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoServiceImpl import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.domain.BlockHeightInfo +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.observer.util.ModelFactory.blockAtHeight import io.mockk.every import io.mockk.impl.annotations.MockK import io.mockk.impl.annotations.RelaxedMockK import io.mockk.junit5.MockKExtension +import io.mockk.verify import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource @ExtendWith(MockKExtension::class) class SyncInfoServiceTest { @@ -27,6 +32,9 @@ class SyncInfoServiceTest { @MockK lateinit var blockHeightJpaRepository: BlockHeightJpaRepository + @MockK + lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + @RelaxedMockK lateinit var blockHistoryService: BlockHistoryService @@ -36,10 +44,38 @@ class SyncInfoServiceTest { @RelaxedMockK lateinit var observerHeightMetric: MetricsContainer + @ParameterizedTest + @ValueSource(longs = [10000L, 10001L]) + fun `should pause sync when NEW transactions count greater than or equal to pauseSyncAtQueueSize`( + newEnqueuedTxCount: Long, + ) { + val observerHeight = 100L + val nodeHeight = 1000L + val prevBlockSignature = Signature.fromByteArray("VJFGHc".toByteArray()) + every { blocksService.blockHeight() } returns Height(nodeHeight) + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns newEnqueuedTxCount + every { blockHeightJpaRepository.findAll() } returns listOf( + BlockHeightInfo( + currentHeight = observerHeight, + prevBlockSignature = prevBlockSignature.asBase58String() + ) + ) + + val syncInfo = syncInfo() + + assertThat(syncInfo.nodeHeight.value).isEqualTo(nodeHeight) + assertThat(syncInfo.observerHeight.value).isEqualTo(observerHeight) + assertThat(syncInfo.prevBlockSignature).isEqualTo(prevBlockSignature) + verify(exactly = 0) { blockHeightJpaRepository.save(any()) } + } + @Test fun `should start with activation height if enabled and observer height = activation height`() { val activationHeight = 20L val signature = Signature.fromByteArray("VJFGHc".toByteArray()) + every { blocksService.blockHeight() } returns Height(100L) + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L + every { blockHeightJpaRepository.save(any()) } answers { firstArg() } every { blockHeightJpaRepository.findAll() } returns emptyList() every { blocksService.blockAtHeight(activationHeight - 1) } returns blockAtHeight( signature = signature @@ -55,6 +91,9 @@ class SyncInfoServiceTest { fun `should reset height to stable node height if enabled and node height is less than observer height`() { val signature = Signature.fromByteArray("JiLqI92".toByteArray()) val nodeHeight = 57L + every { blocksService.blockHeight() } returns Height(nodeHeight) + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L + every { blockHeightJpaRepository.save(any()) } answers { firstArg() } every { blockHeightJpaRepository.findAll() } returns listOf( BlockHeightInfo( currentHeight = nodeHeight + 1, @@ -65,7 +104,7 @@ class SyncInfoServiceTest { signature = signature ) - val syncInfo = syncInfo(nodeHeight = nodeHeight, autoResetHeight = true) + val syncInfo = syncInfo(autoResetHeight = true) assertThat(syncInfo.observerHeight.value).isEqualTo(nodeHeight) assertThat(syncInfo.prevBlockSignature).isEqualTo(signature) @@ -74,8 +113,11 @@ class SyncInfoServiceTest { @Test fun `should use observer height if no need to reset or sync`() { val observerHeight = 130L - val signature = Signature.fromByteArray("hiD50hVO".toByteArray()) val nodeHeight = 180L + val signature = Signature.fromByteArray("hiD50hVO".toByteArray()) + every { blocksService.blockHeight() } returns Height(nodeHeight) + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L + every { blockHeightJpaRepository.save(any()) } answers { firstArg() } every { blockHeightJpaRepository.findAll() } returns listOf( BlockHeightInfo( currentHeight = observerHeight, @@ -84,26 +126,23 @@ class SyncInfoServiceTest { ) every { blocksService.blockAtHeight(nodeHeight) } returns blockAtHeight() - val syncInfo = syncInfo(nodeHeight = nodeHeight) + val syncInfo = syncInfo() assertThat(syncInfo.observerHeight.value).isEqualTo(observerHeight) assertThat(syncInfo.prevBlockSignature).isEqualTo(signature) } private fun syncInfo( - nodeHeight: Long = 100L, activationHeight: Long = 1L, syncHistory: Boolean = true, + pauseSyncAtQueueSize: Long = 10000L, autoResetHeight: Boolean = false, forkNotResolvedHeightDrop: Long = 10, ): SyncInfo { - every { blocksService.blockHeight() } returns Height(nodeHeight) - every { blockHeightJpaRepository.save(any()) } answers { - arg(0) - } return syncInfoService( activationHeight, syncHistory, + pauseSyncAtQueueSize, autoResetHeight, forkNotResolvedHeightDrop, ).syncInfo() @@ -112,15 +151,18 @@ class SyncInfoServiceTest { private fun syncInfoService( activationHeight: Long, syncHistory: Boolean, + pauseSyncAtQueueSize: Long, autoResetHeight: Boolean, forkNotResolvedHeightDrop: Long, ): SyncInfoService = SyncInfoServiceImpl( blockHeightJpaRepository = blockHeightJpaRepository, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, blockHistoryService = blockHistoryService, blocksService = blocksService, syncHistory = SyncInfoServiceImpl.SyncHistoryProperties( enabled = syncHistory, - fromHeight = activationHeight + fromHeight = activationHeight, + pauseSyncAtQueueSize = pauseSyncAtQueueSize, ), autoResetHeight = autoResetHeight, forkNotResolvedHeightDrop = forkNotResolvedHeightDrop, From e01b57a38ec3dea47aa4c15fde47ac4794cd5af1 Mon Sep 17 00:00:00 2001 From: dgridnev Date: Mon, 6 May 2024 16:21:14 +0000 Subject: [PATCH 06/10] Revert "Merge branch 'feature/WTCH-230' into 'dev'" --- .../executor/syncinfo/SyncInfoServiceImpl.kt | 9 +-- .../spring/properties/TxObserverConfig.kt | 1 - .../executor/syncinfo/SyncInfoConfig.kt | 6 +- .../properties/TxObserverProperties.kt | 1 - .../executor/blockinfo/SyncInfoServiceTest.kt | 60 +++---------------- 5 files changed, 11 insertions(+), 66 deletions(-) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt index 1b01d77..317a196 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/syncinfo/SyncInfoServiceImpl.kt @@ -7,9 +7,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockSea import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockSearchResult.NotFound import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.domain.BlockHeightInfo -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository -import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import org.slf4j.debug import org.slf4j.info import org.slf4j.lazyLogger @@ -18,7 +16,6 @@ import java.lang.Long.max class SyncInfoServiceImpl( private val blockHeightJpaRepository: BlockHeightJpaRepository, - private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, private val blockHistoryService: BlockHistoryService, private val blocksService: BlocksService, private val syncHistory: SyncHistoryProperties, @@ -29,8 +26,7 @@ class SyncInfoServiceImpl( ) : SyncInfoService { data class SyncHistoryProperties( val enabled: Boolean, - val fromHeight: Long, - val pauseSyncAtQueueSize: Long, + val fromHeight: Long ) private val log by lazyLogger(SyncInfoServiceImpl::class) @@ -53,9 +49,6 @@ class SyncInfoServiceImpl( override fun syncInfo(): SyncInfo { val nodeHeight = blocksService.blockHeight() - if (enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= syncHistory.pauseSyncAtQueueSize) { - return blockInfoSingleRecord.toSyncInfo(nodeHeight) - } val blockInfo = blockInfoSingleRecord.updated(nodeHeight) log.debug { "Current node block height = $nodeHeight; current block height in repo = ${blockInfo.currentHeight}" diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt index 35be3fe..8b015e9 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt @@ -11,7 +11,6 @@ interface TxObserverConfig { var activationHeight: Long var blockHeightWindow: Long var syncHistory: Boolean - var pauseSyncAtQueueSize: Long var blockHistoryDepth: Int var forkNotResolvedHeightDrop: Long var blockHistoryCleanDelay: Duration diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt index daf54ef..d65f9da 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/syncinfo/SyncInfoConfig.kt @@ -12,7 +12,6 @@ import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricContainerDat import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHistoryRepository -import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.properties.TxObserverProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean @@ -26,7 +25,6 @@ class SyncInfoConfig( private val blocksService: BlocksService, private val txObserverProperties: TxObserverProperties, private val blockHeightJpaRepository: BlockHeightJpaRepository, - private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, private val blockHistoryRepository: BlockHistoryRepository, ) { @@ -38,13 +36,11 @@ class SyncInfoConfig( ): SyncInfoService = SyncInfoServiceImpl( blockHeightJpaRepository = blockHeightJpaRepository, - enqueuedTxJpaRepository = enqueuedTxJpaRepository, blockHistoryService = blockHistoryService, blocksService = blocksService, syncHistory = SyncInfoServiceImpl.SyncHistoryProperties( enabled = txObserverProperties.syncHistory, - fromHeight = txObserverProperties.activationHeight, - pauseSyncAtQueueSize = txObserverProperties.pauseSyncAtQueueSize, + fromHeight = txObserverProperties.activationHeight ), autoResetHeight = txObserverProperties.autoResetHeight, forkNotResolvedHeightDrop = txObserverProperties.forkNotResolvedHeightDrop, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt index 4026bfb..4f9d443 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt @@ -24,7 +24,6 @@ data class TxObserverProperties( override var activationHeight: Long = 1, override var blockHeightWindow: Long = 99, override var syncHistory: Boolean = true, - override var pauseSyncAtQueueSize: Long = 10000, override var blockHistoryDepth: Int = 100, override var forkNotResolvedHeightDrop: Long = 10, @DurationUnit(ChronoUnit.MILLIS) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt index 480c4a7..81d912f 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/blockinfo/SyncInfoServiceTest.kt @@ -9,20 +9,15 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfo import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoServiceImpl import com.wavesenterprise.we.tx.observer.core.spring.metrics.MetricsContainer import com.wavesenterprise.we.tx.observer.domain.BlockHeightInfo -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.BlockHeightJpaRepository -import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.observer.util.ModelFactory.blockAtHeight import io.mockk.every import io.mockk.impl.annotations.MockK import io.mockk.impl.annotations.RelaxedMockK import io.mockk.junit5.MockKExtension -import io.mockk.verify import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource @ExtendWith(MockKExtension::class) class SyncInfoServiceTest { @@ -32,9 +27,6 @@ class SyncInfoServiceTest { @MockK lateinit var blockHeightJpaRepository: BlockHeightJpaRepository - @MockK - lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository - @RelaxedMockK lateinit var blockHistoryService: BlockHistoryService @@ -44,38 +36,10 @@ class SyncInfoServiceTest { @RelaxedMockK lateinit var observerHeightMetric: MetricsContainer - @ParameterizedTest - @ValueSource(longs = [10000L, 10001L]) - fun `should pause sync when NEW transactions count greater than or equal to pauseSyncAtQueueSize`( - newEnqueuedTxCount: Long, - ) { - val observerHeight = 100L - val nodeHeight = 1000L - val prevBlockSignature = Signature.fromByteArray("VJFGHc".toByteArray()) - every { blocksService.blockHeight() } returns Height(nodeHeight) - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns newEnqueuedTxCount - every { blockHeightJpaRepository.findAll() } returns listOf( - BlockHeightInfo( - currentHeight = observerHeight, - prevBlockSignature = prevBlockSignature.asBase58String() - ) - ) - - val syncInfo = syncInfo() - - assertThat(syncInfo.nodeHeight.value).isEqualTo(nodeHeight) - assertThat(syncInfo.observerHeight.value).isEqualTo(observerHeight) - assertThat(syncInfo.prevBlockSignature).isEqualTo(prevBlockSignature) - verify(exactly = 0) { blockHeightJpaRepository.save(any()) } - } - @Test fun `should start with activation height if enabled and observer height = activation height`() { val activationHeight = 20L val signature = Signature.fromByteArray("VJFGHc".toByteArray()) - every { blocksService.blockHeight() } returns Height(100L) - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L - every { blockHeightJpaRepository.save(any()) } answers { firstArg() } every { blockHeightJpaRepository.findAll() } returns emptyList() every { blocksService.blockAtHeight(activationHeight - 1) } returns blockAtHeight( signature = signature @@ -91,9 +55,6 @@ class SyncInfoServiceTest { fun `should reset height to stable node height if enabled and node height is less than observer height`() { val signature = Signature.fromByteArray("JiLqI92".toByteArray()) val nodeHeight = 57L - every { blocksService.blockHeight() } returns Height(nodeHeight) - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L - every { blockHeightJpaRepository.save(any()) } answers { firstArg() } every { blockHeightJpaRepository.findAll() } returns listOf( BlockHeightInfo( currentHeight = nodeHeight + 1, @@ -104,7 +65,7 @@ class SyncInfoServiceTest { signature = signature ) - val syncInfo = syncInfo(autoResetHeight = true) + val syncInfo = syncInfo(nodeHeight = nodeHeight, autoResetHeight = true) assertThat(syncInfo.observerHeight.value).isEqualTo(nodeHeight) assertThat(syncInfo.prevBlockSignature).isEqualTo(signature) @@ -113,11 +74,8 @@ class SyncInfoServiceTest { @Test fun `should use observer height if no need to reset or sync`() { val observerHeight = 130L - val nodeHeight = 180L val signature = Signature.fromByteArray("hiD50hVO".toByteArray()) - every { blocksService.blockHeight() } returns Height(nodeHeight) - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns 100L - every { blockHeightJpaRepository.save(any()) } answers { firstArg() } + val nodeHeight = 180L every { blockHeightJpaRepository.findAll() } returns listOf( BlockHeightInfo( currentHeight = observerHeight, @@ -126,23 +84,26 @@ class SyncInfoServiceTest { ) every { blocksService.blockAtHeight(nodeHeight) } returns blockAtHeight() - val syncInfo = syncInfo() + val syncInfo = syncInfo(nodeHeight = nodeHeight) assertThat(syncInfo.observerHeight.value).isEqualTo(observerHeight) assertThat(syncInfo.prevBlockSignature).isEqualTo(signature) } private fun syncInfo( + nodeHeight: Long = 100L, activationHeight: Long = 1L, syncHistory: Boolean = true, - pauseSyncAtQueueSize: Long = 10000L, autoResetHeight: Boolean = false, forkNotResolvedHeightDrop: Long = 10, ): SyncInfo { + every { blocksService.blockHeight() } returns Height(nodeHeight) + every { blockHeightJpaRepository.save(any()) } answers { + arg(0) + } return syncInfoService( activationHeight, syncHistory, - pauseSyncAtQueueSize, autoResetHeight, forkNotResolvedHeightDrop, ).syncInfo() @@ -151,18 +112,15 @@ class SyncInfoServiceTest { private fun syncInfoService( activationHeight: Long, syncHistory: Boolean, - pauseSyncAtQueueSize: Long, autoResetHeight: Boolean, forkNotResolvedHeightDrop: Long, ): SyncInfoService = SyncInfoServiceImpl( blockHeightJpaRepository = blockHeightJpaRepository, - enqueuedTxJpaRepository = enqueuedTxJpaRepository, blockHistoryService = blockHistoryService, blocksService = blocksService, syncHistory = SyncInfoServiceImpl.SyncHistoryProperties( enabled = syncHistory, - fromHeight = activationHeight, - pauseSyncAtQueueSize = pauseSyncAtQueueSize, + fromHeight = activationHeight ), autoResetHeight = autoResetHeight, forkNotResolvedHeightDrop = forkNotResolvedHeightDrop, From a3b70514204866ad6a1702d503c1d1290072c00e Mon Sep 17 00:00:00 2001 From: dgridnev Date: Mon, 6 May 2024 19:53:47 +0300 Subject: [PATCH 07/10] [WTCH-231] Pause sync at too big queue size --- .../poller/ScheduledBlockInfoSynchronizer.kt | 10 +++++++++ .../spring/properties/TxObserverConfig.kt | 1 + .../poller/PollerBlockSourceConfiguration.kt | 4 ++++ .../properties/TxObserverProperties.kt | 1 + ...BlockInfoSynchronizerOptimisticLockTest.kt | 6 ++++++ .../ScheduledWeBlockInfoSynchronizerTest.kt | 21 +++++++++++++++++++ 6 files changed, 43 insertions(+) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt index e972605..a48fcbc 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt @@ -2,6 +2,8 @@ package com.wavesenterprise.we.tx.observer.core.spring.executor.poller import com.wavesenterprise.we.tx.observer.common.tx.executor.TxExecutor import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import net.javacrumbs.shedlock.spring.annotation.SchedulerLock @@ -13,6 +15,8 @@ import java.lang.Long.min open class ScheduledBlockInfoSynchronizer( private val sourceExecutor: SourceExecutor, private val syncInfoService: SyncInfoService, + private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, + private val pauseSyncAtQueueSize: Long, private val liquidBlockPollingDelay: Long, private val blockHeightWindow: Long, private val txExecutor: TxExecutor, @@ -24,6 +28,9 @@ open class ScheduledBlockInfoSynchronizer( name = "syncNodeBlockInfo_task", ) open fun syncNodeBlockInfo() { + if (pauseSyncRequired()) { + return + } val blockSyncInfo = txExecutor.requiresNew { syncInfoService.syncInfo() } @@ -33,6 +40,9 @@ open class ScheduledBlockInfoSynchronizer( syncNodeBlockInfo(startHeight, nodeHeight.value) } + private fun pauseSyncRequired(): Boolean = + enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= pauseSyncAtQueueSize + private fun stableNodeHeight(nodeHeight: Long) = nodeHeight - 1 diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt index 8b015e9..35be3fe 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/TxObserverConfig.kt @@ -11,6 +11,7 @@ interface TxObserverConfig { var activationHeight: Long var blockHeightWindow: Long var syncHistory: Boolean + var pauseSyncAtQueueSize: Long var blockHistoryDepth: Int var forkNotResolvedHeightDrop: Long var blockHistoryCleanDelay: Duration diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/poller/PollerBlockSourceConfiguration.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/poller/PollerBlockSourceConfiguration.kt index 72e4fe6..9c49155 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/poller/PollerBlockSourceConfiguration.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/executor/poller/PollerBlockSourceConfiguration.kt @@ -9,6 +9,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.ScheduledB import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutor import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutorImpl import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.executor.BLOCK_SOURCE_MODE import com.wavesenterprise.we.tx.observer.starter.properties.TxObserverProperties import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty @@ -36,10 +37,13 @@ class PollerBlockSourceConfiguration( fun scheduledSourceExecutor( sourceExecutor: SourceExecutor, txExecutor: TxExecutor, + enqueuedTxJpaRepository: EnqueuedTxJpaRepository, ) = ScheduledBlockInfoSynchronizer( sourceExecutor = sourceExecutor, syncInfoService = syncInfoService, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, + pauseSyncAtQueueSize = txObserverProperties.pauseSyncAtQueueSize, blockHeightWindow = txObserverProperties.blockHeightWindow, liquidBlockPollingDelay = txObserverProperties.liquidBlockPollingDelay, txExecutor = txExecutor, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt index 4f9d443..4026bfb 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt @@ -24,6 +24,7 @@ data class TxObserverProperties( override var activationHeight: Long = 1, override var blockHeightWindow: Long = 99, override var syncHistory: Boolean = true, + override var pauseSyncAtQueueSize: Long = 10000, override var blockHistoryDepth: Int = 100, override var forkNotResolvedHeightDrop: Long = 10, @DurationUnit(ChronoUnit.MILLIS) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerOptimisticLockTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerOptimisticLockTest.kt index 4db213b..daa017b 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerOptimisticLockTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerOptimisticLockTest.kt @@ -9,6 +9,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExec import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService import com.wavesenterprise.we.tx.observer.jpa.TxObserverJpaAutoConfig import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.observer.config.NodeBlockingServiceFactoryMockConfiguration import io.mockk.every import org.junit.jupiter.api.Assertions.assertNull @@ -47,6 +48,9 @@ class ScheduledWeBlockInfoSynchronizerOptimisticLockTest { @MockkBean lateinit var sourceExecutor: SourceExecutor + @Autowired + lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + @Autowired lateinit var syncInfoService: SyncInfoService @@ -67,6 +71,8 @@ class ScheduledWeBlockInfoSynchronizerOptimisticLockTest { sourceExecutor = sourceExecutor, txExecutor = txExecutor, syncInfoService = syncInfoService, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, + pauseSyncAtQueueSize = 10000L, liquidBlockPollingDelay = 1, blockHeightWindow = 1 ) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt index e220106..c2968cd 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt @@ -6,11 +6,15 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.ScheduledB import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutor import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfo import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.starter.observer.util.TxExecutorStub +import io.mockk.called import io.mockk.every import io.mockk.impl.annotations.MockK import io.mockk.junit5.MockKExtension import io.mockk.mockk +import io.mockk.verify import io.mockk.verifyOrder import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -26,6 +30,10 @@ internal class ScheduledWeBlockInfoSynchronizerTest { @MockK lateinit var syncInfoService: SyncInfoService + @MockK + lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + + private val pauseSyncAtQueueSize = 100L private val blockHeightWindow = 10L private lateinit var scheduledBlockInfoSynchronizer: ScheduledBlockInfoSynchronizer @@ -35,12 +43,23 @@ internal class ScheduledWeBlockInfoSynchronizerTest { scheduledBlockInfoSynchronizer = ScheduledBlockInfoSynchronizer( sourceExecutor = sourceExecutor, syncInfoService = syncInfoService, + enqueuedTxJpaRepository = enqueuedTxJpaRepository, + pauseSyncAtQueueSize = pauseSyncAtQueueSize, liquidBlockPollingDelay = 0, blockHeightWindow = blockHeightWindow, txExecutor = TxExecutorStub ) } + @Test + fun `should pause sync when queue size too big`() { + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns pauseSyncAtQueueSize + + scheduledBlockInfoSynchronizer.syncNodeBlockInfo() + + verify { syncInfoService wasNot called } + } + @Test fun `should sync from observer height to node height using window`() { val observerHeight = 3L @@ -51,6 +70,7 @@ internal class ScheduledWeBlockInfoSynchronizerTest { } every { syncInfoService.syncInfo() } returns syncInfo every { syncInfoService.syncedTo(any(), any()) } returns Unit + every { enqueuedTxJpaRepository.countByStatus(any()) } returns pauseSyncAtQueueSize - 1 every { sourceExecutor.execute(any(), any()) } answers { arg(1) as Long + 1 } scheduledBlockInfoSynchronizer.syncNodeBlockInfo() @@ -75,6 +95,7 @@ internal class ScheduledWeBlockInfoSynchronizerTest { } every { syncInfoService.syncInfo() } returns syncInfo every { syncInfoService.syncedTo(any(), any()) } returns Unit + every { enqueuedTxJpaRepository.countByStatus(any()) } returns pauseSyncAtQueueSize - 1 every { sourceExecutor.execute(any(), any()) } answers { arg(1) as Long } scheduledBlockInfoSynchronizer.syncNodeBlockInfo() From 745c0df0331937fac4a85f602451940835de7aea Mon Sep 17 00:00:00 2001 From: dgridnev Date: Mon, 6 May 2024 20:06:38 +0300 Subject: [PATCH 08/10] [WTCH-232] Disable block synchronization while receiving queue status --- .../core/spring/web/service/TxQueueStatusServiceImpl.kt | 9 ++------- .../we/tx/observer/starter/TxQueueConfig.kt | 4 ---- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/web/service/TxQueueStatusServiceImpl.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/web/service/TxQueueStatusServiceImpl.kt index ff94c4f..f5ec6d5 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/web/service/TxQueueStatusServiceImpl.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/web/service/TxQueueStatusServiceImpl.kt @@ -1,7 +1,5 @@ package com.wavesenterprise.we.tx.observer.core.spring.web.service -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.wavesenterprise.sdk.node.client.blocking.blocks.BlocksService import com.wavesenterprise.sdk.node.client.blocking.node.NodeBlockingServiceFactory import com.wavesenterprise.sdk.node.client.blocking.tx.TxService @@ -12,7 +10,6 @@ import com.wavesenterprise.sdk.node.domain.tx.TxInfo import com.wavesenterprise.we.tx.observer.api.block.WeBlockInfo import com.wavesenterprise.we.tx.observer.api.block.subscriber.BlockSubscriber import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService -import com.wavesenterprise.we.tx.observer.core.spring.partition.TxQueuePartitionResolveService import com.wavesenterprise.we.tx.observer.core.spring.web.dto.PatchTxApiDto import com.wavesenterprise.we.tx.observer.core.spring.web.dto.PrivacyStatusApiDto import com.wavesenterprise.we.tx.observer.core.spring.web.dto.QueueStatusApiDto @@ -30,10 +27,8 @@ open class TxQueueStatusServiceImpl( val nodeBlockingServiceFactory: NodeBlockingServiceFactory, private val syncInfoService: SyncInfoService, val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, - val txQueuePartitionResolveService: TxQueuePartitionResolveService, val enqueueingBlockSubscriber: BlockSubscriber, val errorPriorityOffset: Int, - private val objectMapper: ObjectMapper = jacksonObjectMapper(), // todo ) : TxQueueService { private val txService: TxService = nodeBlockingServiceFactory.txService() @@ -43,12 +38,12 @@ open class TxQueueStatusServiceImpl( override fun getQueueStatus(): QueueStatusApiDto { val nodeHeight = blocksService.blockHeight() - val observerHeight = syncInfoService.syncInfo().observerHeight + val observerHeight = syncInfoService.observerHeight() val queueHeight = enqueuedTxJpaRepository.findMinHeightForStatus(EnqueuedTxStatus.NEW) val queueSize = enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) return QueueStatusApiDto( nodeHeight = nodeHeight.value, - observerHeight = observerHeight.value, + observerHeight = observerHeight, queueHeight = queueHeight, queueSize = queueSize, privacyStatusApiDto = PrivacyStatusApiDto( diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxQueueConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxQueueConfig.kt index fff7fa3..7615f8a 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxQueueConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxQueueConfig.kt @@ -1,6 +1,5 @@ package com.wavesenterprise.we.tx.observer.starter -import com.fasterxml.jackson.databind.ObjectMapper import com.wavesenterprise.sdk.node.client.blocking.node.NodeBlockingServiceFactory import com.wavesenterprise.we.tx.observer.api.block.subscriber.BlockSubscriber import com.wavesenterprise.we.tx.observer.api.partition.TxQueuePartitionResolver @@ -63,15 +62,12 @@ class TxQueueConfig { @Qualifier("enqueueingBlockSubscriber") enqueueingBlockSubscriber: BlockSubscriber, txObserverProperties: TxObserverProperties, - objectMapper: ObjectMapper, ): TxQueueService = TxQueueStatusServiceImpl( nodeBlockingServiceFactory = nodeBlockingServiceFactory, syncInfoService = syncInfoService, enqueuedTxJpaRepository = enqueuedTxJpaRepository, - txQueuePartitionResolveService = txQueuePartitionResolveService, enqueueingBlockSubscriber = enqueueingBlockSubscriber, errorPriorityOffset = txObserverProperties.errorPriorityOffset, - objectMapper = objectMapper, ) @Bean From ba86f59e19f39cd954c6ffe4493c2340440ae5a2 Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Tue, 7 May 2024 10:27:09 +0000 Subject: [PATCH 09/10] [WTCH-233] Remove cycle `while` from sync node blocks in job --- .../poller/ScheduledBlockInfoSynchronizer.kt | 2 +- .../properties/TxObserverProperties.kt | 2 +- .../ScheduledWeBlockInfoSynchronizerTest.kt | 23 +++---------------- 3 files changed, 5 insertions(+), 22 deletions(-) diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt index a48fcbc..5e0a73b 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/poller/ScheduledBlockInfoSynchronizer.kt @@ -48,7 +48,7 @@ open class ScheduledBlockInfoSynchronizer( private fun syncNodeBlockInfo(startHeight: Long, nodeHeight: Long) { var syncedToHeight = startHeight - while (syncedToHeight <= nodeHeight) { + if (syncedToHeight <= nodeHeight) { try { syncedToHeight = sync( syncedToHeight, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt index 4026bfb..260540b 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt @@ -17,7 +17,7 @@ data class TxObserverProperties( @DefaultValue("true") override var enabled: Boolean, override var queueMode: String = "JPA", - @DurationUnit(ChronoUnit.MILLIS) @DefaultValue("2s") + @DurationUnit(ChronoUnit.MILLIS) @DefaultValue("200ms") override var fixedDelay: Duration, @DataSizeUnit(DataUnit.MEGABYTES) override var blockSizeWindow: DataSize = DataSize.ofMegabytes(10), diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt index c2968cd..304c093 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/poller/ScheduledWeBlockInfoSynchronizerTest.kt @@ -2,7 +2,6 @@ package com.wavesenterprise.we.tx.observer.starter.observer.executor.poller import com.wavesenterprise.sdk.node.domain.Height import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.ScheduledBlockInfoSynchronizer -import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.ScheduledBlockInfoSynchronizer.Companion.OFFSET import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutor import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfo import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService @@ -19,7 +18,6 @@ import io.mockk.verifyOrder import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith -import java.lang.Long.min @ExtendWith(MockKExtension::class) internal class ScheduledWeBlockInfoSynchronizerTest { @@ -61,7 +59,7 @@ internal class ScheduledWeBlockInfoSynchronizerTest { } @Test - fun `should sync from observer height to node height using window`() { + fun `should sync from observer height to height plus window `() { val observerHeight = 3L val nodeHeight = 50L val syncInfo: SyncInfo = mockk { @@ -77,10 +75,8 @@ internal class ScheduledWeBlockInfoSynchronizerTest { verifyOrder { syncInfoService.syncInfo() - sectionsOfHeight(observerHeight, nodeHeight).forEach { section -> - sourceExecutor.execute(section.first, section.second) - syncInfoService.syncedTo(section.second + 1) - } + sourceExecutor.execute(observerHeight, observerHeight + blockHeightWindow) + syncInfoService.syncedTo(observerHeight + blockHeightWindow + 1) } } @@ -106,17 +102,4 @@ internal class ScheduledWeBlockInfoSynchronizerTest { syncInfoService.syncedTo(nodeHeight + 1) } } - - private fun sectionsOfHeight( - observerHeight: Long, - nodeHeight: Long - ): Sequence> = - generateSequence(observerHeight) { it + blockHeightWindow + 1 } - .map { first -> first to first + blockHeightWindow } - .takeWhile { section -> - section.first < nodeHeight - } - .map { section -> - section.first to min(section.second, nodeHeight + OFFSET) - } } From cfcab2a7194997fff79cbd6f88aa176b6a1d063d Mon Sep 17 00:00:00 2001 From: dgeorgiev Date: Tue, 7 May 2024 12:39:36 +0000 Subject: [PATCH 10/10] Revert "[WTCH-229] Add TxPartitionPollerAccelerationHelper for caching method... --- .../we/tx/observer/api/cache/CacheConsts.kt | 3 -- .../partition/DefaultTxPartitionPoller.kt | 9 +++- ...aultTxPartitionPollerAccelerationHelper.kt | 16 ------ .../TxPartitionPollerAccelerationHelper.kt | 5 -- .../we-tx-observer-starter/build.gradle.kts | 2 - .../tx/observer/starter/JpaExecutorsConfig.kt | 2 - .../observer/starter/PartitionPollerConfig.kt | 15 +----- .../DefaultTimeBasedCacheConfiguration.kt | 37 ------------- ...TxPartitionPollerAccelerationHelperTest.kt | 52 ------------------- .../executor/DefaultTxPartitionPollerTest.kt | 13 +++-- 10 files changed, 18 insertions(+), 136 deletions(-) delete mode 100644 we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt delete mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt delete mode 100644 we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt delete mode 100644 we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt delete mode 100644 we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt diff --git a/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt b/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt deleted file mode 100644 index e1cf864..0000000 --- a/we-tx-observer-module/we-tx-observer-api/src/main/kotlin/com/wavesenterprise/we/tx/observer/api/cache/CacheConsts.kt +++ /dev/null @@ -1,3 +0,0 @@ -package com.wavesenterprise.we.tx.observer.api.cache - -const val DEFAULT_TIME_BASED_CACHE_MANAGER_NAME = "defaultTimeBasedCacheManager" diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt index 058fe35..badaacf 100644 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPoller.kt @@ -2,6 +2,8 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition import com.wavesenterprise.we.tx.observer.api.BlockListenerException import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import org.slf4j.Logger @@ -13,14 +15,14 @@ open class DefaultTxPartitionPoller( val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, val pollingTxSubscriber: PollingTxSubscriber, val partitionHandler: PartitionHandler, - val txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper, + val partitionPollerProperties: PartitionPollerConfig, ) : TxPartitionPoller { private val logger: Logger = LoggerFactory.getLogger(DefaultTxPartitionPoller::class.java) @Transactional override fun pollPartition(): String? { - val partitionId = if (txPartitionPollerAccelerationHelper.isAccelerationRequired()) { + val partitionId = if (isAccelerationRequired()) { txQueuePartitionJpaRepository.findAndLockRandomPartition() } else { txQueuePartitionJpaRepository.findAndLockLatestPartition() @@ -48,4 +50,7 @@ open class DefaultTxPartitionPoller( throw PartitionHandlingException(partitionId, e) } } + + private fun isAccelerationRequired(): Boolean = + enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize } diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt deleted file mode 100644 index 9bac356..0000000 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/DefaultTxPartitionPollerAccelerationHelper.kt +++ /dev/null @@ -1,16 +0,0 @@ -package com.wavesenterprise.we.tx.observer.core.spring.partition - -import com.wavesenterprise.we.tx.observer.api.cache.DEFAULT_TIME_BASED_CACHE_MANAGER_NAME -import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus -import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository -import org.springframework.cache.annotation.Cacheable - -open class DefaultTxPartitionPollerAccelerationHelper( - val enqueuedTxJpaRepository: EnqueuedTxJpaRepository, - val partitionPollerProperties: PartitionPollerConfig, -) : TxPartitionPollerAccelerationHelper { - @Cacheable("isAccelerationRequired", cacheManager = DEFAULT_TIME_BASED_CACHE_MANAGER_NAME) - override fun isAccelerationRequired(): Boolean = - enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize -} diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt deleted file mode 100644 index 5b62f48..0000000 --- a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/partition/TxPartitionPollerAccelerationHelper.kt +++ /dev/null @@ -1,5 +0,0 @@ -package com.wavesenterprise.we.tx.observer.core.spring.partition - -interface TxPartitionPollerAccelerationHelper { - fun isAccelerationRequired(): Boolean -} diff --git a/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts b/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts index 22404e0..3a8660d 100644 --- a/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts +++ b/we-tx-observer-module/we-tx-observer-starter/build.gradle.kts @@ -12,8 +12,6 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter") implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-data-jpa") - implementation("org.springframework.boot:spring-boot-starter-cache") - implementation("com.github.ben-manes.caffeine:caffeine") implementation("org.flywaydb:flyway-core") implementation("com.wavesenterprise:we-flyway-starter") diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt index 3c3c2b8..0f13d80 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt @@ -1,7 +1,6 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.common.conditional.ConditionalOnJpaMode -import com.wavesenterprise.we.tx.observer.starter.cache.DefaultTimeBasedCacheConfiguration import com.wavesenterprise.we.tx.observer.starter.lock.LockConfig import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import @@ -18,6 +17,5 @@ import org.springframework.context.annotation.Import TxObserverSchedulerConfig::class, LockConfig::class, PartitionPausedOnTxIdCleanerConfig::class, - DefaultTimeBasedCacheConfiguration::class, ) class JpaExecutorsConfig diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt index 105829b..fd272a3 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionPollerConfig.kt @@ -3,12 +3,10 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.core.spring.executor.AppContextPollingTxSubscriber import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPollerAccelerationHelper import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller -import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository @@ -50,28 +48,19 @@ class PartitionPollerConfig { partitionHandler = partitionHandler, ) - @Bean - fun txPartitionPollerAccelerationHelper( - enqueuedTxJpaRepository: EnqueuedTxJpaRepository, - partitionPollerProperties: PartitionPollerProperties, - ): TxPartitionPollerAccelerationHelper = DefaultTxPartitionPollerAccelerationHelper( - enqueuedTxJpaRepository = enqueuedTxJpaRepository, - partitionPollerProperties = partitionPollerProperties, - ) - @Bean fun defaultTxPartitionPoller( txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, enqueuedTxJpaRepository: EnqueuedTxJpaRepository, pollingTxSubscriber: PollingTxSubscriber, partitionHandler: PartitionHandler, - txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper, + partitionPollerProperties: PartitionPollerProperties, ): TxPartitionPoller = DefaultTxPartitionPoller( txQueuePartitionJpaRepository = txQueuePartitionJpaRepository, enqueuedTxJpaRepository = enqueuedTxJpaRepository, pollingTxSubscriber = pollingTxSubscriber, partitionHandler = partitionHandler, - txPartitionPollerAccelerationHelper = txPartitionPollerAccelerationHelper, + partitionPollerProperties = partitionPollerProperties, ) @Bean diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt deleted file mode 100644 index d097c02..0000000 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/cache/DefaultTimeBasedCacheConfiguration.kt +++ /dev/null @@ -1,37 +0,0 @@ -package com.wavesenterprise.we.tx.observer.starter.cache - -import com.github.benmanes.caffeine.cache.Caffeine -import org.springframework.boot.context.properties.ConfigurationProperties -import org.springframework.boot.context.properties.ConstructorBinding -import org.springframework.boot.context.properties.EnableConfigurationProperties -import org.springframework.boot.context.properties.bind.DefaultValue -import org.springframework.boot.convert.DurationUnit -import org.springframework.cache.CacheManager -import org.springframework.cache.annotation.EnableCaching -import org.springframework.cache.caffeine.CaffeineCacheManager -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import java.time.Duration -import java.time.temporal.ChronoUnit - -@Configuration -@EnableCaching -@EnableConfigurationProperties(DefaultTimeBasedCacheProperties::class) -class DefaultTimeBasedCacheConfiguration(val defaultTimeBasedCacheProperties: DefaultTimeBasedCacheProperties) { - @Bean - fun defaultTimeBasedCacheManager(): CacheManager = - CaffeineCacheManager().apply { - setCaffeine( - Caffeine.newBuilder() - .expireAfterWrite(defaultTimeBasedCacheProperties.expireAfterWrite) - ) - } -} - -@ConfigurationProperties("cache.default-time-based") -@ConstructorBinding -data class DefaultTimeBasedCacheProperties( - @DurationUnit(ChronoUnit.SECONDS) - @DefaultValue("60s") - var expireAfterWrite: Duration -) diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt deleted file mode 100644 index 234813a..0000000 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/cache/DefaultTxPartitionPollerAccelerationHelperTest.kt +++ /dev/null @@ -1,52 +0,0 @@ -package com.wavesenterprise.we.tx.observer.starter.observer.cache - -import com.ninjasquad.springmockk.MockkBean -import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper -import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus -import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository -import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository -import com.wavesenterprise.we.tx.observer.starter.PartitionPollerConfig -import com.wavesenterprise.we.tx.observer.starter.cache.DefaultTimeBasedCacheConfiguration -import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPollerProperties -import io.mockk.every -import io.mockk.verify -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.context.annotation.Profile -import org.springframework.test.context.ContextConfiguration - -@ContextConfiguration( - classes = [ - DefaultTimeBasedCacheConfiguration::class, - PartitionPollerConfig::class - ] -) -@SpringBootTest -@Profile("test") -class DefaultTxPartitionPollerAccelerationHelperTest { - - @MockkBean(relaxed = true) - private lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository - - @MockkBean - private lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository - - @MockkBean - private lateinit var partitionPollerProperties: PartitionPollerProperties - - @Autowired - private lateinit var txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper - - @Test - fun `should use cache of query`() { - val newEnqueuedTxCount = 100L - every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns newEnqueuedTxCount - every { partitionPollerProperties.accelerateAtQueueSize } returns newEnqueuedTxCount - - val firstResult = txPartitionPollerAccelerationHelper.isAccelerationRequired() - assertEquals(firstResult, txPartitionPollerAccelerationHelper.isAccelerationRequired()) - verify(exactly = 1) { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } - } -} diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt index 08bb180..0139508 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/DefaultTxPartitionPollerTest.kt @@ -5,7 +5,8 @@ import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber -import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPollerAccelerationHelper +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig +import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository import io.mockk.confirmVerified @@ -42,19 +43,23 @@ internal class DefaultTxPartitionPollerTest { lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository @MockK - lateinit var txPartitionPollerAccelerationHelper: TxPartitionPollerAccelerationHelper + lateinit var partitionPollerProperties: PartitionPollerConfig @InjectMockKs lateinit var defaultTxPartitionPoller: DefaultTxPartitionPoller + private val accelerateAtQueueSize = 200L + @BeforeEach fun init() { - every { txPartitionPollerAccelerationHelper.isAccelerationRequired() } returns false + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize - 1 + every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize } @Test fun `should get latest actual partition and handle it with success`() { val partitionId = "partId" + every { partitionPollerProperties.accelerateAtQueueSize } returns accelerateAtQueueSize every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 every { txQueuePartitionJpaRepository.findAndLockLatestPartition() } returns partitionId @@ -70,7 +75,7 @@ internal class DefaultTxPartitionPollerTest { @Test fun `should get random actual partition and handle it with success`() { val partitionId = "partId" - every { txPartitionPollerAccelerationHelper.isAccelerationRequired() } returns true + every { enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) } returns accelerateAtQueueSize + 1 every { pollingTxSubscriber.dequeuePartitionAndSendToSubscribers(any()) } returns 1 every { txQueuePartitionJpaRepository.findAndLockRandomPartition() } returns partitionId