Skip to content

Commit

Permalink
Remove obsolete WatchFundingConfirmed when using RBF (#2961)
Browse files Browse the repository at this point in the history
When using RBF for a dual-funded channel or a splice, we set multiple
`WatchFundingConfirmed` for conflicting transactions. When one of those
transactions confirms, the others will never confirm: it is wasteful to
keep watching for their confirmation.

The watcher doesn't have enough information on its own to efficiently
detect that some watches are double-spent: we instead rely on the
consumer of the watch to tell the watcher to stop watching the RBF
attempts.

Fixes #2954
  • Loading branch information
t-bast authored Dec 9, 2024
1 parent 8381fc4 commit 189e282
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ object ZmqWatcher {
private case object Keep extends AddWatchResult
private case object Ignore extends AddWatchResult

/** Stop watching confirmations for a given transaction: must be used to stop watching obsolete RBF attempts. */
case class UnwatchTxConfirmed(txId: TxId) extends Command

sealed trait WatchHint
/**
* In some cases we don't need to check watches every time a block is found and only need to check again after we
Expand Down Expand Up @@ -364,6 +367,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)

case UnwatchTxConfirmed(txId) =>
// We remove watches that match the given txId.
val deprecatedWatches = watches.keySet.filter {
case w: WatchConfirmed[_] => w.txId == txId
case _ => false
}
watching(watches -- deprecatedWatches, watchedUtxos)

case ValidateRequest(replyTo, ann) =>
client.validate(ann).map(replyTo ! _)
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@ trait CommonFundingHandlers extends CommonHandlers {
// Children splice transactions may already spend that confirmed funding transaction.
val spliceSpendingTxs = commitments1.all.collect { case c if c.fundingTxIndex == commitment.fundingTxIndex + 1 => c.fundingTxId }
watchFundingSpent(commitment, additionalKnownSpendingTxs = spliceSpendingTxs.toSet, None)
// in the dual-funding case we can forget all other transactions, they have been double spent by the tx that just confirmed
rollbackDualFundingTxs(d.commitments.active // note how we use the unpruned original commitments
// In the dual-funding/splicing case we can forget all other transactions (RBF attempts), they have been
// double-spent by the tx that just confirmed.
val conflictingTxs = d.commitments.active // note how we use the unpruned original commitments
.filter(c => c.fundingTxIndex == commitment.fundingTxIndex && c.fundingTxId != commitment.fundingTxId)
.map(_.localFundingStatus).collect { case fundingTx: DualFundedUnconfirmedFundingTx => fundingTx.sharedTx })
.map(_.localFundingStatus).collect { case fundingTx: DualFundedUnconfirmedFundingTx => fundingTx.sharedTx }
conflictingTxs.foreach(tx => blockchain ! UnwatchTxConfirmed(tx.txId))
rollbackDualFundingTxs(conflictingTxs)
(commitments1, commitment)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,37 +189,53 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
import f._

val address = getNewAddress(probe)
val tx = sendToAddress(address, Btc(1), probe)

watcher ! WatchFundingConfirmed(probe.ref, tx.txid, 1)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx.txid, 4)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx.txid, 4) // setting the watch multiple times should be a no-op
val tx1 = sendToAddress(address, Btc(0.7), probe)
val tx2 = sendToAddress(address, Btc(0.5), probe)

watcher ! WatchFundingConfirmed(probe.ref, tx1.txid, 1)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx1.txid, 4)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx1.txid, 4) // setting the watch multiple times should be a no-op
watcher ! WatchFundingConfirmed(probe.ref, tx2.txid, 3)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx2.txid, 6)
probe.expectNoMessage(100 millis)

watcher ! ListWatches(probe.ref)
assert(probe.expectMsgType[Set[Watch[_]]].size == 2)
assert(probe.expectMsgType[Set[Watch[_]]].size == 4)

generateBlocks(1)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx.txid)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx1.txid)
probe.expectNoMessage(100 millis)

watcher ! ListWatches(probe.ref)
assert(probe.expectMsgType[Set[Watch[_]]].size == 3)

generateBlocks(2)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx2.txid)
probe.expectNoMessage(100 millis)

watcher ! ListWatches(probe.ref)
assert(probe.expectMsgType[Set[Watch[_]]].size == 2)

watcher ! UnwatchTxConfirmed(tx2.txid)
watcher ! ListWatches(probe.ref)
assert(probe.expectMsgType[Set[Watch[_]]].size == 1)

generateBlocks(3)
assert(probe.expectMsgType[WatchFundingDeeplyBuriedTriggered].tx.txid == tx.txid)
generateBlocks(1)
assert(probe.expectMsgType[WatchFundingDeeplyBuriedTriggered].tx.txid == tx1.txid)
probe.expectNoMessage(100 millis)

watcher ! ListWatches(probe.ref)
assert(probe.expectMsgType[Set[Watch[_]]].isEmpty)

// If we try to watch a transaction that has already been confirmed, we should immediately receive a WatchEventConfirmed.
watcher ! WatchFundingConfirmed(probe.ref, tx.txid, 1)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx.txid)
watcher ! WatchFundingConfirmed(probe.ref, tx.txid, 2)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx.txid)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx.txid, 4)
assert(probe.expectMsgType[WatchFundingDeeplyBuriedTriggered].tx.txid == tx.txid)
watcher ! WatchFundingConfirmed(probe.ref, tx1.txid, 1)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx1.txid)
watcher ! WatchFundingConfirmed(probe.ref, tx2.txid, 2)
assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx2.txid)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx1.txid, 4)
assert(probe.expectMsgType[WatchFundingDeeplyBuriedTriggered].tx.txid == tx1.txid)
watcher ! WatchFundingDeeplyBuried(probe.ref, tx2.txid, 4)
assert(probe.expectMsgType[WatchFundingDeeplyBuriedTriggered].tx.txid == tx2.txid)
probe.expectNoMessage(100 millis)

watcher ! ListWatches(probe.ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.states.c
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, actorRefAdapter}
import akka.testkit.{TestFSMRef, TestProbe}
import com.softwaremill.quicklens.{ModifyPimp, QuicklensAt}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong, Transaction, TxIn}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong, Transaction}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.blockchain.{CurrentBlockHeight, SingleKeyOnChainWallet}
Expand All @@ -33,6 +33,7 @@ import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, SetChannelId
import fr.acinq.eclair.channel.states.ChannelStateTestsBase.FakeTxPublisherFactory
import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.transactions.Transactions.ClaimLocalAnchorOutputTx
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, MilliSatoshiLong, TestConstants, TestKitBaseClass, ToMilliSatoshiConversion}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
Expand Down Expand Up @@ -751,7 +752,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
val bobCommitTx = bob.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED].commitments.latest.localCommit.commitTxAndRemoteSig.commitTx
alice ! WatchFundingSpentTriggered(bobCommitTx.tx)
aliceListener.expectMsgType[TransactionPublished]
val claimAnchor = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMain = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMain.input.txid == bobCommitTx.tx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobCommitTx.tx.txid)
Expand All @@ -777,6 +778,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
assert(aliceListener.expectMsgType[TransactionConfirmed].tx == fundingTx1)
assert(aliceListener.expectMsgType[ShortChannelIdAssigned].shortIds.real.isInstanceOf[RealScidStatus.Temporary])
assert(alice2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx1.txid)
alice2blockchain.expectMsg(UnwatchTxConfirmed(fundingTx2.txId))
alice2blockchain.expectNoMessage(100 millis)
alice2bob.expectNoMessage(100 millis)
awaitCond(alice.stateData.isInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_READY])
Expand All @@ -786,7 +788,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
// Bob broadcasts his commit tx.
alice ! WatchFundingSpentTriggered(bobCommitTx1)
assert(aliceListener.expectMsgType[TransactionPublished].tx.txid == bobCommitTx1.txid)
val claimAnchor = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMain = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMain.input.txid == bobCommitTx1.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobCommitTx1.txid)
Expand All @@ -807,7 +809,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
assert(aliceListener.expectMsgType[TransactionConfirmed].tx == fundingTx)
assert(alice2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx.txid)
alice2 ! WatchFundingSpentTriggered(bobData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx)
val claimAnchorAlice = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainAlice = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainAlice.input.txid == bobData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx.txid)
Expand All @@ -818,7 +820,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
assert(bobListener.expectMsgType[TransactionConfirmed].tx == fundingTx)
assert(bob2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx.txid)
bob2 ! WatchFundingSpentTriggered(aliceData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx)
val claimAnchorBob = bob2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(bob2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainBob = bob2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainBob.input.txid == aliceData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx.txid)
assert(bob2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceData.commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx.txid)
Expand All @@ -844,8 +846,9 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
alice2 ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx1)
assert(aliceListener.expectMsgType[TransactionConfirmed].tx == fundingTx1)
assert(alice2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx1.txid)
alice2blockchain.expectMsg(UnwatchTxConfirmed(fundingTx2.txId))
alice2 ! WatchFundingSpentTriggered(bobCommitTx1)
val claimAnchorAlice = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainAlice = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainAlice.input.txid == bobCommitTx1.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobCommitTx1.txid)
Expand All @@ -856,8 +859,9 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
bob2 ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx1)
assert(bobListener.expectMsgType[TransactionConfirmed].tx == fundingTx1)
assert(bob2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx1.txid)
bob2blockchain.expectMsg(UnwatchTxConfirmed(fundingTx2.txId))
bob2 ! WatchFundingSpentTriggered(aliceCommitTx1)
val claimAnchorBob = bob2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(bob2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainBob = bob2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainBob.input.txid == aliceCommitTx1.txid)
assert(bob2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceCommitTx1.txid)
Expand Down Expand Up @@ -1016,7 +1020,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
awaitCond(alice.stateName == CLOSING)
aliceListener.expectMsgType[ChannelAborted]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx].tx.txid == aliceCommitTx.txid)
val claimAnchorLocal = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainLocal = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainLocal.input.txid == aliceCommitTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceCommitTx.txid)
Expand All @@ -1025,7 +1029,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
val bobCommitTx = bob.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED].commitments.latest.localCommit.commitTxAndRemoteSig.commitTx.tx
alice ! WatchFundingSpentTriggered(bobCommitTx)
alice2blockchain.expectMsgType[WatchOutputSpent]
val claimAnchorRemote = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainRemote = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainRemote.input.txid == bobCommitTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobCommitTx.txid)
Expand All @@ -1050,7 +1054,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
awaitCond(alice.stateName == CLOSING)
aliceListener.expectMsgType[ChannelAborted]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx].tx.txid == aliceCommitTx2.tx.txid)
val claimAnchor2 = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMain2 = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMain2.input.txid == aliceCommitTx2.tx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceCommitTx2.tx.txid)
Expand All @@ -1061,8 +1065,9 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
assert(aliceListener.expectMsgType[TransactionConfirmed].tx == fundingTx1)
alice2blockchain.expectMsgType[WatchOutputSpent]
assert(alice2blockchain.expectMsgType[WatchFundingSpent].txId == fundingTx1.txid)
alice2blockchain.expectMsg(UnwatchTxConfirmed(fundingTx2.txId))
assert(alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx].tx.txid == aliceCommitTx1.tx.txid)
val claimAnchor1 = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMain1 = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMain1.input.txid == aliceCommitTx1.tx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceCommitTx1.tx.txid)
Expand All @@ -1072,7 +1077,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
// Bob publishes his commit tx, Alice reacts by spending her remote main output.
alice ! WatchFundingSpentTriggered(bobCommitTx1.tx)
alice2blockchain.expectMsgType[WatchOutputSpent]
val claimAnchorRemote = alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx]
assert(alice2blockchain.expectMsgType[TxPublisher.PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
val claimMainRemote = alice2blockchain.expectMsgType[TxPublisher.PublishFinalTx]
assert(claimMainRemote.input.txid == bobCommitTx1.tx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == bobCommitTx1.tx.txid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,13 +737,15 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik

val spliceTx = initiateSplice(f, spliceIn_opt = Some(SpliceIn(500_000 sat)), spliceOut_opt = Some(SpliceOut(300_000 sat, defaultSpliceOutScriptPubKey)))
val spliceCommitment = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.active.find(_.fundingTxId == spliceTx.txid).get
assert(alice2blockchain.expectMsgType[WatchFundingConfirmed].txId == spliceTx.txid)

// Alice RBFs the splice transaction.
// Our dummy bitcoin wallet adds an additional input at every funding attempt.
val rbfTx1 = initiateRbf(f, FeeratePerKw(15_000 sat), sInputsCount = 2, sOutputsCount = 2)
assert(rbfTx1.txIn.size == spliceTx.txIn.size + 1)
spliceTx.txIn.foreach(txIn => assert(rbfTx1.txIn.map(_.outPoint).contains(txIn.outPoint)))
assert(rbfTx1.txOut.size == spliceTx.txOut.size)
assert(alice2blockchain.expectMsgType[WatchFundingConfirmed].txId == rbfTx1.txid)

// Bob RBFs the splice transaction: he needs to add an input to pay the fees.
// Our dummy bitcoin wallet adds an additional input for Alice: a real bitcoin wallet would simply lower the previous change output.
Expand All @@ -752,6 +754,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik
assert(rbfTx2.txIn.size > rbfTx1.txIn.size)
rbfTx1.txIn.foreach(txIn => assert(rbfTx2.txIn.map(_.outPoint).contains(txIn.outPoint)))
assert(rbfTx2.txOut.size == rbfTx1.txOut.size + 1)
assert(alice2blockchain.expectMsgType[WatchFundingConfirmed].txId == rbfTx2.txid)

// There are three pending splice transactions that double-spend each other.
inside(alice.stateData.asInstanceOf[DATA_NORMAL]) { data =>
Expand All @@ -767,6 +770,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik
confirmSpliceTx(f, rbfTx2)
inside(alice.stateData.asInstanceOf[DATA_NORMAL]) { data =>
assert(data.commitments.active.map(_.fundingTxId) == Seq(rbfTx2.txid))
assert(alice2blockchain.expectMsgType[WatchFundingSpent].txId == rbfTx2.txid)
alice2blockchain.expectMsgAllOf(
UnwatchTxConfirmed(spliceTx.txid),
UnwatchTxConfirmed(rbfTx1.txid),
)
data.commitments.active.foreach(c => assert(c.localCommit.spec.toLocal == spliceCommitment.localCommit.spec.toLocal))
data.commitments.active.foreach(c => assert(c.localCommit.spec.toRemote == spliceCommitment.localCommit.spec.toRemote))
}
Expand Down

0 comments on commit 189e282

Please sign in to comment.