Skip to content

Commit

Permalink
Abandon transactions whose ancestors have been double spent (#2818)
Browse files Browse the repository at this point in the history
When a transaction is directly double-spent, bitcoind is able to
automatically detect it and free up wallet inputs that are used
in double-spent transactions. However, when a transaction becomes
invalid because one of its ancestors is double-spent, bitcoind is
not able to detect it and will keep wallet inputs locked forever.

The only cases where this can happen are:

- splice transactions when a commitment transaction that is not
  based on the latest splice is confirmed
- anchor transactions in the above case, or when a different
  version of the commitment confirms (e.g. remote commit while
  we're trying to CPFP ou local commit)

When we detect that this happened, we abandon the corresponding
transactions, which ensures that we don't end up with unusable
liquidity in our wallet.

We also remove cases where we eagerly abandoned transactions,
which could lead us to double-spend ourselves inadvertently
because the abandoned transaction could still confirm while
we reused its inputs in another transaction (which is a real
issue when using 0-conf).
  • Loading branch information
t-bast authored Feb 7, 2024
1 parent 5b1c69c commit 3d8dc88
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,25 @@ trait OnChainChannelFunder {
/** Rollback a transaction that we failed to commit: this probably translates to "release locks on utxos". */
def rollback(tx: Transaction)(implicit ec: ExecutionContext): Future[Boolean]

/**
* Mark a transaction as abandoned, which will allow for its wallet inputs to be re-spent.
*
* If the transaction has been permanently double-spent by a direct conflict, there is no need to call this function,
* it will automatically be detected and the wallet inputs will be re-spent.
*
* This should only be used when the transaction has become invalid because one of its ancestors has been permanently
* double-spent. Since the wallet doesn't keep track of unconfirmed ancestors status, it cannot know that the
* transaction has become permanently invalid and will never be publishable again.
*
* This function must never be called on a transaction that isn't permanently invalidated, otherwise it would create
* a risk of accidentally double-spending ourselves:
* - the transaction is abandoned
* - its inputs are re-spent in another transaction
* - but the initial transaction confirms because it had already reached the mempool of other nodes, unexpectedly
* double-spending the second transaction
*/
def abandon(txId: TxId)(implicit ec: ExecutionContext): Future[Boolean]

/**
* Tests whether the inputs of the provided transaction have been spent by another transaction.
* Implementations may always return false if they don't want to implement it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,7 @@ class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient, val onChainKeyManag
getRawTransaction(tx.txid).map(_ => tx.txid).recoverWith { case _ => Future.failed(e) }
}

/**
* Mark a transaction as abandoned, which will allow for its wallet inputs to be re-spent.
* This method can be used to replace "stuck" or evicted transactions.
* It only works on transactions which are not included in a block and are not currently in the mempool.
*/
def abandonTransaction(txId: TxId)(implicit ec: ExecutionContext): Future[Boolean] = {
override def abandon(txId: TxId)(implicit ec: ExecutionContext): Future[Boolean] = {
rpcClient.invoke("abandontransaction", txId).map(_ => true).recover(_ => false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,23 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
d.commitments.resolveCommitment(tx) match {
case Some(commitment) =>
log.warning("a commit tx for fundingTxIndex={} fundingTxId={} has been confirmed", commitment.fundingTxIndex, commitment.fundingTxId)
// Funding transactions with a greater index will never confirm: we abandon them to unlock their wallet inputs,
// which would otherwise stay locked forever in our bitcoind wallet.
d.commitments.all
.collect { case c: Commitment if commitment.fundingTxIndex <= c.fundingTxIndex => c.fundingTxId }
.foreach { txId =>
log.warning("abandoning splice txId={} (alternative commitment was confirmed)", txId)
wallet.abandon(txId)
}
// Any anchor transaction that we created based on the latest local or remote commit will never confirm either
// so we need to abandon them to unlock their wallet inputs.
nodeParams.db.audit.listPublished(d.channelId).collect {
case tx if tx.desc == "local-anchor" => tx
case tx if tx.desc == "remote-anchor" => tx
}.foreach { tx =>
log.warning("abandoning {} txId={} (alternative commitment was confirmed)", tx.desc, tx.txId)
wallet.abandon(tx.txId)
}
val commitments1 = d.commitments.copy(
active = commitment +: Nil,
inactive = Nil
Expand Down Expand Up @@ -1709,6 +1726,31 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
if (d1.localCommitPublished.exists(_.commitTx.txid == tx.txid)) {
context.system.eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.params.remoteParams.toSelfDelay.toInt))
}
// if the local or remote commitment tx just got confirmed, we abandon anchor transactions that were created based
// on the other commitment: they will never confirm so we must free their wallet inputs.
if (d1.localCommitPublished.exists(_.commitTx.txid == tx.txid)) {
nodeParams.db.audit.listPublished(d.channelId).collect {
case tx if tx.desc == "remote-anchor" =>
log.warning("abandoning remote-anchor txId={} (local commit was confirmed)", tx.txId)
wallet.abandon(tx.txId)
}
}
if (d1.remoteCommitPublished.exists(_.commitTx.txid == tx.txid) || d1.nextRemoteCommitPublished.exists(_.commitTx.txid == tx.txid)) {
nodeParams.db.audit.listPublished(d.channelId).collect {
case tx if tx.desc == "local-anchor" =>
log.warning("abandoning local-anchor txId={} (remote commit was confirmed)", tx.txId)
wallet.abandon(tx.txId)
}
}
if (d1.futureRemoteCommitPublished.exists(_.commitTx.txid == tx.txid) || d1.revokedCommitPublished.exists(_.commitTx.txid == tx.txid)) {
nodeParams.db.audit.listPublished(d.channelId).collect {
case tx if tx.desc == "local-anchor" => tx
case tx if tx.desc == "remote-anchor" => tx
}.foreach { tx =>
log.warning("abandoning {} txId={} (future or revoked commitment was confirmed)", tx.desc, tx.txId)
wallet.abandon(tx.txId)
}
}
// we may need to fail some htlcs in case a commitment tx was published and they have reached the timeout threshold
val timedOutHtlcs = Closing.isClosingTypeAlreadyKnown(d1) match {
case Some(c: Closing.LocalClose) => Closing.trimmedOrTimedOutHtlcs(d.commitments.params.commitmentFormat, c.localCommit, c.localCommitPublished, d.commitments.params.localParams.dustLimit, tx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,17 +327,14 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,

// Clean up the failed transaction attempt. Once that's done, go back to the waiting state with the new transaction.
def cleanUpFailedTxAndWait(failedTx: Transaction, mempoolTx: FundedTx): Behavior[Command] = {
context.pipeToSelf(bitcoinClient.abandonTransaction(failedTx.txid))(_ => UnlockUtxos)
val toUnlock = failedTx.txIn.map(_.outPoint).toSet -- mempoolTx.signedTx.txIn.map(_.outPoint).toSet
if (toUnlock.isEmpty) {
context.self ! UtxosUnlocked
} else {
log.debug("unlocking utxos={}", toUnlock.mkString(", "))
context.pipeToSelf(bitcoinClient.unlockOutpoints(toUnlock.toSeq))(_ => UtxosUnlocked)
}
Behaviors.receiveMessagePartial {
case UnlockUtxos =>
val toUnlock = failedTx.txIn.map(_.outPoint).toSet -- mempoolTx.signedTx.txIn.map(_.outPoint).toSet
if (toUnlock.isEmpty) {
context.self ! UtxosUnlocked
} else {
log.debug("unlocking utxos={}", toUnlock.mkString(", "))
context.pipeToSelf(bitcoinClient.unlockOutpoints(toUnlock.toSeq))(_ => UtxosUnlocked)
}
Behaviors.same
case UtxosUnlocked =>
// Now that we've cleaned up the failed transaction, we can go back to waiting for the current mempool transaction
// or bump it if it doesn't confirm fast enough either.
Expand Down Expand Up @@ -367,22 +364,21 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,
}

def unlockAndStop(input: OutPoint, txs: Seq[Transaction]): Behavior[Command] = {
// The bitcoind wallet will keep transactions around even when they can't be published (e.g. one of their inputs has
// disappeared but bitcoind thinks it may reappear later), hoping that it will be able to automatically republish
// them later. In our case this is unnecessary, we will publish ourselves, and we don't want to pollute the wallet
// state with transactions that will never be valid, so we eagerly abandon every time.
// If the transaction is in the mempool or confirmed, it will be a no-op.
context.pipeToSelf(Future.traverse(txs)(tx => bitcoinClient.abandonTransaction(tx.txid)))(_ => UnlockUtxos)
// Note that we unlock utxos but we don't abandon failed transactions:
// - if they were successfully published:
// - the utxos have automatically been unlocked, so the call to unlock is a (safe) no-op
// - there is still a risk that transactions may confirm later, so it's unsafe to abandon them
// - if they failed to be published:
// - we must unlock the utxos, otherwise they would stay locked forever
// - abandoning the transaction would be a no-op, as it was never added to our wallet
val toUnlock = txs.flatMap(_.txIn).filterNot(_.outPoint == input).map(_.outPoint).toSet
if (toUnlock.isEmpty) {
context.self ! UtxosUnlocked
} else {
log.debug("unlocking utxos={}", toUnlock.mkString(", "))
context.pipeToSelf(bitcoinClient.unlockOutpoints(toUnlock.toSeq))(_ => UtxosUnlocked)
}
Behaviors.receiveMessagePartial {
case UnlockUtxos =>
val toUnlock = txs.flatMap(_.txIn).filterNot(_.outPoint == input).map(_.outPoint).toSet
if (toUnlock.isEmpty) {
context.self ! UtxosUnlocked
} else {
log.debug("unlocking utxos={}", toUnlock.mkString(", "))
context.pipeToSelf(bitcoinClient.unlockOutpoints(toUnlock.toSeq))(_ => UtxosUnlocked)
}
Behaviors.same
case UtxosUnlocked =>
log.debug("utxos unlocked")
Behaviors.stopped
Expand Down
10 changes: 7 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.eclair.{Paginated, TimestampMilli}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.{PathFindingExperimentMetrics, PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.{Paginated, TimestampMilli}

trait AuditDb {

Expand All @@ -44,6 +44,8 @@ trait AuditDb {

def addPathFindingExperimentMetrics(metrics: PathFindingExperimentMetrics): Unit

def listPublished(channelId: ByteVector32): Seq[PublishedTransaction]

def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent]

def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentReceived]
Expand All @@ -58,6 +60,8 @@ trait AuditDb {

object AuditDb {

case class PublishedTransaction(txId: TxId, desc: String, miningFee: Satoshi)

case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: TimestampMilli)

case class Stats(channelId: ByteVector32, direction: String, avgPaymentAmount: Satoshi, paymentCount: Int, relayFee: Satoshi, networkFee: Satoshi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, Satoshi, TxId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.PublishedTransaction
import fr.acinq.eclair.db.Databases.{FileBackup, PostgresDatabases, SqliteDatabases}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.DualDatabases.runAsync
Expand Down Expand Up @@ -175,6 +176,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.addPathFindingExperimentMetrics(metrics)
}

override def listPublished(channelId: ByteVector32): Seq[PublishedTransaction] = {
runAsync(secondary.listPublished(channelId))
primary.listPublished(channelId)
}

override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated]): Seq[PaymentSent] = {
runAsync(secondary.listSent(from, to, paginated_opt))
primary.listSent(from, to, paginated_opt)
Expand Down
27 changes: 23 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
Expand All @@ -36,7 +36,7 @@ import javax.sql.DataSource

object PgAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 11
val CURRENT_VERSION = 12
}

class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
Expand Down Expand Up @@ -110,6 +110,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX metrics_recipient_idx ON audit.path_finding_metrics(recipient_node_id)")
}

def migration1112(statement: Statement): Unit = {
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA audit")
Expand Down Expand Up @@ -142,9 +146,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.path_finding_metrics(experiment_name)")
statement.executeUpdate("CREATE INDEX metrics_recipient_idx ON audit.path_finding_metrics(recipient_node_id)")
statement.executeUpdate("CREATE INDEX metrics_hash_idx ON audit.path_finding_metrics(payment_hash)")
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON audit.transactions_published(timestamp)")
statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON audit.transactions_confirmed(timestamp)")
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10)) =>
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 5) {
migration45(statement)
Expand All @@ -167,6 +172,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
if (v < 11) {
migration1011(statement)
}
if (v < 12) {
migration1112(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -334,6 +342,17 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def listPublished(channelId: ByteVector32): Seq[PublishedTransaction] = withMetrics("audit/list-published", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.transactions_published WHERE channel_id = ?")) { statement =>
statement.setString(1, channelId.toHex)
statement.executeQuery().map { rs =>
PublishedTransaction(TxId.fromValidHex(rs.getString("tx_id")), rs.getString("tx_type"), rs.getLong("mining_fee_sat").sat)
}.toSeq
}
}
}

override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package fr.acinq.eclair.db.sqlite

import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
Expand All @@ -34,7 +34,7 @@ import java.util.UUID

object SqliteAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 8
val CURRENT_VERSION = 9
}

class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
Expand Down Expand Up @@ -110,6 +110,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("DROP TABLE network_fees")
}

def migration89(statement: Statement): Unit = {
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
Expand Down Expand Up @@ -138,9 +142,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON path_finding_metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON path_finding_metrics(is_mpp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON path_finding_metrics(experiment_name)")
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)")
statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON transactions_published(timestamp)")
statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON transactions_confirmed(timestamp)")
case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7)) =>
case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7 | 8)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 2) {
migration12(statement)
Expand All @@ -163,6 +168,9 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
if (v < 8) {
migration78(statement)
}
if (v < 9) {
migration89(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -310,6 +318,15 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
}
}

override def listPublished(channelId: ByteVector32): Seq[PublishedTransaction] = withMetrics("audit/list-published", DbBackends.Sqlite) {
using(sqlite.prepareStatement("SELECT * FROM transactions_published WHERE channel_id = ?")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.executeQuery().map { rs =>
PublishedTransaction(TxId(rs.getByteVector32("tx_id")), rs.getString("tx_type"), rs.getLong("mining_fee_sat").sat)
}.toSeq
}
}

override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent] =
using(sqlite.prepareStatement("SELECT * FROM sent WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from.toLong)
Expand Down
Loading

0 comments on commit 3d8dc88

Please sign in to comment.