Skip to content

Commit

Permalink
Merge branch 'release/1.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
dgeorgiev committed May 7, 2024
2 parents aba60fd + a872273 commit dc6172f
Show file tree
Hide file tree
Showing 24 changed files with 222 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.wavesenterprise.we.tx.observer.core.spring.executor

import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller

open class ScheduledPartitionPoller(
private val errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller,
private val errorHandlingTxPartitionPoller: TxPartitionPoller,
) {

open fun pollWhileHavingActivePartitions() {
while (true) {
errorHandlingLatestTxPartitionPoller.pollLatestActualPartition() ?: break
errorHandlingTxPartitionPoller.pollPartition() ?: break
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.wavesenterprise.we.tx.observer.core.spring.executor.poller

import com.wavesenterprise.we.tx.observer.common.tx.executor.TxExecutor
import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService
import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus
import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
Expand All @@ -13,6 +15,8 @@ import java.lang.Long.min
open class ScheduledBlockInfoSynchronizer(
private val sourceExecutor: SourceExecutor,
private val syncInfoService: SyncInfoService,
private val enqueuedTxJpaRepository: EnqueuedTxJpaRepository,
private val pauseSyncAtQueueSize: Long,
private val liquidBlockPollingDelay: Long,
private val blockHeightWindow: Long,
private val txExecutor: TxExecutor,
Expand All @@ -24,6 +28,9 @@ open class ScheduledBlockInfoSynchronizer(
name = "syncNodeBlockInfo_task",
)
open fun syncNodeBlockInfo() {
if (pauseSyncRequired()) {
return
}
val blockSyncInfo = txExecutor.requiresNew {
syncInfoService.syncInfo()
}
Expand All @@ -33,12 +40,15 @@ open class ScheduledBlockInfoSynchronizer(
syncNodeBlockInfo(startHeight, nodeHeight.value)
}

private fun pauseSyncRequired(): Boolean =
enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= pauseSyncAtQueueSize

private fun stableNodeHeight(nodeHeight: Long) =
nodeHeight - 1

private fun syncNodeBlockInfo(startHeight: Long, nodeHeight: Long) {
var syncedToHeight = startHeight
while (syncedToHeight <= nodeHeight) {
if (syncedToHeight <= nodeHeight) {
try {
syncedToHeight = sync(
syncedToHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,39 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition

import com.wavesenterprise.we.tx.observer.api.BlockListenerException
import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException
import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPollerConfig
import com.wavesenterprise.we.tx.observer.domain.EnqueuedTxStatus
import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository
import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.transaction.annotation.Transactional

open class DefaultLatestTxPartitionPoller(
open class DefaultTxPartitionPoller(
val txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository,
val enqueuedTxJpaRepository: EnqueuedTxJpaRepository,
val pollingTxSubscriber: PollingTxSubscriber,
val partitionHandler: PartitionHandler,
) : LatestTxPartitionPoller {
val partitionPollerProperties: PartitionPollerConfig,
) : TxPartitionPoller {

private val logger: Logger = LoggerFactory.getLogger(DefaultLatestTxPartitionPoller::class.java)
private val logger: Logger = LoggerFactory.getLogger(DefaultTxPartitionPoller::class.java)

@Transactional
override fun pollLatestActualPartition(): String? =
txQueuePartitionJpaRepository.findAndLockLatestActualPartition()?.also {
override fun pollPartition(): String? {
val partitionId = if (isAccelerationRequired()) {
txQueuePartitionJpaRepository.findAndLockRandomPartition()
} else {
txQueuePartitionJpaRepository.findAndLockLatestPartition()
}
return partitionId?.also {
logger.debug("Started polling partition with ID = $it")
tryToDequeueTxForPartition(it)
logger.debug("Finished polling partition with ID = $it")
}.also {
it ?: logger.debug("No actual partitions found to be polled")
}
}

private fun tryToDequeueTxForPartition(partitionId: String) {
try {
Expand All @@ -39,4 +50,7 @@ open class DefaultLatestTxPartitionPoller(
throw PartitionHandlingException(partitionId, e)
}
}

private fun isAccelerationRequired(): Boolean =
enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW) >= partitionPollerProperties.accelerateAtQueueSize
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package com.wavesenterprise.we.tx.observer.core.spring.partition

import com.wavesenterprise.we.tx.observer.api.PartitionHandlingException

class ErrorHandlingLatestTxPartitionPoller(
private val defaultLatestTxPartitionPoller: LatestTxPartitionPoller,
class ErrorHandlingTxPartitionPoller(
private val defaultTxPartitionPoller: TxPartitionPoller,
private val partitionHandler: PartitionHandler,
) : LatestTxPartitionPoller {
) : TxPartitionPoller {

override fun pollLatestActualPartition(): String? = try {
defaultLatestTxPartitionPoller.pollLatestActualPartition()
override fun pollPartition(): String? = try {
defaultTxPartitionPoller.pollPartition()
} catch (ex: PartitionHandlingException) {
partitionHandler.handleErrorWhenReading(ex.partitionId)
ex.partitionId
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.wavesenterprise.we.tx.observer.core.spring.partition

interface TxPartitionPoller {
fun pollPartition(): String?
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ interface PartitionPollerConfig {
var enabled: Boolean
var fixedDelay: Duration
var threadCount: Int
var accelerateAtQueueSize: Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ interface TxObserverConfig {
var activationHeight: Long
var blockHeightWindow: Long
var syncHistory: Boolean
var pauseSyncAtQueueSize: Long
var blockHistoryDepth: Int
var forkNotResolvedHeightDrop: Long
var blockHistoryCleanDelay: Duration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.wavesenterprise.we.tx.observer.core.spring.web.service

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.wavesenterprise.sdk.node.client.blocking.blocks.BlocksService
import com.wavesenterprise.sdk.node.client.blocking.node.NodeBlockingServiceFactory
import com.wavesenterprise.sdk.node.client.blocking.tx.TxService
Expand All @@ -12,7 +10,6 @@ import com.wavesenterprise.sdk.node.domain.tx.TxInfo
import com.wavesenterprise.we.tx.observer.api.block.WeBlockInfo
import com.wavesenterprise.we.tx.observer.api.block.subscriber.BlockSubscriber
import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService
import com.wavesenterprise.we.tx.observer.core.spring.partition.TxQueuePartitionResolveService
import com.wavesenterprise.we.tx.observer.core.spring.web.dto.PatchTxApiDto
import com.wavesenterprise.we.tx.observer.core.spring.web.dto.PrivacyStatusApiDto
import com.wavesenterprise.we.tx.observer.core.spring.web.dto.QueueStatusApiDto
Expand All @@ -30,10 +27,8 @@ open class TxQueueStatusServiceImpl(
val nodeBlockingServiceFactory: NodeBlockingServiceFactory,
private val syncInfoService: SyncInfoService,
val enqueuedTxJpaRepository: EnqueuedTxJpaRepository,
val txQueuePartitionResolveService: TxQueuePartitionResolveService,
val enqueueingBlockSubscriber: BlockSubscriber,
val errorPriorityOffset: Int,
private val objectMapper: ObjectMapper = jacksonObjectMapper(), // todo
) : TxQueueService {

private val txService: TxService = nodeBlockingServiceFactory.txService()
Expand All @@ -43,12 +38,12 @@ open class TxQueueStatusServiceImpl(

override fun getQueueStatus(): QueueStatusApiDto {
val nodeHeight = blocksService.blockHeight()
val observerHeight = syncInfoService.syncInfo().observerHeight
val observerHeight = syncInfoService.observerHeight()
val queueHeight = enqueuedTxJpaRepository.findMinHeightForStatus(EnqueuedTxStatus.NEW)
val queueSize = enqueuedTxJpaRepository.countByStatus(EnqueuedTxStatus.NEW)
return QueueStatusApiDto(
nodeHeight = nodeHeight.value,
observerHeight = observerHeight.value,
observerHeight = observerHeight,
queueHeight = queueHeight,
queueSize = queueSize,
privacyStatusApiDto = PrivacyStatusApiDto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import javax.persistence.LockModeType
@Repository
interface EnqueuedTxJpaRepository : JpaRepository<EnqueuedTx, String>, JpaSpecificationExecutor<EnqueuedTx> {

@Query(
value = """
select count(*) from $TX_OBSERVER_SCHEMA_NAME.enqueued_tx tx where tx.status = :#{#enqueuedTxStatus.name()}
""",
nativeQuery = true
)
fun countByStatus(enqueuedTxStatus: EnqueuedTxStatus): Long

@Query("select count(tx) from EnqueuedTx tx where status = 'NEW' and txType = 114")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ interface TxQueuePartitionJpaRepository :
JpaRepository<TxQueuePartition, String>,
JpaSpecificationExecutor<TxQueuePartition> {

@Query(
"""
select tqp.id from $TX_OBSERVER_SCHEMA_NAME.tx_queue_partition tqp
join $TX_OBSERVER_SCHEMA_NAME.enqueued_tx etx on etx.partition_id = tqp.id and etx.status = 'NEW'
where tqp.paused_on_tx_id is null
order by tqp.priority desc, etx.tx_timestamp
for update of tqp skip locked
limit 1
""",
nativeQuery = true
)
fun findAndLockLatestPartition(): String?

@Query(
"""
select tqp.id
Expand All @@ -27,9 +40,9 @@ interface TxQueuePartitionJpaRepository :
for update of tqp skip locked
limit 1
""",
nativeQuery = true
nativeQuery = true,
)
fun findAndLockLatestActualPartition(): String?
fun findAndLockRandomPartition(): String?

@Query(
"""
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package com.wavesenterprise.we.tx.observer.starter

import com.wavesenterprise.we.tx.observer.core.spring.executor.AppContextPollingTxSubscriber
import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultLatestTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingLatestTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.LatestTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.DefaultTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.ErrorHandlingTxPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.partition.PartitionHandler
import com.wavesenterprise.we.tx.observer.core.spring.partition.PollingTxSubscriber
import com.wavesenterprise.we.tx.observer.core.spring.partition.TxPartitionPoller
import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig
import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository
import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository
Expand Down Expand Up @@ -34,30 +34,33 @@ class PartitionPollerConfig {

@Bean
fun scheduledPartitionPoller(
errorHandlingLatestTxPartitionPoller: LatestTxPartitionPoller,
errorHandlingTxPartitionPoller: TxPartitionPoller,
): ScheduledPartitionPoller = ScheduledPartitionPoller(
errorHandlingLatestTxPartitionPoller = errorHandlingLatestTxPartitionPoller,
errorHandlingTxPartitionPoller = errorHandlingTxPartitionPoller,
)

@Bean
fun errorHandlingLatestTxPartitionPoller(
defaultLatestTxPartitionPoller: LatestTxPartitionPoller,
fun errorHandlingTxPartitionPoller(
defaultTxPartitionPoller: TxPartitionPoller,
partitionHandler: PartitionHandler,
): LatestTxPartitionPoller = ErrorHandlingLatestTxPartitionPoller(
defaultLatestTxPartitionPoller = defaultLatestTxPartitionPoller,
): TxPartitionPoller = ErrorHandlingTxPartitionPoller(
defaultTxPartitionPoller = defaultTxPartitionPoller,
partitionHandler = partitionHandler,
)

@Bean
fun defaultLatestTxPartitionPoller(
fun defaultTxPartitionPoller(
txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository,
enqueuedTxJpaRepository: EnqueuedTxJpaRepository,
pollingTxSubscriber: PollingTxSubscriber,
partitionHandler: PartitionHandler,
partitionPollerProperties: PartitionPollerProperties,
): LatestTxPartitionPoller = DefaultLatestTxPartitionPoller(
): TxPartitionPoller = DefaultTxPartitionPoller(
txQueuePartitionJpaRepository = txQueuePartitionJpaRepository,
enqueuedTxJpaRepository = enqueuedTxJpaRepository,
pollingTxSubscriber = pollingTxSubscriber,
partitionHandler = partitionHandler,
partitionPollerProperties = partitionPollerProperties,
)

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.wavesenterprise.we.tx.observer.starter

import com.fasterxml.jackson.databind.ObjectMapper
import com.wavesenterprise.sdk.node.client.blocking.node.NodeBlockingServiceFactory
import com.wavesenterprise.we.tx.observer.api.block.subscriber.BlockSubscriber
import com.wavesenterprise.we.tx.observer.api.partition.TxQueuePartitionResolver
Expand Down Expand Up @@ -63,15 +62,12 @@ class TxQueueConfig {
@Qualifier("enqueueingBlockSubscriber")
enqueueingBlockSubscriber: BlockSubscriber,
txObserverProperties: TxObserverProperties,
objectMapper: ObjectMapper,
): TxQueueService = TxQueueStatusServiceImpl(
nodeBlockingServiceFactory = nodeBlockingServiceFactory,
syncInfoService = syncInfoService,
enqueuedTxJpaRepository = enqueuedTxJpaRepository,
txQueuePartitionResolveService = txQueuePartitionResolveService,
enqueueingBlockSubscriber = enqueueingBlockSubscriber,
errorPriorityOffset = txObserverProperties.errorPriorityOffset,
objectMapper = objectMapper,
)

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.ScheduledB
import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutor
import com.wavesenterprise.we.tx.observer.core.spring.executor.poller.SourceExecutorImpl
import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.SyncInfoService
import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository
import com.wavesenterprise.we.tx.observer.starter.executor.BLOCK_SOURCE_MODE
import com.wavesenterprise.we.tx.observer.starter.properties.TxObserverProperties
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
Expand Down Expand Up @@ -36,10 +37,13 @@ class PollerBlockSourceConfiguration(
fun scheduledSourceExecutor(
sourceExecutor: SourceExecutor,
txExecutor: TxExecutor,
enqueuedTxJpaRepository: EnqueuedTxJpaRepository,
) =
ScheduledBlockInfoSynchronizer(
sourceExecutor = sourceExecutor,
syncInfoService = syncInfoService,
enqueuedTxJpaRepository = enqueuedTxJpaRepository,
pauseSyncAtQueueSize = txObserverProperties.pauseSyncAtQueueSize,
blockHeightWindow = txObserverProperties.blockHeightWindow,
liquidBlockPollingDelay = txObserverProperties.liquidBlockPollingDelay,
txExecutor = txExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ data class PartitionPollerProperties(
override var fixedDelay: Duration,
@DefaultValue("4")
override var threadCount: Int,
@DefaultValue("200")
override var accelerateAtQueueSize: Long,
) : PartitionPollerConfig
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ data class TxObserverProperties(
@DefaultValue("true")
override var enabled: Boolean,
override var queueMode: String = "JPA",
@DurationUnit(ChronoUnit.MILLIS) @DefaultValue("2s")
@DurationUnit(ChronoUnit.MILLIS) @DefaultValue("200ms")
override var fixedDelay: Duration,
@DataSizeUnit(DataUnit.MEGABYTES)
override var blockSizeWindow: DataSize = DataSize.ofMegabytes(10),
override var activationHeight: Long = 1,
override var blockHeightWindow: Long = 99,
override var syncHistory: Boolean = true,
override var pauseSyncAtQueueSize: Long = 10000,
override var blockHistoryDepth: Int = 100,
override var forkNotResolvedHeightDrop: Long = 10,
@DurationUnit(ChronoUnit.MILLIS)
Expand Down
Loading

0 comments on commit dc6172f

Please sign in to comment.