diff --git a/app/src/main/scala/org/alephium/explorer/ExplorerState.scala b/app/src/main/scala/org/alephium/explorer/ExplorerState.scala index 51067a80..6326a3d6 100644 --- a/app/src/main/scala/org/alephium/explorer/ExplorerState.scala +++ b/app/src/main/scala/org/alephium/explorer/ExplorerState.scala @@ -25,6 +25,7 @@ import slick.jdbc.PostgresProfile import org.alephium.explorer.cache.{BlockCache, MetricCache, TransactionCache} import org.alephium.explorer.config.{BootMode, ExplorerConfig} +import org.alephium.explorer.config.ExplorerConfig.Consensus import org.alephium.explorer.persistence.Database import org.alephium.explorer.service._ import org.alephium.explorer.util.Scheduler @@ -46,6 +47,8 @@ sealed trait ExplorerState extends Service with StrictLogging { lazy val database: Database = new Database(config.bootMode)(executionContext, databaseConfig) + implicit lazy val consensus: Consensus = config.consensus + implicit lazy val blockCache: BlockCache = BlockCache( config.cacheRowCountReloadPeriod, diff --git a/app/src/main/scala/org/alephium/explorer/SyncServices.scala b/app/src/main/scala/org/alephium/explorer/SyncServices.scala index abefbd55..eb85a13e 100644 --- a/app/src/main/scala/org/alephium/explorer/SyncServices.scala +++ b/app/src/main/scala/org/alephium/explorer/SyncServices.scala @@ -32,6 +32,7 @@ import org.alephium.api.model.{ChainParams, PeerAddress} import org.alephium.explorer.RichAVector._ import org.alephium.explorer.cache.BlockCache import org.alephium.explorer.config.{BootMode, ExplorerConfig} +import org.alephium.explorer.config.ExplorerConfig.Consensus import org.alephium.explorer.error.ExplorerError._ import org.alephium.explorer.service._ import org.alephium.explorer.util.Scheduler @@ -46,6 +47,7 @@ object SyncServices extends StrictLogging { ec: ExecutionContext, dc: DatabaseConfig[PostgresProfile], blockFlowClient: BlockFlowClient, + consensus: Consensus, blockCache: BlockCache, groupSetting: GroupSetting ): Future[Unit] = @@ -88,6 +90,7 @@ object SyncServices extends StrictLogging { dc: DatabaseConfig[PostgresProfile], blockFlowClient: BlockFlowClient, blockCache: BlockCache, + consensus: Consensus, groupSetting: GroupSetting ): Future[Unit] = Future.fromTry { diff --git a/app/src/main/scala/org/alephium/explorer/error/ExplorerError.scala b/app/src/main/scala/org/alephium/explorer/error/ExplorerError.scala index 07a7108e..85ebfc4c 100644 --- a/app/src/main/scala/org/alephium/explorer/error/ExplorerError.scala +++ b/app/src/main/scala/org/alephium/explorer/error/ExplorerError.scala @@ -88,6 +88,12 @@ object ExplorerError { extends Exception(s"Cannot parse config file: $file", exception) with ConfigError + final case class WebSocketError(cause: Throwable) + extends Exception(s"WebSocket error. $cause") + with ExplorerError + + /** ****** Group: [[ConfigError]] ******* + */ final case class InvalidGroupNumber(groupNum: Int) extends Exception(s"Invalid groupNum: $groupNum. It should be > 0") with ConfigError diff --git a/app/src/main/scala/org/alephium/explorer/service/BlockFlowSyncService.scala b/app/src/main/scala/org/alephium/explorer/service/BlockFlowSyncService.scala index c9f707f3..1c76956b 100644 --- a/app/src/main/scala/org/alephium/explorer/service/BlockFlowSyncService.scala +++ b/app/src/main/scala/org/alephium/explorer/service/BlockFlowSyncService.scala @@ -19,7 +19,7 @@ package org.alephium.explorer.service import java.util.concurrent.atomic.AtomicBoolean import scala.collection.immutable.ArraySeq -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration.{Duration => ScalaDuration, FiniteDuration} import scala.util.{Failure, Success} @@ -31,6 +31,7 @@ import sttp.model.Uri import org.alephium.explorer.{foldFutures, GroupSetting} import org.alephium.explorer.api.model.Height import org.alephium.explorer.cache.BlockCache +import org.alephium.explorer.config.ExplorerConfig.Consensus import org.alephium.explorer.error.ExplorerError.BlocksInDifferentChains import org.alephium.explorer.persistence.DBRunner._ import org.alephium.explorer.persistence.dao.BlockDao @@ -53,6 +54,8 @@ import org.alephium.util.{Duration, TimeStamp} * 5. For each last block of each chains, mark it as part of the main chain and travel * down the parents recursively until we found back a parent that is part of the main chain. * 6. During step 5, if a parent is missing, we download it and continue the procces at 5. + * 7. Once the blocks are up-to-date with the node, we switch to websocket syncing + * 8. If the websocket close or is late, in case of network issue, we go back to step 1. * * TODO: Step 5 is costly, but it's an easy way to handle reorg. In step 3 we know we receive the current main chain * for that timerange, so in step 4 we could directly insert them as `mainChain = true`, but we need to sync @@ -66,6 +69,7 @@ case object BlockFlowSyncService extends StrictLogging { private val defaultStep = Duration.ofMinutesUnsafe(30L) private val defaultBackStep = Duration.ofSecondsUnsafe(10L) private val initialBackStep = Duration.ofMinutesUnsafe(30L) + private val upToDateDelta = Duration.ofSecondsUnsafe(30L) // scalastyle:on magic.number def start(nodeUris: ArraySeq[Uri], interval: FiniteDuration)(implicit @@ -73,6 +77,7 @@ case object BlockFlowSyncService extends StrictLogging { dc: DatabaseConfig[PostgresProfile], blockFlowClient: BlockFlowClient, cache: BlockCache, + consensus: Consensus, groupSetting: GroupSetting, scheduler: Scheduler ): Future[Unit] = @@ -87,14 +92,38 @@ case object BlockFlowSyncService extends StrictLogging { ec: ExecutionContext, dc: DatabaseConfig[PostgresProfile], blockFlowClient: BlockFlowClient, + consensus: Consensus, cache: BlockCache, groupSetting: GroupSetting ): Future[Unit] = { - if (initialBackStepDone.get()) { - syncOnceWith(nodeUris, defaultStep, defaultBackStep) - } else { - syncOnceWith(nodeUris, defaultStep, initialBackStep).map { _ => - initialBackStepDone.set(true) + val syncResult = + if (initialBackStepDone.get()) { + syncOnceWith(nodeUris, defaultStep, defaultBackStep) + } else { + syncOnceWith(nodeUris, defaultStep, initialBackStep).map { result => + initialBackStepDone.set(true) + result + } + } + + syncResult.flatMap { isUpToDate => + if (isUpToDate) { + logger.info("Blocks are up to date, switching to web socket syncing") + val stopPromise = Promise[Unit]() + // TODO Use config values + // scalastyle:off magic.number + WebSocketSyncService.sync( + stopPromise, + host = "127.0.0.1", + port = 22973, + flushInterval = Duration.ofMillisUnsafe(500) + ) + // scalastyle:on magic.number + stopPromise.future.map { _ => + logger.info("WebSocket syncing stopped, resuming http syncing") + } + } else { + Future.successful(()) } } } @@ -106,7 +135,7 @@ case object BlockFlowSyncService extends StrictLogging { blockFlowClient: BlockFlowClient, cache: BlockCache, groupSetting: GroupSetting - ): Future[Unit] = { + ): Future[Boolean] = { getTimeStampRange(step, backStep) .flatMap { ranges => Future.sequence { @@ -116,12 +145,16 @@ case object BlockFlowSyncService extends StrictLogging { s"Syncing from ${TimeUtil.toInstant(from)} to ${TimeUtil .toInstant(to)} (${from.millis} - ${to.millis})" ) - syncTimeRange(from, to, uri) + syncTimeRange(from, to, uri).map { _ => + (TimeStamp.now() -- to).map(_ < upToDateDelta).getOrElse(false) + } } } } } - .map(_ => ()) + .map { upToDates => + upToDates.flatten.contains(true) + } } // scalastyle:on magic.number @@ -360,7 +393,7 @@ case object BlockFlowSyncService extends StrictLogging { } } - private def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit + def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit ec: ExecutionContext, dc: DatabaseConfig[PostgresProfile], blockFlowClient: BlockFlowClient, diff --git a/app/src/main/scala/org/alephium/explorer/service/WebSocketSyncService.scala b/app/src/main/scala/org/alephium/explorer/service/WebSocketSyncService.scala new file mode 100644 index 00000000..5ece8b54 --- /dev/null +++ b/app/src/main/scala/org/alephium/explorer/service/WebSocketSyncService.scala @@ -0,0 +1,228 @@ +// Copyright 2018 The Alephium Authors +// This file is part of the alephium project. +// +// The library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the library. If not, see . + +package org.alephium.explorer.service + +import scala.collection.immutable.ArraySeq +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util._ + +import com.typesafe.scalalogging.StrictLogging +import io.vertx.core.AbstractVerticle +import io.vertx.core.Vertx +import io.vertx.core.eventbus.Message +import io.vertx.core.http.WebSocket +import slick.basic.DatabaseConfig +import slick.jdbc.PostgresProfile + +import org.alephium.api +import org.alephium.explorer.GroupSetting +import org.alephium.explorer.cache.BlockCache +import org.alephium.explorer.config.ExplorerConfig.Consensus +import org.alephium.explorer.persistence.model.BlockEntityWithEvents +import org.alephium.explorer.persistence.queries.InputUpdateQueries +import org.alephium.json.Json._ +import org.alephium.rpc.model.JsonRPC.Notification +import org.alephium.util.{discard, Duration, TimeStamp} +import org.alephium.ws._ +import org.alephium.ws.WsClient.KeepAlive +import org.alephium.ws.WsParams.WsNotificationParams._ +import org.alephium.ws.WsUtils._ + +@SuppressWarnings( + Array( + "org.wartremover.warts.NonUnitStatements", + "org.wartremover.warts.Var", + "org.wartremover.warts.DefaultArguments" + ) +) +case object WebSocketSyncService extends StrictLogging { + + val maybeApiKey: Option[api.model.ApiKey] = None + + private val batchAddress = "blocks.batch" + +// scalastyle:off parameter.number + def sync(stopPromise: Promise[Unit], host: String, port: Int, flushInterval: Duration)(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile], + blockFlowClient: BlockFlowClient, + consensus: Consensus, + cache: BlockCache, + groupSetting: GroupSetting + ): Unit = { + + val vertx: Vertx = Vertx.vertx() + + val wsClient = WsClient(vertx) + + def closeHandler(client: ClientWs, deploymentId: String): WebSocket = { + client.underlying.closeHandler { _ => + logger.info("WebSocket connection closed") + vertx.undeploy(deploymentId) + stopPromise.trySuccess(()) + () + } + } + + // format: off + (for { + client <- wsClient.connect(port, host)(notif => handleNotification(notif, vertx))(handleKeepAlive) + blockBatching = new BlockBatchingVerticle(batchAddress, flushInterval, inserts, client) + deploymentId <- vertx.deployVerticle(blockBatching).asScala + _ = closeHandler(client, deploymentId) + _ <- client.subscribeToBlock(0) + } yield ()) + .onComplete { + case Success(_) => + logger.info("WebSocket syncing started") + case Failure(exception) => + logger.error("WebSocket syncing failed", exception) + stopPromise.trySuccess(()) + () + } + // format: on + } + + def handleNotification(notification: Notification, vertx: Vertx): Unit = { + notification.method match { + case "subscription" => + discard(vertx.eventBus().send(batchAddress, writeBinary(notification.params))) + case _ => + // TODO Should we support error notification? + () + } + } + + def handleKeepAlive(keepAlive: KeepAlive): Unit = { + // TODO: do we want to do something with this? + // I think ping are anyway automatcially handled by the vertx client + logger.debug(s"Keep alive: ${keepAlive}") + } + + def inserts(blockAndEvents: ArraySeq[BlockEntityWithEvents])(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile], + blockFlowClient: BlockFlowClient, + cache: BlockCache, + groupSetting: GroupSetting + ): Future[Unit] = + for { + _ <- BlockFlowSyncService.insertBlocks(blockAndEvents) + _ <- dc.db.run(InputUpdateQueries.updateInputs()) + } yield (()) + + /* + * This verticle is used to batch the blocks received from the websocket + * and insert them into the database after every flushInterval. + * Verticles are equivalent to akka actors + */ + @SuppressWarnings(Array("org.wartremover.warts.MutableDataStructures")) + private class BlockBatchingVerticle( + address: String, + flushInterval: Duration, + insertBlocksToDb: ArraySeq[BlockEntityWithEvents] => Future[Unit], + client: ClientWs + )(implicit + ec: ExecutionContext, + consensus: Consensus, + groupSetting: GroupSetting + ) extends AbstractVerticle { + + /* + * Passing this delay, the websocket syincg will stop and go back to regular sync. + * This is to prevent the case where the websocket become too slow for some reason. + */ + + // TODO: this should be configurable + // scalastyle:off magic.number + private val delayAllowed = Duration.ofSecondsUnsafe(30L) + // scalastyle:on magic.number + + /* + * Vertx vertices are equivalent to akka actors + * So message are process in a single thread and + * with the security of `pending` we are safe + * to use mutable data structure + */ + private var buffer = ArrayBuffer.empty[BlockEntityWithEvents] + private var pending = false + + override def start(): Unit = { + discard(this.vertx.eventBus().consumer[Array[Byte]](address, handleBlock)) + discard(this.vertx.setPeriodic(flushInterval.millis, _ => flush())) + } + + private def handleBlock(msg: Message[Array[Byte]]): Unit = { + handleJsonMessage(readBinary[ujson.Value](msg.body())).foreach { blockAndEvents => + buffer += blockAndEvents + } + } + + private def handleJsonMessage(notification: ujson.Value): Option[BlockEntityWithEvents] = { + Try(read[WsParams.WsNotificationParams](notification)).toEither match { + case Left(exception) => + logger.error(s"Failed to parse notification: $notification", exception) + None + case Right(params) => + params match { + case WsParams.WsBlockNotificationParams(_, protocolBlockAndEvents) => + val blockAndEvents = + BlockFlowClient.blockAndEventsToEntities(protocolBlockAndEvents) + + validateBlockAndEvents(blockAndEvents) + + Some(blockAndEvents) + case other => + logger.error(s"Expected block notification, got: $other") + None + } + } + } + + private def flush(): Unit = { + if (buffer.nonEmpty && !pending) { + val batch = ArraySeq.from(buffer) + buffer = ArrayBuffer.empty + pending = true + + insertBlocksToDb(batch).onComplete { result => + pending = false + + result.failed.foreach { ex => + logger.error(s"DB insert failed: ${ex.getMessage}") + // Resume to normal sync + client.close() + } + } + } + } + + private def validateBlockAndEvents( + blockAndEvents: BlockEntityWithEvents + ): Unit = { + if ( + (TimeStamp.now() -- blockAndEvents.block.timestamp) + .map(_ > delayAllowed) + .getOrElse(true) + ) { + logger.error("WebSocket message is to old") + discard(client.close()) + } + } + } +} diff --git a/app/src/test/scala/org/alephium/explorer/ExplorerSpec.scala b/app/src/test/scala/org/alephium/explorer/ExplorerSpec.scala index 898d9904..daea6d49 100644 --- a/app/src/test/scala/org/alephium/explorer/ExplorerSpec.scala +++ b/app/src/test/scala/org/alephium/explorer/ExplorerSpec.scala @@ -575,7 +575,7 @@ object ExplorerSpec { val cliqueId = CliqueId.generate - private val peer = model.PeerAddress(address, port, 0, 0) + private val peer = model.PeerAddress(address, port, 0) def fetchHashesAtHeight( from: GroupIndex, diff --git a/app/src/test/scala/org/alephium/explorer/GenCoreApi.scala b/app/src/test/scala/org/alephium/explorer/GenCoreApi.scala index 66d73571..29707c1d 100644 --- a/app/src/test/scala/org/alephium/explorer/GenCoreApi.scala +++ b/app/src/test/scala/org/alephium/explorer/GenCoreApi.scala @@ -65,12 +65,10 @@ object GenCoreApi { for { address <- genInetAddress restPort <- genPortNum - wsPort <- genPortNum minerApiPort <- genPortNum } yield PeerAddress( address = address, restPort = restPort, - wsPort = wsPort, minerApiPort = minerApiPort ) diff --git a/app/src/test/scala/org/alephium/explorer/service/BlockFlowClientSpec.scala b/app/src/test/scala/org/alephium/explorer/service/BlockFlowClientSpec.scala index 2646420d..99044247 100644 --- a/app/src/test/scala/org/alephium/explorer/service/BlockFlowClientSpec.scala +++ b/app/src/test/scala/org/alephium/explorer/service/BlockFlowClientSpec.scala @@ -170,7 +170,7 @@ object BlockFlowClientSpec extends ScalaFutures with IntegrationPatience { val cliqueId = CliqueId.generate private val peer = - model.PeerAddress(localhost, SocketUtil.temporaryLocalPort(SocketUtil.Both), 0, 0) + model.PeerAddress(localhost, SocketUtil.temporaryLocalPort(SocketUtil.Both), 0) private val vertx = Vertx.vertx() private val router = Router.router(vertx) diff --git a/build.sbt b/build.sbt index 59073ac6..16fb0eb8 100644 --- a/build.sbt +++ b/build.sbt @@ -166,6 +166,7 @@ lazy val app = mainProject("app") alephiumUtil, alephiumProtocol, alephiumApi, + alephiumWs, alephiumCrypto, alephiumJson, rxJava, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2de78edb..899d614e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ import sbt._ object Version { - lazy val common = "3.11.0" + lazy val common = "3.12.7+97-eccb34cf+20250430-1522-SNAPSHOT" lazy val akka = "2.6.20" lazy val rxJava = "3.1.8" @@ -39,6 +39,7 @@ object Dependencies { lazy val alephiumJson = "org.alephium" %% "alephium-json" % Version.common lazy val alephiumHttp = "org.alephium" %% "alephium-http" % Version.common lazy val alephiumConf = "org.alephium" %% "alephium-conf" % Version.common + lazy val alephiumWs = "org.alephium" %% "alephium-ws" % Version.common lazy val vertx = "io.vertx" % "vertx-core" % Version.vertx lazy val vertxRxJava = "io.vertx" % "vertx-rx-java3" % Version.vertx