Skip to content

Commit

Permalink
Merge pull request #46 from virtualeconomy/blockchainTime
Browse files Browse the repository at this point in the history
add utils/time API, rewrite ntp time sync module
  • Loading branch information
ncying authored Apr 4, 2019
2 parents b8b3857 + 776e2de commit 53a0028
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 54 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ libraryDependencies ++=
.exclude("org.slf4j", "slf4j-api"),
"commons-net" % "commons-net" % "3.+",
"org.typelevel" %% "cats-core" % "0.9.0",
"io.monix" %% "monix" % "2.3.0"
"io.monix" %% "monix" % "3.0.0-M2"
)

dependencyOverrides ++= Seq(
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Application(val actorSystem: ActorSystem, val settings: VsysSettings) exte
SposConsensusApiRoute(settings.restAPISettings, stateReader, history, settings.blockchainSettings.functionalitySettings),
WalletApiRoute(settings.restAPISettings, wallet),
PaymentApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time),
UtilsApiRoute(settings.restAPISettings),
UtilsApiRoute(time, settings.restAPISettings),
PeersApiRoute(settings.restAPISettings, network.connect, peerDatabase, establishedConnections),
AddressApiRoute(settings.restAPISettings, wallet, stateReader, settings.blockchainSettings.functionalitySettings),
DebugApiRoute(settings.restAPISettings, wallet, stateReader, history, peerDatabase, establishedConnections, blockchainUpdater, allChannels, utxStorage),
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/network/Handshake.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ object Handshake {
val appVersion = (in.readInt(), in.readInt(), in.readInt())
val nodeNameSize = in.readByte()

val nodeName = in.readSlice(nodeNameSize).toString(Charsets.UTF_8)
if(nodeNameSize < 0 || nodeNameSize > Byte.MaxValue) {
throw new InvalidHandshakeException(s"An invalid node name's size: $nodeNameSize")
}
val nodeName = in.readSlice(nodeNameSize).toString(Charsets.UTF_8)

val nonce = in.readLong()
val declaredAddressLength = in.readInt()
Expand Down
16 changes: 14 additions & 2 deletions src/main/scala/scorex/api/http/UtilsApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import play.api.libs.json.Json
import scorex.crypto.EllipticCurveImpl
import scorex.crypto.encode.Base58
import scorex.crypto.hash.{FastCryptographicHash, SecureCryptographicHash}
import scorex.utils.Time

@Path("/utils")
@Api(value = "/utils", description = "Useful functions", position = 3, produces = "application/json")
case class UtilsApiRoute(settings: RestAPISettings) extends ApiRoute {
case class UtilsApiRoute(timeService: Time, settings: RestAPISettings) extends ApiRoute {
import UtilsApiRoute._

private def seed(length: Int) = {
Expand All @@ -23,7 +24,18 @@ case class UtilsApiRoute(settings: RestAPISettings) extends ApiRoute {
}

override val route = pathPrefix("utils") {
seedRoute ~ length ~ hashFast ~ hashSecure ~ sign
seedRoute ~ length ~ hashFast ~ hashSecure ~ sign ~ time
}

@Path("/time")
@ApiOperation(value = "Time", notes = "Current Node time (UTC)", httpMethod = "GET")
@ApiResponses(
Array(
new ApiResponse(code = 200, message = "Json with time or error")
))
def time: Route = (path("time") & get) {
val t = System.currentTimeMillis()*1000000L + System.nanoTime()%1000000L
complete(Json.obj("system" -> t, "NTP" -> timeService.correctedTime()))
}

@Path("/seed")
Expand Down
39 changes: 39 additions & 0 deletions src/main/scala/scorex/utils/ScorexLogging.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package scorex.utils

import com.google.common.util.concurrent.UncheckedExecutionException
import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler}
import monix.reactive.Observable
import org.slf4j.{Logger, LoggerFactory}

case class LoggerFacade(logger: Logger) {
Expand Down Expand Up @@ -56,4 +60,39 @@ case class LoggerFacade(logger: Logger) {

trait ScorexLogging {
protected def log = LoggerFacade(LoggerFactory.getLogger(this.getClass))

implicit class TaskExt[A](t: Task[A]) {
def runAsyncLogErr(implicit s: Scheduler): CancelableFuture[A] = logErr.runAsync

def logErr: Task[A] = {
t.onErrorHandleWith(ex => {
log.error(s"Error executing task", ex)
Task.raiseError[A](ex)
})
}

def logErrDiscardNoSuchElementException: Task[A] = {
t.onErrorHandleWith(ex => {
ex match {
case gex: UncheckedExecutionException =>
Option(gex.getCause) match {
case Some(nseex: NoSuchElementException) =>
case _ => log.error(s"Error executing task", ex)
}
case _ => log.error(s"Error executing task", ex)
}
Task.raiseError[A](ex)
})
}
}

implicit class ObservableExt[A](o: Observable[A]) {

def logErr: Observable[A] = {
o.onErrorHandleWith(ex => {
log.error(s"Error observing item", ex)
Observable.raiseError[A](ex)
})
}
}
}
97 changes: 53 additions & 44 deletions src/main/scala/scorex/utils/Time.scala
Original file line number Diff line number Diff line change
@@ -1,47 +1,37 @@
package scorex.utils

import java.net.InetAddress
import java.net.{InetAddress, SocketTimeoutException}

import org.apache.commons.net.ntp.NTPUDPClient
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService

import scala.util.Try
import scala.concurrent.duration.DurationInt

trait Time {
def correctedTime(): Long
def getTimestamp() : Long
}

class TimeImpl extends Time with ScorexLogging {
//TimeTillUpdate: 10min
private val TimeTillUpdate = 1000000000L * 60 * 10L
class TimeImpl extends Time with ScorexLogging with AutoCloseable {
private val offsetPanicThreshold = 1000000L
private val ExpirationTimeout = 100.seconds
private val RetryDelay = 10.seconds
private val ResponseTimeout = 10.seconds
private val NtpServer = "pool.ntp.org"

private var lastUpdate = 0L
private var offset = 0L
private implicit val scheduler: SchedulerService = Scheduler.singleThread(name = "time-impl")

private var cntTime = 0L

def correctedTime(): Long = {
//CHECK IF OFFSET NEEDS TO BE UPDATED
if (System.currentTimeMillis()*1000000L+System.nanoTime()%1000000L > lastUpdate + TimeTillUpdate) {
Try {
//update the offset in nanoseconds
updateOffSet()
lastUpdate = System.currentTimeMillis()*1000000L+System.nanoTime()%1000000L

log.info("Adjusting time with " + offset + " nanoseconds.")
} recover {
case e: Throwable =>
log.warn("Unable to get corrected time", e)
}
}

//CALCULATE CORRECTED TIME
val cnt = System.currentTimeMillis()*1000000L+System.nanoTime()%1000000L + offset
cntTime = if (cnt<=cntTime && cntTime-cnt<=1000000L) cnt+1000000L else cnt
val cnt = System.currentTimeMillis() * 1000000L + System.nanoTime() % 1000000L + offset
cntTime = if (cnt <= cntTime && cntTime - cnt <= 1000000L) cnt + 1000000L else cnt
cntTime
}


private var txTime: Long = 0

def getTimestamp: Long = {
Expand All @@ -50,28 +40,47 @@ class TimeImpl extends Time with ScorexLogging {
txTime
}

private def updateOffSet() {
val client = new NTPUDPClient()
//setDefaultTimeout(int), Set the default timeout in milliseconds
// to use when opening a socket. After a call to open,
//the timeout for the socket is set using this value.
//so will not change here
client.setDefaultTimeout(10000)

try {
client.open()
//Retrieves the time information from the specified server on the default NTP
//port and returns it. The time is the number of miliiseconds since 00:00 (midnight) 1 January 1900 UTC,
val info = client.getTime(InetAddress.getByName(NtpServer))
info.computeDetails()
//return in milliseconds, change to nanoseconds
if (Option(info.getOffset).isDefined) offset = info.getOffset*1000000L
} catch {
case t: Throwable => log.warn("Problems with NTP: ", t)
} finally {
client.close()
private val client = new NTPUDPClient()
client.setDefaultTimeout(ResponseTimeout.toMillis.toInt)

@volatile private var offset = 0L
private val updateTask: Task[Unit] = {
def newOffsetTask: Task[Option[(InetAddress, java.lang.Long)]] = Task {
try {
client.open()
val info = client.getTime(InetAddress.getByName(NtpServer))
info.computeDetails()
Option(info.getOffset).map { offset =>
if (Math.abs(offset) > offsetPanicThreshold) throw new Exception("Offset is suspiciously large") else (info.getAddress, offset * 1000000L)
}
} catch {
case _: SocketTimeoutException =>
None
case t: Throwable =>
log.warn("Problems with NTP: ", t)
None
} finally {
client.close()
}
}

newOffsetTask.flatMap {
case None if !scheduler.isShutdown => updateTask.delayExecution(RetryDelay)
case Some((server, newOffset)) if !scheduler.isShutdown =>
log.trace(s"Adjusting time with $newOffset nanoseconds, source: ${server.getHostAddress}.")
offset = newOffset
updateTask.delayExecution(ExpirationTimeout)
case _ => Task.unit
}
}

private val taskHandle = updateTask.runAsyncLogErr

override def close(): Unit = {
log.info("Shutting down Time")
taskHandle.cancel()
scheduler.shutdown()
}
}

object NTP extends TimeImpl
9 changes: 8 additions & 1 deletion src/test/scala/com/wavesplatform/http/UtilsRouteSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ import org.scalatest.prop.PropertyChecks
import play.api.libs.json.{JsObject, JsValue}
import scorex.api.http.{TooBigArrayAllocation, UtilsApiRoute}
import scorex.crypto.encode.Base58
import scorex.utils.Time
import scorex.crypto.hash.{FastCryptographicHash, SecureCryptographicHash}

class UtilsRouteSpec extends RouteSpec("/utils") with RestAPISettingsHelper with PropertyChecks {
private val route = UtilsApiRoute(restAPISettings).route
private val route = UtilsApiRoute(
new Time {
def correctedTime(): Long = System.currentTimeMillis() * 1000000L
def getTimestamp(): Long = System.currentTimeMillis() * 1000000L
},
restAPISettings
).route

routePath("/seed") in {
Get(routePath("/seed")) ~> route ~> check {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import java.nio.charset.StandardCharsets

import com.google.common.primitives.{Ints, Longs}
import com.wavesplatform.TransactionGen
import com.wavesplatform.network.client.NopPeerDatabase
import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
import org.scalacheck.{Arbitrary, Gen, Shrink}
import org.scalamock.scalatest.MockFactory
import org.scalatest.prop.{GeneratorDrivenPropertyChecks, PropertyChecks}
import org.scalatest.prop.PropertyChecks
import org.scalatest.{FreeSpec, Matchers}

class HandshakeDecoderSpec extends FreeSpec
with Matchers
with MockFactory
with PropertyChecks
with GeneratorDrivenPropertyChecks
with TransactionGen {

private implicit def noShrink[A]: Shrink[A] = Shrink(_ => Stream.empty)
Expand All @@ -26,7 +24,7 @@ class HandshakeDecoderSpec extends FreeSpec
var mayBeDecodedHandshake: Option[Handshake] = None

val channel = new EmbeddedChannel(
new HandshakeDecoder(NopPeerDatabase),
new HandshakeDecoder(PeerDatabase.NoOp),
new ChannelInboundHandlerAdapter {
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = msg match {
case x: Handshake => mayBeDecodedHandshake = Some(x)
Expand Down

0 comments on commit 53a0028

Please sign in to comment.