Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Dec 4, 2023
1 parent 6fc92bb commit 306dd67
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 55 deletions.
6 changes: 5 additions & 1 deletion node/src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,12 @@ class MinerImpl(
}.uncancelable

for {
_ <- waitBlockAppendedTask
elapsed <- waitBlockAppendedTask.timed.map(_._1)
newOffset = (offset - elapsed).max(Duration.Zero)

_ <- Task(microBlockAttempt := SerialCancelable()).delayExecution(newOffset)
result <- Task(forgeBlock(account)).executeOn(minerScheduler)

_ <- result match {
case Right((block, totalConstraint)) =>
appendTask(block, totalConstraint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ trait MicroBlockMiner {
account: KeyPair,
accumulatedBlock: Block,
restTotalConstraint: MiningConstraint,
lastMicroBlock: Long
prevMicroBlockTs: Long
): Task[Unit]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ class MicroBlockMinerImpl(
account: KeyPair,
accumulatedBlock: Block,
restTotalConstraint: MiningConstraint,
lastMicroBlock: Long
prevMicroBlockTs: Long
): Task[Unit] =
generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, lastMicroBlock)
generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs)
.flatMap {
case res @ Success(newBlock, newConstraint) =>
Task.defer(generateMicroBlockSequence(account, newBlock, newConstraint, res.nanoTime))
case Retry =>
Task
.defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, lastMicroBlock))
.defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs))
.delayExecution(1 second)
case Stop =>
setDebugState(MinerDebugInfo.MiningBlocks)
Expand All @@ -65,7 +65,7 @@ class MicroBlockMinerImpl(
account: KeyPair,
accumulatedBlock: Block,
restTotalConstraint: MiningConstraint,
lastMicroBlock: Long
prevMicroBlockTs: Long
): Task[MicroBlockMiningResult] = {
val packTask = Task.cancelable[(Option[Seq[Transaction]], MiningConstraint, Option[ByteStr])] { cb =>
@volatile var cancelled = false
Expand Down Expand Up @@ -93,8 +93,8 @@ class MicroBlockMinerImpl(
)
)
)
log.trace(s"Finished pack for ${accumulatedBlock.id()}")
val updatedTotalConstraint = updatedMdConstraint.head
log.trace(s"Finished pack for ${accumulatedBlock.id()}, updated total constraint: $updatedTotalConstraint")
cb.onSuccess((unconfirmed, updatedTotalConstraint, stateHash))
}
Task.eval {
Expand All @@ -104,24 +104,25 @@ class MicroBlockMinerImpl(

packTask.flatMap {
case (Some(unconfirmed), updatedTotalConstraint, stateHash) if unconfirmed.nonEmpty =>
val delay = {
val delay = System.nanoTime() - lastMicroBlock
val requiredDelay = settings.microBlockInterval.toNanos
if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos
}

for {
_ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock"))
_ <- Task.sleep(delay)
r <-
if (blockchainUpdater.lastBlockId.forall(_ == accumulatedBlock.id())) {
log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint")
appendAndBroadcastMicroBlock(account, accumulatedBlock, unconfirmed, updatedTotalConstraint, stateHash)
} else {
log.trace(s"Stopping generating microBlock for ${account.toAddress}, new key block was appended")
Task(Stop)
}
} yield r
blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash)
.leftWiden[Throwable]
.liftTo[Task]
(signedBlock, microBlock) = blocks
delay = {
val delay = System.nanoTime() - prevMicroBlockTs
val requiredDelay = settings.microBlockInterval.toNanos
if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos
}
_ <-
if (delay > Duration.Zero) {
log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")
Task.sleep(delay)
} else Task.unit
_ <- appendMicroBlock(microBlock, account)
} yield
if (updatedTotalConstraint.isFull) Stop
else Success(signedBlock, updatedTotalConstraint)

case (_, updatedTotalConstraint, _) =>
if (updatedTotalConstraint.isFull) {
Expand All @@ -139,39 +140,27 @@ class MicroBlockMinerImpl(
}
}

private def appendAndBroadcastMicroBlock(
account: KeyPair,
block: Block,
transactions: Seq[Transaction],
constraint: MiningConstraint,
stateHash: Option[BlockId]
): Task[MicroBlockMiningResult] =
for {
(signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash).leftWiden[Throwable].liftTo[Task]
blockId <- appendMicroBlock(microBlock)
_ = BlockStats.mined(microBlock, blockId)
_ <- broadcastMicroBlock(account, microBlock, blockId)
} yield
if (constraint.isFull) Stop
else Success(signedBlock, constraint)

private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] =
Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference)))

private def appendMicroBlock(microBlock: MicroBlock): Task[BlockId] =
MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None)
.flatMap {
case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err))
case Right(v) => Task.now(v)
}
private def appendMicroBlock(microBlock: MicroBlock, account: KeyPair): Task[BlockId] =
MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None).flatMap {
case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err))
case Right(blockId) =>
Task.evalAsync {
BlockStats.mined(microBlock, blockId)
if (allChannels != null) {
allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))
}
blockId
}
}.uncancelable

private def forgeBlocks(
account: KeyPair,
accumulatedBlock: Block,
unconfirmed: Seq[Transaction],
packedTxs: Seq[Transaction],
stateHash: Option[ByteStr]
): Either[MicroBlockMiningError, (Block, MicroBlock)] =
microBlockBuildTimeStats.measureSuccessful {
log.trace(s"Forging microBlock for ${account.toAddress}")
for {
signedBlock <- Block
.buildAndSign(
Expand All @@ -180,7 +169,7 @@ class MicroBlockMinerImpl(
reference = accumulatedBlock.header.reference,
baseTarget = accumulatedBlock.header.baseTarget,
generationSignature = accumulatedBlock.header.generationSignature,
txs = accumulatedBlock.transactionData ++ unconfirmed,
txs = accumulatedBlock.transactionData ++ packedTxs,
signer = account,
featureVotes = accumulatedBlock.header.featureVotes,
rewardVote = accumulatedBlock.header.rewardVote,
Expand All @@ -189,7 +178,7 @@ class MicroBlockMinerImpl(
)
.leftMap(BlockBuildError)
microBlock <- MicroBlock
.buildAndSign(signedBlock.header.version, account, unconfirmed, accumulatedBlock.id(), signedBlock.signature, stateHash)
.buildAndSign(signedBlock.header.version, account, packedTxs, accumulatedBlock.id(), signedBlock.signature, stateHash)
.leftMap(MicroBlockBuildError)
} yield (signedBlock, microBlock)
}
Expand Down
6 changes: 3 additions & 3 deletions node/src/main/scala/com/wavesplatform/network/messages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ case class MicroBlockInv(sender: PublicKey, totalBlockId: ByteStr, reference: By
}

object MicroBlockInv {
def apply(sender: KeyPair, totalBlockRef: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = {
val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockRef.arr ++ prevBlockRef.arr)
new MicroBlockInv(sender.publicKey, totalBlockRef, prevBlockRef, signature)
def apply(sender: KeyPair, totalBlockId: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = {
val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockId.arr ++ prevBlockRef.arr)
new MicroBlockInv(sender.publicKey, totalBlockId, prevBlockRef, signature)
}
}

Expand Down

0 comments on commit 306dd67

Please sign in to comment.