diff --git a/docs/rbf.md b/docs/rbf.md new file mode 100644 index 0000000..ad1e4b2 --- /dev/null +++ b/docs/rbf.md @@ -0,0 +1,349 @@ +# RBF (Replace-By-Fee) Mechanism Documentation + +## Overview + +When the Relayer broadcasts a transaction to the BTC network, it may encounter UTXO conflict errors (`bad-txns-inputs-missingorspent`, error code -25). This indicates that the UTXOs used by the transaction have already been spent by another transaction. The RBF mechanism handles this situation with different strategies based on order type. + +## Trigger Conditions + +``` +BTC RPC SendRawTransaction returns error: +- Code: -25 (ErrRPCVerify) +- Message: "bad-txns-inputs-missingorspent" / "txn-mempool-conflict" / "missing-inputs" +``` + +## Order Type Differences + +### Safebox Orders - Complete Rollback Strategy + +**Characteristics:** +- Timelock address is calculated based on current block time +- Block time changes during re-aggregation cause timelock address changes +- Therefore, both vin and vout will change completely + +**Processing Flow:** +``` +1. Detect UTXO conflict +2. Call CleanInitializedNeedRbfWithdrawByOrderId +3. Check each UTXO status: + - If spent → mark as UTXO_STATUS_SPENT + - If not spent → restore to UTXO_STATUS_PROCESSED +4. Close current order (ORDER_STATUS_CLOSED) +5. Reset safebox_task status to received_ok +6. Next initWithdrawSig round will re-aggregate and generate a new transaction +``` + +**Database State Changes:** +``` +send_order: init/pending → closed +vin/vout: * → closed +safebox_task: init/init_ok → received_ok +utxo: updated to spent or processed based on chain status +``` + +### Withdrawal Orders - RBF Replacement Strategy + +**Characteristics:** +- Vout must remain consistent with the original ProcessWithdrawalV2 submission +- Consensus layer associates original withdrawal IDs through Pid +- Only vin (UTXO selection) and fee can change +- Must submit MsgReplaceWithdrawalV2 to goat consensus layer + +**Processing Flow:** +``` +1. Detect UTXO conflict +2. Call CleanInitializedNeedRbfWithdrawByOrderId +3. Check each UTXO status +4. Mark order as ORDER_STATUS_RBF_REQUEST (preserve Pid) +5. initRbfWithdrawSig detects RBF order +6. Select new UTXOs, calculate new fee +7. Create new transaction (same withdrawals, different vins) +8. Aggregate through BLS signature +9. Submit MsgReplaceWithdrawalV2 to consensus layer +``` + +**Database State Changes:** +``` +send_order: init/pending → rbf-request → (new order created) +vin/vout: * → closed +withdraw: pending → aggregating +utxo: updated based on chain status +``` + +## Goat Consensus Layer Validation Rules + +### ReplaceWithdrawalV2 Validation (`goat/x/bitcoin/keeper/tx.go:289`) + +```go +func (k msgServer) replaceWithdrawal(ctx context.Context, req types.ReplaceWithdrawalMsger) error { + // 1. Get Processing record by Pid + processing, err := k.Processing.Get(sdkctx, req.GetPid()) + + // 2. Fee must increase + if processing.Fee >= req.GetNewTxFee() { + return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "new tx fee is less than before") + } + + // 3. Txid must be unique (cannot resubmit same transaction) + for _, item := range processing.Txid { + if bytes.Equal(item, txid) { + return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "the tx doesn't have any change") + } + } + + // 4. Output count must match (with/without change allowed) + txoutLen, withdrawalLen := len(tx.TxOut), len(processing.Withdrawals) + if txoutLen != withdrawalLen && txoutLen != withdrawalLen+1 { + return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "invalid tx output size for withdrawals") + } + + // 5. Iterate original withdrawals and validate each output + for idx, wid := range processing.Withdrawals { + withdrawal, err := k.Withdrawals.Get(sdkctx, wid) + + // Status must be PROCESSING + if withdrawal.Status != types.WITHDRAWAL_STATUS_PROCESSING { + return errorsmod.Wrapf(...) + } + + // txPrice must be <= MaxTxPrice + if txPrice > float64(withdrawal.MaxTxPrice) { + return errorsmod.Wrapf(...) + } + + // Output script must strictly match original withdrawal address + if !bytes.Equal(outputScript, txout.PkScript) { + return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "witdhrawal %d script not matched", wid) + } + } + + // 6. Change output must be sent to current relayer pubkey address + if txoutLen != withdrawalLen { + change := tx.TxOut[withdrawalLen] + pubkey, _ := k.Pubkey.Get(ctx) + if !types.VerifySystemAddressScript(&pubkey, change.PkScript) { + return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "give change to not a latest relayer pubkey") + } + } +} +``` + +### Processing Struct + +```go +type Processing struct { + Txid [][]byte // List of all RBF transaction txids (history) + Output []TxOuptut // Output amounts for each transaction + Withdrawals []uint64 // Associated withdrawal ID list (immutable) + Fee uint64 // Current fee +} +``` + +### Key Constraints + +| Constraint | Description | +|------------|-------------| +| `newFee > oldFee` | Fee must increase | +| `txPrice <= MinMaxTxPrice` | Cannot exceed any withdrawal's MaxTxPrice | +| `txid unique` | Each RBF must produce a different transaction | +| `vout order unchanged` | Output scripts must match original withdrawals order | +| `vout count constraint` | Must equal withdrawalLen or withdrawalLen+1 | + +## Fee Optimization Strategy + +### Constraints + +``` +1. newFee > oldFee (RBF requirement) +2. txPrice <= minMaxTxPrice (user limit) + where txPrice = fee / vbytes + vbytes = stripped_size + witness_size / 4 +``` + +### Calculation Logic + +```go +// 1. Find minimum MaxTxPrice in the batch +var minMaxTxPrice uint64 = ^uint64(0) +for _, withdraw := range withdraws { + if withdraw.TxPrice < minMaxTxPrice { + minMaxTxPrice = withdraw.TxPrice + } +} + +// 2. Calculate vbytes +vbytes := float64(tx.SerializeSizeStripped()) + float64(witnessSize)/4.0 + +// 3. Calculate maximum allowed fee +maxAllowedFee := uint64(float64(minMaxTxPrice) * vbytes) + +// 4. Calculate minimum required fee +minRequiredFee := oldTxFee + 1 + +// 5. Check if RBF is feasible +if minRequiredFee > maxAllowedFee { + // Cannot proceed with RBF, user's MaxTxPrice is too low + return +} + +// 6. Smart fee selection +networkBasedFee := uint64(float64(networkFeeRate) * vbytes) + +if networkBasedFee > oldTxFee && networkBasedFee <= maxAllowedFee { + // Optimal: use current network fee rate + actualFee = networkBasedFee +} else if networkBasedFee > maxAllowedFee { + // Network congestion: use user's maximum allowed + actualFee = maxAllowedFee +} else { + // Network idle: only increase by 1 satoshi + actualFee = minRequiredFee +} +``` + +### Fee Selection Priority + +| Scenario | Selected Fee | Description | +|----------|--------------|-------------| +| `networkFee > oldTxFee && networkFee <= maxAllowed` | `networkFee` | Optimal: use current network rate | +| `networkFee > maxAllowed` | `maxAllowedFee` | Network congested but limited: use maximum | +| `networkFee <= oldTxFee` | `oldTxFee + 1` | Network idle: minimum increment | +| `minRequired > maxAllowed` | Abort RBF | Cannot proceed: MaxTxPrice too low | + +## P2P Broadcast Mechanism + +When the Proposer detects a UTXO conflict and completes cleanup, it needs to notify other nodes to synchronize state. + +### Message Type + +```go +type MsgSendOrderRbf struct { + Txid string `json:"txid"` // Find order by txid + OrderId string `json:"order_id"` // For verification + OrderType string `json:"order_type"` // Order type + Reason string `json:"reason"` // RBF reason +} +``` + +### Flow + +``` +Proposer Node Other Nodes + | | + | 1. Detect UTXO conflict (-25) | + | 2. Check UTXO status via BTC RPC | + | 3. Execute CleanInitializedNeedRbfWithdrawByOrderId + | 4. Broadcast MsgSendOrderRbf | + |---------------------------------------->| + | | 5. Receive message + | | 6. Find local order by txid + | | 7. Verify orderId matches + | | 8. Execute cleanup (assume UTXO spent) +``` + +### Non-Proposer Node Handling + +```go +// Non-proposer nodes don't have BTC RPC access +// Trust proposer's cleanup result, assume all UTXOs are spent +_, cleanupErr := b.state.CleanInitializedNeedRbfWithdrawByOrderId( + sendOrder.OrderId, + func(utxoTxid string, outIndex int) (bool, error) { + return true, nil // Assume spent + }) +``` + +## State Machine + +### Safebox Order State Transitions + +``` + ┌─────────────┐ + │ create │ + └──────┬──────┘ + │ + ┌──────▼──────┐ + │ received_ok │◄────────────────┐ + └──────┬──────┘ │ + │ │ + ┌──────▼──────┐ │ + │ init │ │ + └──────┬──────┘ │ + │ │ + ┌──────▼──────┐ │ + │ pending │ │ + └──────┬──────┘ │ + │ │ + ┌────────────┼────────────-┐ │ + │ │ │ │ + ┌──────▼──────┐ ┌───▼────┐ ┌────-─▼────-─┐ │ + │ confirmed │ │ closed │ │UTXO conflict│───┘ + └─────────────┘ └────────┘ └──────────--─┘ + (reset task) +``` + +### Withdrawal Order State Transitions + +``` + ┌─────────────┐ + │ create │ + └──────┬──────┘ + │ + ┌──────▼──────┐ + │ aggregating │◄──────────────------──┐ + └──────┬──────┘ │ + │ │ + ┌──────▼──────┐ │ + │ init │ │ + └──────┬──────┘ │ + │ │ + ┌──────▼──────┐ │ + │ pending │ │ + └──────┬──────┘ │ + │ │ + ┌───────────────────┼───────────────────-┐ │ + │ │ │ │ +┌──────▼──────┐ ┌──────▼──────┐ ┌──────-▼──────┐ │ +│ confirmed │ │ closed │ │ rbf-request │ │ +└─────────────┘ └─────────────┘ └──────-┬──────┘ │ + │ │ + ┌─────-─▼──────┐ │ + │ New order │──┘ + │ aggregation │ + │(preserve Pid)│ + └─────────────-┘ +``` + +## Key Code Locations + +| Function | File Path | +|----------|-----------| +| UTXO conflict detection | `internal/wallet/withdraw_broadcast.go:315-358` | +| Cleanup logic | `internal/state/withdraw.go:815-943` | +| RBF signature initiation | `internal/wallet/withdraw.go:569-785` | +| Fee calculation | `internal/wallet/withdraw.go:717-766` | +| BLS signature aggregation | `internal/bls/handle_wallet.go:393-501` | +| Consensus layer submission | `internal/bls/handle_wallet.go:531-556` | +| P2P broadcast | `internal/wallet/withdraw_broadcast.go:349-365` | +| P2P receive handling | `internal/wallet/withdraw_broadcast.go:523-566` | + +## Important Notes + +### Strict Constraints for Withdrawal RBF + +1. **Vout order must be consistent**: RBF transaction outputs must strictly follow the order of withdrawal IDs in the original ProcessWithdrawalV2 +2. **Change output**: If change is less than dust, there may be no change output (vout count = withdrawalLen) +3. **Pid preservation**: RBF orders must preserve the original order's Pid; consensus layer associates original withdrawals through Pid +4. **Fee constraints**: New fee must be greater than old fee, and txPrice cannot exceed any withdrawal's MaxTxPrice + +### Complete Rollback for Safebox + +1. **Timestamp change**: Each aggregation uses current block time, causing timelock address changes +2. **Brand new transaction**: Equivalent to creating a transaction from scratch, no RBF constraints +3. **Simple recovery**: Just reset task status, next round will automatically re-aggregate + +### Concurrency Safety + +1. **Proposer uniqueness**: Only the current epoch's proposer can initiate RBF +2. **Signature state lock**: `sigMu` ensures only one signature process is running at a time +3. **Database transaction**: Cleanup operations are executed in a transaction to ensure atomicity diff --git a/internal/bls/handle.go b/internal/bls/handle.go index 1740f0b..d4c662e 100644 --- a/internal/bls/handle.go +++ b/internal/bls/handle.go @@ -40,6 +40,8 @@ func (s *Signer) handleSigStart(ctx context.Context, event interface{}) { log.Debugf("Event handleDepositReceive is of type MsgSignDeposit, request id %s", e.RequestId) if err := s.handleSigStartNewDeposit(ctx, e); err != nil { log.Errorf("Error handleSigStart MsgSignDeposit, %v", err) + // feedback SigFailed + s.state.EventBus.Publish(state.SigFailed, e) } case types.MsgSignSendOrder: log.Debugf("Event handleSigStartSendOrder is of type MsgSignSendOrder, request id %s", e.RequestId) @@ -66,6 +68,8 @@ func (s *Signer) handleSigStart(ctx context.Context, event interface{}) { log.Debugf("Event handleSigStartNewVoter is of type MsgSignNewVoter, request id %s", e.RequestId) if err := s.handleSigStartNewVoter(ctx, e); err != nil { log.Errorf("Error handleSigStart MsgSignNewVoter, %v", err) + // feedback SigFailed + s.state.EventBus.Publish(state.SigFailed, e) } default: log.Debug("Unknown event handleSigStart type") diff --git a/internal/bls/handle_wallet.go b/internal/bls/handle_wallet.go index 35de106..554037d 100644 --- a/internal/bls/handle_wallet.go +++ b/internal/bls/handle_wallet.go @@ -386,17 +386,17 @@ func (s *Signer) makeSigSendOrder(orderType string, withdrawIds []uint64, witnes } } -func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWithdrawalV2, *bitcointypes.MsgNewConsolidation, error) { +func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWithdrawalV2, *bitcointypes.MsgNewConsolidation, *bitcointypes.MsgReplaceWithdrawalV2, error) { epochVoter := s.state.GetEpochVoter() voteMap, ok := s.sigExists(requestId) if !ok { - return nil, nil, fmt.Errorf("no sig found of send order, request id: %s", requestId) + return nil, nil, nil, fmt.Errorf("no sig found of send order, request id: %s", requestId) } voterAll := strings.Split(epochVoter.VoteAddrList, ",") proposer := "" orderType := db.ORDER_TYPE_WITHDRAWAL - var txFee, epoch, sequence uint64 + var txFee, epoch, sequence, pid uint64 var noWitnessTx []byte var withdrawIds []uint64 var witnessSize uint64 @@ -410,7 +410,7 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith err := json.Unmarshal(msgSendOrder.SendOrder, &order) if err != nil { log.Debug("Cannot unmarshal send order from vote msg") - return nil, nil, err + return nil, nil, nil, err } if msgSendOrder.IsProposer { proposer = address // proposer address @@ -423,6 +423,7 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith txFee = order.TxFee noWitnessTx = order.NoWitnessTx orderType = order.OrderType + pid = order.Pid // Extract Pid to determine if this is RBF } else { pos := types.IndexOfSlice(voterAll, address) // voter address log.Debugf("Bitmap check, pos: %d, address: %s, all: %s", pos, address, epochVoter.VoteAddrList) @@ -434,14 +435,14 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith } if proposer == "" { - return nil, nil, fmt.Errorf("missing proposer sig msg of send order, request id: %s", requestId) + return nil, nil, nil, fmt.Errorf("missing proposer sig msg of send order, request id: %s", requestId) } if epoch != epochVoter.Epoch { - return nil, nil, fmt.Errorf("incorrect epoch of send order, request id: %s, msg epoch: %d, current epoch: %d", requestId, epoch, epochVoter.Epoch) + return nil, nil, nil, fmt.Errorf("incorrect epoch of send order, request id: %s, msg epoch: %d, current epoch: %d", requestId, epoch, epochVoter.Epoch) } if sequence != epochVoter.Sequence { - return nil, nil, fmt.Errorf("incorrect sequence of send order, request id: %s, msg sequence: %d, current sequence: %d", requestId, sequence, epochVoter.Sequence) + return nil, nil, nil, fmt.Errorf("incorrect sequence of send order, request id: %s, msg sequence: %d, current sequence: %d", requestId, sequence, epochVoter.Sequence) } voteSig = append([][]byte{proposerSig}, voteSig...) @@ -449,13 +450,13 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith // check threshold threshold := types.Threshold(len(voterAll)) if len(voteSig) < threshold { - return nil, nil, fmt.Errorf("threshold not reach of send order, request id: %s, has sig: %d, threshold: %d", requestId, len(voteSig), threshold) + return nil, nil, nil, fmt.Errorf("threshold not reach of send order, request id: %s, has sig: %d, threshold: %d", requestId, len(voteSig), threshold) } // aggregate aggSig, err := goatcryp.AggregateSignatures(voteSig) if err != nil { - return nil, nil, err + return nil, nil, nil, err } votes := &relayertypes.Votes{ @@ -466,6 +467,20 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith } if orderType == db.ORDER_TYPE_WITHDRAWAL { + // Check if this is RBF (Pid > 0 means ReplaceWithdrawalV2) + if pid > 0 { + msgReplaceWithdrawal := bitcointypes.MsgReplaceWithdrawalV2{ + Proposer: proposer, + Vote: votes, + Pid: pid, + NewNoWitnessTx: noWitnessTx, + NewTxFee: txFee, + WitnessSize: witnessSize, + } + log.Infof("aggSigSendOrder: RBF order detected (Pid=%d), will use ReplaceWithdrawalV2", pid) + return nil, nil, &msgReplaceWithdrawal, nil + } + // Normal withdrawal (Pid == 0) msgWithdrawal := bitcointypes.MsgProcessWithdrawalV2{ Proposer: proposer, Vote: votes, @@ -474,14 +489,14 @@ func (s *Signer) aggSigSendOrder(requestId string) (*bitcointypes.MsgProcessWith TxFee: txFee, WitnessSize: witnessSize, } - return &msgWithdrawal, nil, nil + return &msgWithdrawal, nil, nil, nil } else { msgConsolidation := bitcointypes.MsgNewConsolidation{ Proposer: proposer, Vote: votes, NoWitnessTx: noWitnessTx, } - return nil, &msgConsolidation, nil + return nil, &msgConsolidation, nil, nil } } @@ -507,14 +522,24 @@ func (s *Signer) submitSendOrderToLayer2(ctx context.Context, e types.MsgSignSen s.sigMu.Unlock() // UNCHECK aggregate - msgWithdrawal, msgConsolidation, err := s.aggSigSendOrder(e.RequestId) + msgWithdrawal, msgConsolidation, msgReplaceWithdrawal, err := s.aggSigSendOrder(e.RequestId) if err != nil { log.Warnf("SigReceive send order proposer process aggregate sig, request id: %s, err: %v", e.RequestId, err) return nil } - // withdrawal && consolidation both submit to layer2, this - if msgWithdrawal != nil { + // Submit to layer2 based on message type + if msgReplaceWithdrawal != nil { + // RBF order (Pid > 0) - use ReplaceWithdrawalV2 + newProposal := layer2.NewProposal[*bitcointypes.MsgReplaceWithdrawalV2](s.layer2Listener) + err = newProposal.RetrySubmit(ctx, e.RequestId, msgReplaceWithdrawal, config.AppConfig.L2SubmitRetry) + if err != nil { + log.Errorf("SigReceive send RBF withdrawal proposer submit to RPC error, request id: %s, pid: %d, err: %v", e.RequestId, msgReplaceWithdrawal.Pid, err) + return err + } + log.Infof("SigReceive send RBF withdrawal submitted successfully, request id: %s, pid: %d", e.RequestId, msgReplaceWithdrawal.Pid) + } else if msgWithdrawal != nil { + // Normal withdrawal (Pid == 0) - use ProcessWithdrawalV2 newProposal := layer2.NewProposal[*bitcointypes.MsgProcessWithdrawalV2](s.layer2Listener) err = newProposal.RetrySubmit(ctx, e.RequestId, msgWithdrawal, config.AppConfig.L2SubmitRetry) if err != nil { @@ -605,3 +630,4 @@ func (s *Signer) submitSendOrderToContract(ctx context.Context, e types.MsgSignS return nil } + diff --git a/internal/btc/rpc_service.go b/internal/btc/rpc_service.go index 02f97b0..a76ddd8 100644 --- a/internal/btc/rpc_service.go +++ b/internal/btc/rpc_service.go @@ -159,6 +159,11 @@ func (s *BTCRPCService) GetBlockVerbose(blockHash *chainhash.Hash) (*btcjson.Get return s.client.GetBlockVerbose(blockHash) } +// GetTxOut returns the transaction output info if it's unspent, or nil if spent +func (s *BTCRPCService) GetTxOut(txHash *chainhash.Hash, index uint32, mempool bool) (*btcjson.GetTxOutResult, error) { + return s.client.GetTxOut(txHash, index, mempool) +} + // convertBlockToBlockData converts wire.MsgBlock to db.BtcBlockData func (s *BTCRPCService) convertBlockToBlockData(block *wire.MsgBlock, height uint64) (*db.BtcBlockData, error) { blockHash := block.BlockHash().String() diff --git a/internal/p2p/handle.go b/internal/p2p/handle.go index 5738dbf..7698dbf 100644 --- a/internal/p2p/handle.go +++ b/internal/p2p/handle.go @@ -36,6 +36,8 @@ func convertMsgData(msg Message[json.RawMessage]) any { return unmarshal[types.MsgSignNewVoter](msg.Data) case "MsgSafeboxTask": return unmarshal[types.MsgSignSafeboxTask](msg.Data) + case "MsgSendOrderRbf": + return unmarshal[types.MsgSendOrderRbf](msg.Data) } return unmarshal[any](msg.Data) } diff --git a/internal/p2p/network.go b/internal/p2p/network.go index 538d3ad..9a88b86 100644 --- a/internal/p2p/network.go +++ b/internal/p2p/network.go @@ -227,6 +227,8 @@ func (n *Network) handlePubSubMessages() { n.state.EventBus.Publish(state.NewVoter, convertMsgData(receivedMsg)) case MessageTypeSafeboxTask: n.state.EventBus.Publish(state.SafeboxTask, convertMsgData(receivedMsg)) + case MessageTypeSendOrderRbf: + n.state.EventBus.Publish(state.SendOrderRbf, convertMsgData(receivedMsg)) case MessageTypeHeartbeat: n.logger.Infof("💓 Received heartbeat from %s: %s", msg.GetFrom().String(), unmarshal[string](receivedMsg.Data)) default: diff --git a/internal/p2p/types.go b/internal/p2p/types.go index cabda3e..43b0677 100644 --- a/internal/p2p/types.go +++ b/internal/p2p/types.go @@ -25,4 +25,5 @@ const ( MessageTypeNewVoter MessageTypeSafeboxTask MessageTypeHeartbeat + MessageTypeSendOrderRbf ) diff --git a/internal/state/eventbus.go b/internal/state/eventbus.go index 6f5597a..717ab28 100644 --- a/internal/state/eventbus.go +++ b/internal/state/eventbus.go @@ -24,11 +24,12 @@ const ( SendOrderBroadcasted NewVoter SafeboxTask + SendOrderRbf ) func (e EventType) String() string { return [...]string{"EventUnkown", "SigStart", "SigReceive", "SigFinish", "SigFailed", "SigTimeout", "DepositReceive", - "BlockScanned", "WithdrawRequest", "WithdrawFinalize", "SendOrderBroadcasted", "NewVoter", "SafeboxTask"}[e] + "BlockScanned", "WithdrawRequest", "WithdrawFinalize", "SendOrderBroadcasted", "NewVoter", "SafeboxTask", "SendOrderRbf"}[e] } type EventBus struct { diff --git a/internal/state/state.go b/internal/state/state.go index 0cc2b26..eedf694 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -10,6 +10,15 @@ import ( "gorm.io/gorm" ) +// databaseProvider is the minimal DB interface State depends on. +// Production uses *db.DatabaseManager; tests can inject a stub implementation. +type databaseProvider interface { + GetL2InfoDB() *gorm.DB + GetBtcLightDB() *gorm.DB + GetBtcCacheDB() *gorm.DB + GetWalletDB() *gorm.DB +} + type StateLoader interface { GetL2Info() db.L2Info GetUtxo() db.Utxo @@ -20,7 +29,7 @@ type StateLoader interface { type State struct { EventBus *EventBus - dbm *db.DatabaseManager + dbm databaseProvider // Separate mutexes for different sub-modules layer2Mu sync.RWMutex @@ -43,7 +52,7 @@ var ( ) // InitializeState initializes the state by reading from the DB -func InitializeState(dbm *db.DatabaseManager) *State { +func InitializeState(dbm databaseProvider) *State { // Load layer2State, btcHeadState, walletState, depositState from db when start up var ( l2Info db.L2Info diff --git a/internal/state/state_test_helper_test.go b/internal/state/state_test_helper_test.go new file mode 100644 index 0000000..95a3a28 --- /dev/null +++ b/internal/state/state_test_helper_test.go @@ -0,0 +1,35 @@ +package state + +import ( + "gorm.io/gorm" +) + +// testDBProvider implements databaseProvider for tests using an in-memory DB. +type testDBProvider struct { + walletDb *gorm.DB +} + +func (m *testDBProvider) GetWalletDB() *gorm.DB { + return m.walletDb +} + +// These methods satisfy the databaseProvider interface; tests reuse the wallet DB. +func (m *testDBProvider) GetL2InfoDB() *gorm.DB { + return m.walletDb +} + +func (m *testDBProvider) GetBtcLightDB() *gorm.DB { + return m.walletDb +} + +func (m *testDBProvider) GetBtcCacheDB() *gorm.DB { + return m.walletDb +} + +// newStateForTest creates a State instance for testing with injected databases. +func newStateForTest(walletDb *gorm.DB) *State { + return &State{ + EventBus: NewEventBus(), + dbm: &testDBProvider{walletDb: walletDb}, + } +} diff --git a/internal/state/withdraw.go b/internal/state/withdraw.go index a68f99e..2158285 100644 --- a/internal/state/withdraw.go +++ b/internal/state/withdraw.go @@ -801,6 +801,147 @@ func (s *State) CleanProcessingWithdraw() error { return err } +// UtxoSpentChecker is a callback function to check if a UTXO is spent on BTC chain +// Returns true if the UTXO is spent, false if it's still unspent +type UtxoSpentChecker func(txid string, outIndex int) (spent bool, err error) + +// CleanInitializedNeedRbfWithdrawByOrderId cleans a specific order by orderId when UTXO conflict is detected +// It checks each UTXO's spent status via the callback: +// - If spent → mark as UTXO_STATUS_SPENT +// - If not spent → restore to UTXO_STATUS_PROCESSED +// For safebox orders: resets safebox_task to received_ok for re-aggregation +// For withdrawal orders: marks order as RBF_REQUEST for ReplaceWithdrawalV2 submission +// Returns the order type so caller can trigger appropriate follow-up action +func (s *State) CleanInitializedNeedRbfWithdrawByOrderId(orderId string, checkUtxoSpent UtxoSpentChecker) (orderType string, err error) { + s.walletMu.Lock() + defer s.walletMu.Unlock() + + err = s.dbm.GetWalletDB().Transaction(func(tx *gorm.DB) error { + order, err := s.getOrderByOrderId(tx, orderId) + if err != nil && err != gorm.ErrRecordNotFound { + return err + } + if order == nil { + log.Warnf("CleanInitializedNeedRbfWithdrawByOrderId order not found: %s", orderId) + return nil + } + if order.Status != db.ORDER_STATUS_INIT && order.Status != db.ORDER_STATUS_PENDING { + log.Warnf("CleanInitializedNeedRbfWithdrawByOrderId order %s status is %s, not init/pending, skip", orderId, order.Status) + return nil + } + + orderType = order.OrderType + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId processing order: %s, type: %s", order.OrderId, orderType) + + // Check each UTXO's spent status and update accordingly + vins, err := s.getVinsByOrderId(tx, order.OrderId) + if err != nil { + return err + } + + for _, vin := range vins { + var utxoInDb db.Utxo + if err = tx.Where("txid = ? and out_index = ?", vin.Txid, vin.OutIndex).First(&utxoInDb).Error; err != nil { + log.Warnf("CleanInitializedNeedRbfWithdrawByOrderId UTXO %s:%d not found in db, skip", vin.Txid, vin.OutIndex) + continue + } + // Skip UTXOs already marked as spent + if utxoInDb.Status == db.UTXO_STATUS_SPENT { + continue + } + + // Check if UTXO is spent on BTC chain + spent, checkErr := checkUtxoSpent(vin.Txid, int(vin.OutIndex)) + if checkErr != nil { + log.Warnf("CleanInitializedNeedRbfWithdrawByOrderId failed to check UTXO %s:%d spent status: %v", vin.Txid, vin.OutIndex, checkErr) + continue + } + + if spent { + // UTXO is spent on chain, mark as spent + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId UTXO %s:%d is spent, marking as spent", vin.Txid, vin.OutIndex) + err = tx.Model(&db.Utxo{}).Where("id = ?", utxoInDb.ID).Updates(&db.Utxo{ + Status: db.UTXO_STATUS_SPENT, + UpdatedAt: time.Now(), + }).Error + } else { + // UTXO is not spent, restore to processed + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId UTXO %s:%d is not spent, restoring to processed", vin.Txid, vin.OutIndex) + err = tx.Model(&db.Utxo{}).Where("id = ?", utxoInDb.ID).Updates(&db.Utxo{ + Status: db.UTXO_STATUS_PROCESSED, + UpdatedAt: time.Now(), + }).Error + } + if err != nil { + log.Errorf("CleanInitializedNeedRbfWithdrawByOrderId update utxo txid %s - out %d error: %v", utxoInDb.Txid, utxoInDb.OutIndex, err) + return err + } + } + + // Handle differently based on order type + if orderType == db.ORDER_TYPE_SAFEBOX { + // For safebox: close order and reset task for re-aggregation + order.Status = db.ORDER_STATUS_CLOSED + order.UpdatedAt = time.Now() + err = s.saveOrder(tx, order) + if err != nil { + return err + } + + // Update vin/vout status to closed + err = s.updateOtherStatusByOrder(tx, order.OrderId, db.ORDER_STATUS_CLOSED, true) + if err != nil { + return err + } + + // Reset safebox task from init/init_ok to received_ok for re-aggregation + err = s.updateSafeboxTaskStatusByOrderId(tx, db.TASK_STATUS_RECEIVED_OK, order.OrderId, db.TASK_STATUS_INIT, db.TASK_STATUS_INIT_OK) + if err != nil { + return err + } + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId safebox order %s: closed, task reset to received_ok", order.OrderId) + } else if orderType == db.ORDER_TYPE_WITHDRAWAL { + // For withdrawal: mark order as RBF_REQUEST for ReplaceWithdrawalV2 + // The order keeps its Pid so ReplaceWithdrawalV2 can reference it + order.Status = db.ORDER_STATUS_RBF_REQUEST + order.UpdatedAt = time.Now() + err = s.saveOrder(tx, order) + if err != nil { + return err + } + + // Update vin/vout status to closed (old vins are no longer valid) + err = s.updateOtherStatusByOrder(tx, order.OrderId, db.ORDER_STATUS_CLOSED, true) + if err != nil { + return err + } + + // Restore withdraws to aggregating status so they can be re-processed + err = s.updateWithdrawStatusByOrderId(tx, db.WITHDRAW_STATUS_AGGREGATING, order.OrderId, db.WITHDRAW_STATUS_PENDING) + if err != nil { + return err + } + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId withdrawal order %s: marked as RBF_REQUEST, Pid: %d", order.OrderId, order.Pid) + } else { + // For consolidation or other types: just close + order.Status = db.ORDER_STATUS_CLOSED + order.UpdatedAt = time.Now() + err = s.saveOrder(tx, order) + if err != nil { + return err + } + err = s.updateOtherStatusByOrder(tx, order.OrderId, db.ORDER_STATUS_CLOSED, true) + if err != nil { + return err + } + log.Infof("CleanInitializedNeedRbfWithdrawByOrderId order %s type %s: closed", order.OrderId, orderType) + } + + return nil + }) + return orderType, err +} + func (s *State) GetWithdrawsCanStart() ([]*db.Withdraw, error) { s.walletMu.RLock() defer s.walletMu.RUnlock() @@ -892,6 +1033,96 @@ func (s *State) GetSendOrderByTxIdOrExternalId(id string) (*db.SendOrder, error) return sendOrder, nil } +// GetSendOrdersNeedRbf returns orders with RBF_REQUEST status that need to be re-submitted +// via ReplaceWithdrawalV2 +func (s *State) GetSendOrdersNeedRbf() ([]*db.SendOrder, error) { + s.walletMu.RLock() + defer s.walletMu.RUnlock() + + var orders []*db.SendOrder + err := s.dbm.GetWalletDB().Where("status = ? AND order_type = ?", + db.ORDER_STATUS_RBF_REQUEST, db.ORDER_TYPE_WITHDRAWAL). + Order("id asc").Find(&orders).Error + if err != nil && err != gorm.ErrRecordNotFound { + return nil, err + } + return orders, nil +} + +// GetSendOrderByOrderId returns a send order by its orderId +func (s *State) GetSendOrderByOrderId(orderId string) (*db.SendOrder, error) { + s.walletMu.RLock() + defer s.walletMu.RUnlock() + + return s.getOrderByOrderId(nil, orderId) +} + +// CreateRbfSendOrder creates a new RBF send order, closes the original order, and updates UTXO statuses +func (s *State) CreateRbfSendOrder(newOrder *db.SendOrder, originalOrderId string, selectedUtxos []*db.Utxo, vins []*db.Vin, vouts []*db.Vout) error { + s.walletMu.Lock() + defer s.walletMu.Unlock() + + err := s.dbm.GetWalletDB().Transaction(func(tx *gorm.DB) error { + // Close the original order + err := tx.Model(&db.SendOrder{}).Where("order_id = ?", originalOrderId).Updates(&db.SendOrder{ + Status: db.ORDER_STATUS_CLOSED, + UpdatedAt: time.Now(), + }).Error + if err != nil { + return fmt.Errorf("failed to close original order: %v", err) + } + + // Save new order + err = s.saveOrder(tx, newOrder) + if err != nil { + return err + } + + // Save vins + if err = tx.Create(&vins).Error; err != nil { + return err + } + + // Save vouts + if err = tx.Create(&vouts).Error; err != nil { + return err + } + + // Update UTXO statuses to pending + for _, utxo := range selectedUtxos { + var utxoInDb db.Utxo + if err = tx.Where("txid = ? and out_index = ?", utxo.Txid, utxo.OutIndex).Order("id desc").First(&utxoInDb).Error; err != nil { + return err + } + // Only update if UTXO is in confirmed status + if utxoInDb.Status == db.UTXO_STATUS_CONFIRMED { + err = tx.Model(&db.Utxo{}).Where("id = ?", utxoInDb.ID).Updates(&db.Utxo{ + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + }).Error + if err != nil { + return err + } + } + } + + // Update withdrawals to reference new order + err = tx.Model(&db.Withdraw{}).Where("order_id = ?", originalOrderId).Updates(&db.Withdraw{ + OrderId: newOrder.OrderId, + Txid: newOrder.Txid, + Status: db.WITHDRAW_STATUS_AGGREGATING, + UpdatedAt: time.Now(), + }).Error + if err != nil { + return fmt.Errorf("failed to update withdrawals for RBF: %v", err) + } + + return nil + }) + + return err +} + // GetLatestWithdrawSendOrderConfirmed get latest confirmed withdraw send order func (s *State) GetLatestWithdrawSendOrderConfirmed() (*db.SendOrder, error) { s.walletMu.RLock() diff --git a/internal/state/withdraw_rbf_test.go b/internal/state/withdraw_rbf_test.go new file mode 100644 index 0000000..477b856 --- /dev/null +++ b/internal/state/withdraw_rbf_test.go @@ -0,0 +1,605 @@ +package state + +import ( + "testing" + "time" + + "github.com/goatnetwork/goat-relayer/internal/db" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// setupTestDB creates an in-memory SQLite database for testing +func setupTestDB(t *testing.T) *gorm.DB { + testDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + require.NoError(t, err) + + // Auto migrate all models + err = testDB.AutoMigrate( + &db.SendOrder{}, + &db.Vin{}, + &db.Vout{}, + &db.Utxo{}, + &db.Withdraw{}, + &db.SafeboxTask{}, + ) + require.NoError(t, err) + + return testDB +} + +// TestRbfWithdrawalFlow tests the RBF flow for withdrawal orders +func TestRbfWithdrawalFlow(t *testing.T) { + testDB := setupTestDB(t) + + // Setup: Create a withdrawal order with UTXOs + orderId := "test-withdrawal-order-001" + pid := uint64(12345) + + // Create send order + sendOrder := &db.SendOrder{ + OrderId: orderId, + Proposer: "goat1proposer", + Pid: pid, + Amount: 1000000, // 0.01 BTC + TxPrice: 100, + Status: db.ORDER_STATUS_PENDING, + OrderType: db.ORDER_TYPE_WITHDRAWAL, + BtcBlock: 0, + Txid: "original-txid-abc123", + NoWitnessTx: []byte{0x01, 0x02, 0x03}, + TxFee: 5000, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(sendOrder).Error) + + // Create UTXOs (simulating spent and unspent) + utxos := []*db.Utxo{ + { + Txid: "utxo-txid-001", + OutIndex: 0, + Amount: 500000, + Receiver: "bc1qtest1", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + { + Txid: "utxo-txid-002", + OutIndex: 1, + Amount: 600000, + Receiver: "bc1qtest2", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, utxo := range utxos { + require.NoError(t, testDB.Create(utxo).Error) + } + + // Create Vins linking order to UTXOs + vins := []*db.Vin{ + { + OrderId: orderId, + BtcHeight: 100, + Txid: "utxo-txid-001", + OutIndex: 0, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + { + OrderId: orderId, + BtcHeight: 100, + Txid: "utxo-txid-002", + OutIndex: 1, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, vin := range vins { + require.NoError(t, testDB.Create(vin).Error) + } + + // Create Vouts + vouts := []*db.Vout{ + { + OrderId: orderId, + BtcHeight: 0, + Txid: "original-txid-abc123", + OutIndex: 0, + WithdrawId: "1001", + Amount: 450000, + Receiver: "bc1qwithdraw1", + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + { + OrderId: orderId, + BtcHeight: 0, + Txid: "original-txid-abc123", + OutIndex: 1, + WithdrawId: "1002", + Amount: 450000, + Receiver: "bc1qwithdraw2", + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, vout := range vouts { + require.NoError(t, testDB.Create(vout).Error) + } + + // Create Withdraws + withdraws := []*db.Withdraw{ + { + RequestId: 1001, + GoatBlock: 100, + Amount: 500000, + TxPrice: 100, + From: "0xuser1", + To: "bc1qwithdraw1", + Status: db.WITHDRAW_STATUS_PENDING, + OrderId: orderId, + Txid: "original-txid-abc123", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + { + RequestId: 1002, + GoatBlock: 100, + Amount: 500000, + TxPrice: 100, + From: "0xuser2", + To: "bc1qwithdraw2", + Status: db.WITHDRAW_STATUS_PENDING, + OrderId: orderId, + Txid: "original-txid-abc123", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + } + for _, withdraw := range withdraws { + require.NoError(t, testDB.Create(withdraw).Error) + } + + // Simulate UTXO spent check callback + // First UTXO is spent, second is not spent + spentUtxos := map[string]bool{ + "utxo-txid-001:0": true, // spent + "utxo-txid-002:1": false, // not spent + } + checkUtxoSpent := func(txid string, outIndex int) (bool, error) { + key := txid + ":" + string(rune('0'+outIndex)) + return spentUtxos[key], nil + } + + // Create mock State with test DB + mockState := newStateForTest(testDB) + + // Execute cleanup + orderType, err := mockState.CleanInitializedNeedRbfWithdrawByOrderId(orderId, checkUtxoSpent) + require.NoError(t, err) + assert.Equal(t, db.ORDER_TYPE_WITHDRAWAL, orderType) + + // Verify results + + // 1. Order should be marked as RBF_REQUEST + var updatedOrder db.SendOrder + require.NoError(t, testDB.Where("order_id = ?", orderId).First(&updatedOrder).Error) + assert.Equal(t, db.ORDER_STATUS_RBF_REQUEST, updatedOrder.Status) + assert.Equal(t, pid, updatedOrder.Pid) // Pid should be preserved + + // 2. First UTXO should be marked as spent + var utxo1 db.Utxo + require.NoError(t, testDB.Where("txid = ? AND out_index = ?", "utxo-txid-001", 0).First(&utxo1).Error) + assert.Equal(t, db.UTXO_STATUS_SPENT, utxo1.Status) + + // 3. Second UTXO should be restored to processed + var utxo2 db.Utxo + require.NoError(t, testDB.Where("txid = ? AND out_index = ?", "utxo-txid-002", 1).First(&utxo2).Error) + assert.Equal(t, db.UTXO_STATUS_PROCESSED, utxo2.Status) + + // 4. Vins and Vouts should be closed + var vinCount int64 + testDB.Model(&db.Vin{}).Where("order_id = ? AND status = ?", orderId, db.ORDER_STATUS_CLOSED).Count(&vinCount) + assert.Equal(t, int64(2), vinCount) + + var voutCount int64 + testDB.Model(&db.Vout{}).Where("order_id = ? AND status = ?", orderId, db.ORDER_STATUS_CLOSED).Count(&voutCount) + assert.Equal(t, int64(2), voutCount) + + // 5. Withdraws should be restored to aggregating + var withdrawCount int64 + testDB.Model(&db.Withdraw{}).Where("order_id = ? AND status = ?", orderId, db.WITHDRAW_STATUS_AGGREGATING).Count(&withdrawCount) + assert.Equal(t, int64(2), withdrawCount) +} + +// TestRbfSafeboxFlow tests the RBF flow for safebox orders +func TestRbfSafeboxFlow(t *testing.T) { + testDB := setupTestDB(t) + + // Setup: Create a safebox order with UTXOs + orderId := "test-safebox-order-001" + + // Create send order + sendOrder := &db.SendOrder{ + OrderId: orderId, + Proposer: "goat1proposer", + Pid: 0, // Safebox orders don't have Pid + Amount: 1000000, + TxPrice: 100, + Status: db.ORDER_STATUS_PENDING, + OrderType: db.ORDER_TYPE_SAFEBOX, + BtcBlock: 0, + Txid: "safebox-txid-abc123", + NoWitnessTx: []byte{0x01, 0x02, 0x03}, + TxFee: 5000, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(sendOrder).Error) + + // Create UTXOs + utxos := []*db.Utxo{ + { + Txid: "utxo-txid-001", + OutIndex: 0, + Amount: 500000, + Receiver: "bc1qtest1", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + { + Txid: "utxo-txid-002", + OutIndex: 1, + Amount: 600000, + Receiver: "bc1qtest2", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, utxo := range utxos { + require.NoError(t, testDB.Create(utxo).Error) + } + + // Create Vins + vins := []*db.Vin{ + { + OrderId: orderId, + BtcHeight: 100, + Txid: "utxo-txid-001", + OutIndex: 0, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + { + OrderId: orderId, + BtcHeight: 100, + Txid: "utxo-txid-002", + OutIndex: 1, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, vin := range vins { + require.NoError(t, testDB.Create(vin).Error) + } + + // Create Vouts (safebox has timelock output) + vouts := []*db.Vout{ + { + OrderId: orderId, + BtcHeight: 0, + Txid: "safebox-txid-abc123", + OutIndex: 0, + Amount: 995000, + Receiver: "bc1qtimelock_address", + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + }, + } + for _, vout := range vouts { + require.NoError(t, testDB.Create(vout).Error) + } + + // Create SafeboxTask + safeboxTask := &db.SafeboxTask{ + TaskId: 1001, + PartnerId: "partner-001", + DepositAddress: "bc1qdeposit", + Amount: 1000000, + TimelockAddress: "bc1qtimelock_address", + BtcAddress: "bc1qbtc", + Status: db.TASK_STATUS_INIT, + OrderId: orderId, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(safeboxTask).Error) + + // All UTXOs are spent + checkUtxoSpent := func(txid string, outIndex int) (bool, error) { + return true, nil + } + + // Create mock State with test DB + mockState := newStateForTest(testDB) + + // Execute cleanup + orderType, err := mockState.CleanInitializedNeedRbfWithdrawByOrderId(orderId, checkUtxoSpent) + require.NoError(t, err) + assert.Equal(t, db.ORDER_TYPE_SAFEBOX, orderType) + + // Verify results + + // 1. Order should be closed (not RBF_REQUEST) + var updatedOrder db.SendOrder + require.NoError(t, testDB.Where("order_id = ?", orderId).First(&updatedOrder).Error) + assert.Equal(t, db.ORDER_STATUS_CLOSED, updatedOrder.Status) + + // 2. UTXOs should be marked as spent + var utxo1 db.Utxo + require.NoError(t, testDB.Where("txid = ? AND out_index = ?", "utxo-txid-001", 0).First(&utxo1).Error) + assert.Equal(t, db.UTXO_STATUS_SPENT, utxo1.Status) + + var utxo2 db.Utxo + require.NoError(t, testDB.Where("txid = ? AND out_index = ?", "utxo-txid-002", 1).First(&utxo2).Error) + assert.Equal(t, db.UTXO_STATUS_SPENT, utxo2.Status) + + // 3. Vins and Vouts should be closed + var vinCount int64 + testDB.Model(&db.Vin{}).Where("order_id = ? AND status = ?", orderId, db.ORDER_STATUS_CLOSED).Count(&vinCount) + assert.Equal(t, int64(2), vinCount) + + // 4. SafeboxTask should be reset to received_ok + var updatedTask db.SafeboxTask + require.NoError(t, testDB.Where("order_id = ?", orderId).First(&updatedTask).Error) + assert.Equal(t, db.TASK_STATUS_RECEIVED_OK, updatedTask.Status) +} + +// TestRbfWithdrawalPreservesVoutOrder tests that withdrawal RBF preserves vout order +func TestRbfWithdrawalPreservesVoutOrder(t *testing.T) { + testDB := setupTestDB(t) + + orderId := "test-withdrawal-order-002" + pid := uint64(99999) + + // Create order with specific withdrawal order + sendOrder := &db.SendOrder{ + OrderId: orderId, + Proposer: "goat1proposer", + Pid: pid, + Amount: 3000000, // 0.03 BTC total + TxPrice: 50, + Status: db.ORDER_STATUS_PENDING, + OrderType: db.ORDER_TYPE_WITHDRAWAL, + Txid: "original-txid-xyz789", + NoWitnessTx: []byte{0x01, 0x02, 0x03}, + TxFee: 10000, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(sendOrder).Error) + + // Create withdrawals with specific order (IDs: 2001, 2002, 2003) + // This order must be preserved in RBF + withdraws := []*db.Withdraw{ + {RequestId: 2001, Amount: 1000000, To: "bc1qaddr1", Status: db.WITHDRAW_STATUS_PENDING, OrderId: orderId, TxPrice: 50, CreatedAt: time.Now(), UpdatedAt: time.Now()}, + {RequestId: 2002, Amount: 1000000, To: "bc1qaddr2", Status: db.WITHDRAW_STATUS_PENDING, OrderId: orderId, TxPrice: 60, CreatedAt: time.Now(), UpdatedAt: time.Now()}, + {RequestId: 2003, Amount: 1000000, To: "bc1qaddr3", Status: db.WITHDRAW_STATUS_PENDING, OrderId: orderId, TxPrice: 70, CreatedAt: time.Now(), UpdatedAt: time.Now()}, + } + for _, w := range withdraws { + require.NoError(t, testDB.Create(w).Error) + } + + // Create UTXOs + utxo := &db.Utxo{ + Txid: "utxo-txid-large", + OutIndex: 0, + Amount: 3500000, + Receiver: "bc1qsystem", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(utxo).Error) + + // Create Vin + vin := &db.Vin{ + OrderId: orderId, + Txid: "utxo-txid-large", + OutIndex: 0, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(vin).Error) + + // Create Vouts (order matches withdrawal IDs) + vouts := []*db.Vout{ + {OrderId: orderId, Txid: "original-txid-xyz789", OutIndex: 0, WithdrawId: "2001", Amount: 990000, Receiver: "bc1qaddr1", Status: db.ORDER_STATUS_PENDING, UpdatedAt: time.Now()}, + {OrderId: orderId, Txid: "original-txid-xyz789", OutIndex: 1, WithdrawId: "2002", Amount: 990000, Receiver: "bc1qaddr2", Status: db.ORDER_STATUS_PENDING, UpdatedAt: time.Now()}, + {OrderId: orderId, Txid: "original-txid-xyz789", OutIndex: 2, WithdrawId: "2003", Amount: 990000, Receiver: "bc1qaddr3", Status: db.ORDER_STATUS_PENDING, UpdatedAt: time.Now()}, + {OrderId: orderId, Txid: "original-txid-xyz789", OutIndex: 3, WithdrawId: "", Amount: 500000, Receiver: "bc1qchange", Status: db.ORDER_STATUS_PENDING, UpdatedAt: time.Now()}, // change output + } + for _, vout := range vouts { + require.NoError(t, testDB.Create(vout).Error) + } + + // Create mock State with test DB + mockState := newStateForTest(testDB) + + // Execute cleanup + checkUtxoSpent := func(txid string, outIndex int) (bool, error) { + return true, nil // All spent + } + + orderType, err := mockState.CleanInitializedNeedRbfWithdrawByOrderId(orderId, checkUtxoSpent) + require.NoError(t, err) + assert.Equal(t, db.ORDER_TYPE_WITHDRAWAL, orderType) + + // Verify: Withdrawals should be in aggregating status (preserving original order) + var restoredWithdraws []db.Withdraw + require.NoError(t, testDB.Where("order_id = ?", orderId).Order("request_id").Find(&restoredWithdraws).Error) + assert.Len(t, restoredWithdraws, 3) + + // Verify order is preserved (sorted by request_id) + assert.Equal(t, uint64(2001), restoredWithdraws[0].RequestId) + assert.Equal(t, uint64(2002), restoredWithdraws[1].RequestId) + assert.Equal(t, uint64(2003), restoredWithdraws[2].RequestId) + + // All should be in aggregating status for re-processing + for _, w := range restoredWithdraws { + assert.Equal(t, db.WITHDRAW_STATUS_AGGREGATING, w.Status) + } + + // Verify Pid is preserved + var updatedOrder db.SendOrder + require.NoError(t, testDB.Where("order_id = ?", orderId).First(&updatedOrder).Error) + assert.Equal(t, pid, updatedOrder.Pid) +} + +// TestRbfFeeCalculation tests the fee optimization strategy +func TestRbfFeeCalculation(t *testing.T) { + tests := []struct { + name string + oldTxFee uint64 + minMaxTxPrice uint64 + vbytes float64 + networkFeeRate int64 + expectedFee uint64 + shouldProceed bool + }{ + { + name: "Network fee within bounds", + oldTxFee: 1000, + minMaxTxPrice: 50, // 50 sat/vB + vbytes: 200, // 200 vbytes + networkFeeRate: 20, // 20 sat/vB + expectedFee: 4000, // 20 * 200 = 4000 + shouldProceed: true, + }, + { + name: "Network fee exceeds max, use max", + oldTxFee: 1000, + minMaxTxPrice: 10, // 10 sat/vB + vbytes: 200, // 200 vbytes + networkFeeRate: 50, // 50 sat/vB (too high) + expectedFee: 2000, // maxAllowed = 10 * 200 = 2000 + shouldProceed: true, + }, + { + name: "Network fee too low, use minimum increment", + oldTxFee: 5000, + minMaxTxPrice: 50, + vbytes: 200, + networkFeeRate: 10, // 10 * 200 = 2000 < 5000 + expectedFee: 5001, // oldTxFee + 1 + shouldProceed: true, + }, + { + name: "Cannot proceed - max allowed less than min required", + oldTxFee: 10000, + minMaxTxPrice: 10, // 10 sat/vB + vbytes: 200, // maxAllowed = 2000 + networkFeeRate: 50, + expectedFee: 0, + shouldProceed: false, // 10001 > 2000 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + maxAllowedFee := uint64(float64(tt.minMaxTxPrice) * tt.vbytes) + minRequiredFee := tt.oldTxFee + 1 + networkBasedFee := uint64(float64(tt.networkFeeRate) * tt.vbytes) + + // Check if RBF is possible + canProceed := minRequiredFee <= maxAllowedFee + + if !canProceed { + assert.False(t, tt.shouldProceed, "Expected cannot proceed") + return + } + + assert.True(t, tt.shouldProceed, "Expected can proceed") + + // Calculate actual fee + var actualFee uint64 + if networkBasedFee > tt.oldTxFee && networkBasedFee <= maxAllowedFee { + actualFee = networkBasedFee + } else if networkBasedFee > maxAllowedFee { + actualFee = maxAllowedFee + } else { + actualFee = minRequiredFee + } + + assert.Equal(t, tt.expectedFee, actualFee) + }) + } +} + +// TestConsolidationOrderCleanup tests that consolidation orders are simply closed +func TestConsolidationOrderCleanup(t *testing.T) { + testDB := setupTestDB(t) + + orderId := "test-consolidation-order-001" + + sendOrder := &db.SendOrder{ + OrderId: orderId, + Proposer: "goat1proposer", + Pid: 0, + Amount: 5000000, + TxPrice: 20, + Status: db.ORDER_STATUS_PENDING, + OrderType: db.ORDER_TYPE_CONSOLIDATION, + Txid: "consolidation-txid-abc", + NoWitnessTx: []byte{0x01, 0x02}, + TxFee: 2000, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(sendOrder).Error) + + // Create UTXOs and Vins + for i := 0; i < 5; i++ { + utxo := &db.Utxo{ + Txid: "consolidate-utxo-" + string(rune('0'+i)), + OutIndex: 0, + Amount: 1000000, + Receiver: "bc1qsystem", + ReceiverType: "P2WPKH", + Status: db.UTXO_STATUS_PENDING, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(utxo).Error) + + vin := &db.Vin{ + OrderId: orderId, + Txid: "consolidate-utxo-" + string(rune('0'+i)), + OutIndex: 0, + Status: db.ORDER_STATUS_PENDING, + UpdatedAt: time.Now(), + } + require.NoError(t, testDB.Create(vin).Error) + } + + // Create mock State with test DB + mockState := newStateForTest(testDB) + + checkUtxoSpent := func(txid string, outIndex int) (bool, error) { + return true, nil + } + + orderType, err := mockState.CleanInitializedNeedRbfWithdrawByOrderId(orderId, checkUtxoSpent) + require.NoError(t, err) + assert.Equal(t, db.ORDER_TYPE_CONSOLIDATION, orderType) + + // Consolidation orders should simply be closed + var updatedOrder db.SendOrder + require.NoError(t, testDB.Where("order_id = ?", orderId).First(&updatedOrder).Error) + assert.Equal(t, db.ORDER_STATUS_CLOSED, updatedOrder.Status) +} diff --git a/internal/types/utxo.go b/internal/types/utxo.go index a1f2cd0..b18e0b0 100644 --- a/internal/types/utxo.go +++ b/internal/types/utxo.go @@ -46,6 +46,15 @@ type MsgSendOrderBroadcasted struct { WithdrawIds []uint64 `json:"withdraw_ids"` } +// MsgSendOrderRbf broadcasts RBF cleanup action for orders with UTXO conflicts +// Other nodes receive this and perform the same cleanup by finding order via Txid +type MsgSendOrderRbf struct { + Txid string `json:"txid"` // Transaction ID to find the order + OrderId string `json:"order_id"` // Order ID for verification + OrderType string `json:"order_type"` // Order type (withdrawal, consolidation, safebox) + Reason string `json:"reason"` // Reason for RBF (e.g., "utxo-conflict") +} + // MsgUtxoWithdraw defines withdraw UTXO broadcast to p2p which received in relayer rpc type MsgUtxoWithdraw struct { TxId string `json:"tx_id"` diff --git a/internal/wallet/rbf_fee_test.go b/internal/wallet/rbf_fee_test.go new file mode 100644 index 0000000..2e209de --- /dev/null +++ b/internal/wallet/rbf_fee_test.go @@ -0,0 +1,318 @@ +package wallet + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestRbfFeeCalculation tests the fee optimization strategy for RBF +// This tests the algorithm documented in docs/rbf.md +func TestRbfFeeCalculation(t *testing.T) { + tests := []struct { + name string + oldTxFee uint64 + minMaxTxPrice uint64 // minimum MaxTxPrice from all withdrawals + vbytes float64 + networkFeeRate int64 + expectedFee uint64 + shouldProceed bool + description string + }{ + { + name: "Network fee within bounds - optimal case", + oldTxFee: 1000, + minMaxTxPrice: 50, // 50 sat/vB + vbytes: 200, // 200 vbytes + networkFeeRate: 20, // 20 sat/vB + expectedFee: 4000, // 20 * 200 = 4000 + shouldProceed: true, + description: "Network fee (4000) > oldTxFee (1000) and <= maxAllowed (10000), use network fee", + }, + { + name: "Network fee exceeds max allowed", + oldTxFee: 1000, + minMaxTxPrice: 10, // 10 sat/vB (restrictive user setting) + vbytes: 200, // 200 vbytes + networkFeeRate: 50, // 50 sat/vB (network congestion) + expectedFee: 2000, // maxAllowed = 10 * 200 = 2000 + shouldProceed: true, + description: "Network fee (10000) > maxAllowed (2000), cap at maxAllowed", + }, + { + name: "Network fee too low - minimum increment", + oldTxFee: 5000, + minMaxTxPrice: 50, + vbytes: 200, + networkFeeRate: 10, // 10 * 200 = 2000 < 5000 + expectedFee: 5001, // oldTxFee + 1 + shouldProceed: true, + description: "Network fee (2000) <= oldTxFee (5000), use minimum increment", + }, + { + name: "Cannot proceed - MaxTxPrice too restrictive", + oldTxFee: 10000, + minMaxTxPrice: 10, // 10 sat/vB + vbytes: 200, // maxAllowed = 2000 + networkFeeRate: 50, + expectedFee: 0, + shouldProceed: false, + description: "minRequired (10001) > maxAllowed (2000), cannot proceed with RBF", + }, + { + name: "Exact boundary - minRequired equals maxAllowed", + oldTxFee: 1999, + minMaxTxPrice: 10, // 10 sat/vB + vbytes: 200, // maxAllowed = 2000 + networkFeeRate: 5, + expectedFee: 2000, // minRequired = 2000 = maxAllowed + shouldProceed: true, + description: "minRequired (2000) == maxAllowed (2000), proceed with minimum", + }, + { + name: "Large transaction with multiple withdrawals", + oldTxFee: 50000, + minMaxTxPrice: 100, // 100 sat/vB + vbytes: 1000, // 1000 vbytes (large tx) + networkFeeRate: 80, // 80 sat/vB + expectedFee: 80000,// 80 * 1000 = 80000 + shouldProceed: true, + description: "Large transaction with network fee within bounds", + }, + { + name: "Small transaction - dust considerations", + oldTxFee: 500, + minMaxTxPrice: 200, // 200 sat/vB + vbytes: 100, // 100 vbytes (small tx) + networkFeeRate: 50, // 50 sat/vB + expectedFee: 5000, // 50 * 100 = 5000 + shouldProceed: true, + description: "Small transaction with comfortable margins", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Calculate bounds (same logic as in initRbfWithdrawSig) + maxAllowedFee := uint64(float64(tt.minMaxTxPrice) * tt.vbytes) + minRequiredFee := tt.oldTxFee + 1 + networkBasedFee := uint64(float64(tt.networkFeeRate) * tt.vbytes) + + // Check if RBF is possible + canProceed := minRequiredFee <= maxAllowedFee + + if !canProceed { + assert.False(t, tt.shouldProceed, "Expected cannot proceed: %s", tt.description) + return + } + + assert.True(t, tt.shouldProceed, "Expected can proceed: %s", tt.description) + + // Calculate actual fee using the optimization strategy + var actualFee uint64 + if networkBasedFee > tt.oldTxFee && networkBasedFee <= maxAllowedFee { + // Optimal: use network fee rate + actualFee = networkBasedFee + } else if networkBasedFee > maxAllowedFee { + // Network congested but constrained by user's MaxTxPrice + actualFee = maxAllowedFee + } else { + // Network fee too low, use minimum increment + actualFee = minRequiredFee + } + + assert.Equal(t, tt.expectedFee, actualFee, "Fee calculation mismatch: %s", tt.description) + + // Verify constraints + assert.Greater(t, actualFee, tt.oldTxFee, "New fee must be greater than old fee") + assert.LessOrEqual(t, actualFee, maxAllowedFee, "New fee must not exceed max allowed") + }) + } +} + +// TestTxPriceCalculation tests the txPrice calculation formula +func TestTxPriceCalculation(t *testing.T) { + tests := []struct { + name string + fee uint64 + strippedSize int // no-witness tx size + witnessSize int + expectedVbytes float64 + expectedPrice float64 + }{ + { + name: "P2WPKH simple transaction", + fee: 5000, + strippedSize: 200, + witnessSize: 108, // typical P2WPKH witness + expectedVbytes: 227, // 200 + 108/4 = 227 + expectedPrice: 22.03, // 5000 / 227 ≈ 22.03 + }, + { + name: "P2WSH transaction", + fee: 10000, + strippedSize: 250, + witnessSize: 264, // 2 inputs with P2WSH + expectedVbytes: 316, // 250 + 264/4 = 316 + expectedPrice: 31.65, // 10000 / 316 ≈ 31.65 + }, + { + name: "Legacy P2PKH (no witness)", + fee: 8000, + strippedSize: 400, + witnessSize: 0, + expectedVbytes: 400, + expectedPrice: 20.0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // vbytes = stripped_size + witness_size / 4 + vbytes := float64(tt.strippedSize) + float64(tt.witnessSize)/4.0 + assert.InDelta(t, tt.expectedVbytes, vbytes, 0.1, "vbytes calculation") + + // txPrice = fee / vbytes + txPrice := float64(tt.fee) / vbytes + assert.InDelta(t, tt.expectedPrice, txPrice, 0.1, "txPrice calculation") + }) + } +} + +// TestWithdrawalVoutOrderConstraint tests that withdrawal vout order is preserved +// This documents the constraint from goat consensus layer +func TestWithdrawalVoutOrderConstraint(t *testing.T) { + // Simulating the constraint from replaceWithdrawal in goat/x/bitcoin/keeper/tx.go + + // Original ProcessWithdrawalV2 with withdrawal IDs [1001, 1002, 1003] + originalWithdrawals := []uint64{1001, 1002, 1003} + + // Original vout addresses (order must match withdrawal IDs) + originalVouts := []string{"bc1qaddr1", "bc1qaddr2", "bc1qaddr3"} + + // RBF transaction must have same vout addresses in same order + rbfVouts := []string{"bc1qaddr1", "bc1qaddr2", "bc1qaddr3"} + + // Verify order is preserved + for idx, wid := range originalWithdrawals { + assert.Equal(t, originalVouts[idx], rbfVouts[idx], + "Vout address for withdrawal %d must match original", wid) + } + + // Test invalid RBF (wrong order) + invalidRbfVouts := []string{"bc1qaddr2", "bc1qaddr1", "bc1qaddr3"} // swapped 1 and 2 + for idx, wid := range originalWithdrawals { + if originalVouts[idx] != invalidRbfVouts[idx] { + t.Logf("Invalid RBF detected: withdrawal %d has wrong address at index %d", wid, idx) + } + } + assert.NotEqual(t, originalVouts, invalidRbfVouts, "Invalid RBF should be detected") +} + +// TestSafeboxVsWithdrawalRbfStrategy documents the difference between safebox and withdrawal RBF +func TestSafeboxVsWithdrawalRbfStrategy(t *testing.T) { + t.Run("Safebox - Complete Rollback", func(t *testing.T) { + // Safebox RBF strategy: + // 1. Timelock address is based on current block time + // 2. When UTXO conflict detected, order is closed + // 3. Task is reset to received_ok + // 4. Next aggregation creates completely new transaction + // 5. New timelock address (due to new block time) + // 6. New vin selection + // 7. New vout (timelock output changes) + + strategy := struct { + vinChanges bool + voutChanges bool + pidRequired bool + newOrderId bool + }{ + vinChanges: true, // New UTXOs selected + voutChanges: true, // Timelock address changes + pidRequired: false, // No Pid needed + newOrderId: true, // Completely new order + } + + assert.True(t, strategy.vinChanges, "Safebox RBF: vins change") + assert.True(t, strategy.voutChanges, "Safebox RBF: vouts change") + assert.False(t, strategy.pidRequired, "Safebox RBF: no Pid required") + assert.True(t, strategy.newOrderId, "Safebox RBF: creates new order") + }) + + t.Run("Withdrawal - ReplaceWithdrawalV2", func(t *testing.T) { + // Withdrawal RBF strategy: + // 1. Vout addresses and order must be preserved (consensus constraint) + // 2. Pid is preserved for ReplaceWithdrawalV2 reference + // 3. Only vin (UTXO selection) can change + // 4. Fee must increase + // 5. txPrice must not exceed any withdrawal's MaxTxPrice + + strategy := struct { + vinChanges bool + voutChanges bool + pidRequired bool + feeIncreased bool + priceConstrained bool + }{ + vinChanges: true, // New UTXOs selected + voutChanges: false, // Vout addresses must be same + pidRequired: true, // Pid required for ReplaceWithdrawalV2 + feeIncreased: true, // newFee > oldFee + priceConstrained: true, // txPrice <= minMaxTxPrice + } + + assert.True(t, strategy.vinChanges, "Withdrawal RBF: vins change") + assert.False(t, strategy.voutChanges, "Withdrawal RBF: vouts preserved") + assert.True(t, strategy.pidRequired, "Withdrawal RBF: Pid required") + assert.True(t, strategy.feeIncreased, "Withdrawal RBF: fee must increase") + assert.True(t, strategy.priceConstrained, "Withdrawal RBF: price constrained by MaxTxPrice") + }) +} + +// TestRbfVoutCountConstraint tests the vout count validation from consensus +func TestRbfVoutCountConstraint(t *testing.T) { + tests := []struct { + name string + withdrawalLen int + txoutLen int + valid bool + reason string + }{ + { + name: "Exact match - no change output", + withdrawalLen: 3, + txoutLen: 3, + valid: true, + reason: "txoutLen == withdrawalLen (no change)", + }, + { + name: "With change output", + withdrawalLen: 3, + txoutLen: 4, + valid: true, + reason: "txoutLen == withdrawalLen + 1 (with change)", + }, + { + name: "Too few outputs", + withdrawalLen: 3, + txoutLen: 2, + valid: false, + reason: "txoutLen < withdrawalLen (missing withdrawal outputs)", + }, + { + name: "Too many outputs", + withdrawalLen: 3, + txoutLen: 5, + valid: false, + reason: "txoutLen > withdrawalLen + 1 (extra outputs not allowed)", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Validation from goat consensus layer + valid := tt.txoutLen == tt.withdrawalLen || tt.txoutLen == tt.withdrawalLen+1 + assert.Equal(t, tt.valid, valid, tt.reason) + }) + } +} diff --git a/internal/wallet/withdraw.go b/internal/wallet/withdraw.go index e9d86e0..36fdcec 100644 --- a/internal/wallet/withdraw.go +++ b/internal/wallet/withdraw.go @@ -54,6 +54,7 @@ func (w *WalletServer) withdrawLoop(ctx context.Context) { w.handleWithdrawSigFinish(sigFinish) case <-ticker.C: w.initWithdrawSig() + w.initRbfWithdrawSig() w.finalizeWithdrawSig() w.cancelWithdrawSig() } @@ -273,7 +274,7 @@ func (w *WalletServer) initWithdrawSig() { } log.Infof("WalletServer initWithdrawSig CreateRawTransaction for consolidation, tx: %s", tx.TxID()) - msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_CONSOLIDATION, consolidationParams.UTXOs, nil, nil, consolidationParams.UtxoAmount, actualFee, uint64(consolidationParams.NetworkFee), uint64(consolidationParams.WitnessSize), epochVoter, network) + msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_CONSOLIDATION, consolidationParams.UTXOs, nil, nil, consolidationParams.UtxoAmount, actualFee, uint64(consolidationParams.NetworkFee), uint64(consolidationParams.WitnessSize), epochVoter, network, 0) if err != nil { log.Errorf("WalletServer initWithdrawSig createSendOrder for consolidation error: %v", err) return @@ -319,7 +320,7 @@ func (w *WalletServer) initWithdrawSig() { } log.Infof("WalletServer initWithdrawSig CreateRawTransaction for safebox task, tx: %s", tx.TxID()) - msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_SAFEBOX, safeboxParams.UTXOs, nil, safeboxParams.Tasks, safeboxParams.UtxoAmount, actualFee, uint64(safeboxParams.NetworkFee), uint64(safeboxParams.WitnessSize), epochVoter, network) + msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_SAFEBOX, safeboxParams.UTXOs, nil, safeboxParams.Tasks, safeboxParams.UtxoAmount, actualFee, uint64(safeboxParams.NetworkFee), uint64(safeboxParams.WitnessSize), epochVoter, network, 0) if err != nil { log.Errorf("WalletServer initWithdrawSig createSendOrder for safebox task error: %v", err) return @@ -388,7 +389,7 @@ func (w *WalletServer) initWithdrawSig() { } log.Infof("WalletServer initWithdrawSig CreateRawTransaction for withdraw, tx: %s, network fee rate: %d", tx.TxID(), actualPrice) - msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_WITHDRAWAL, withdrawParams.UTXOs, withdrawParams.Withdrawals, nil, withdrawParams.UtxoAmount, actualFee, uint64(withdrawParams.NetworkFee), uint64(withdrawParams.WitnessSize), epochVoter, network) + msgSignSendOrder, err = w.createSendOrder(tx, db.ORDER_TYPE_WITHDRAWAL, withdrawParams.UTXOs, withdrawParams.Withdrawals, nil, withdrawParams.UtxoAmount, actualFee, uint64(withdrawParams.NetworkFee), uint64(withdrawParams.WitnessSize), epochVoter, network, 0) if err != nil { log.Errorf("WalletServer initWithdrawSig createSendOrder for withdraw error: %v", err) return @@ -406,7 +407,8 @@ func (w *WalletServer) initWithdrawSig() { } // createSendOrder, create send order for selected utxos and withdraws (if orderType is consolidation, selectedWithdraws is nil) -func (w *WalletServer) createSendOrder(tx *wire.MsgTx, orderType string, selectedUtxos []*db.Utxo, selectedWithdraws []*db.Withdraw, safeboxTasks []*db.SafeboxTask, utxoAmount int64, txFee, networkTxPrice, witnessSize uint64, epochVoter db.EpochVoter, network *chaincfg.Params) (*types.MsgSignSendOrder, error) { +// pid is optional - when pid > 0, it indicates this is an RBF order for ReplaceWithdrawalV2 +func (w *WalletServer) createSendOrder(tx *wire.MsgTx, orderType string, selectedUtxos []*db.Utxo, selectedWithdraws []*db.Withdraw, safeboxTasks []*db.SafeboxTask, utxoAmount int64, txFee, networkTxPrice, witnessSize uint64, epochVoter db.EpochVoter, network *chaincfg.Params, pid uint64) (*types.MsgSignSendOrder, error) { noWitnessTx, err := types.SerializeTransactionNoWitness(tx) if err != nil { return nil, err @@ -415,6 +417,7 @@ func (w *WalletServer) createSendOrder(tx *wire.MsgTx, orderType string, selecte order := &db.SendOrder{ OrderId: uuid.New().String(), Proposer: config.AppConfig.RelayerAddress, + Pid: pid, // Set Pid for RBF orders (pid > 0 means ReplaceWithdrawalV2) Amount: uint64(utxoAmount), TxPrice: networkTxPrice, Status: db.ORDER_STATUS_AGGREGATING, @@ -562,3 +565,221 @@ func (w *WalletServer) cleanWithdrawProcess() { log.Fatalf("WalletServer cleanWithdrawProcess unexpected error %v", err) } } + +// initRbfWithdrawSig initiates RBF (Replace-By-Fee) for withdrawal orders that have UTXO conflicts +// This creates a new transaction with the same withdrawals but different UTXOs and higher fee, +// then triggers BLS signature for MsgReplaceWithdrawalV2 +func (w *WalletServer) initRbfWithdrawSig() { + log.Debug("WalletServer initRbfWithdrawSig") + + // Check preconditions (similar to initWithdrawSig) + l2Info := w.state.GetL2Info() + if l2Info.Syncing { + log.Debugf("WalletServer initRbfWithdrawSig ignore, layer2 is catching up") + return + } + + btcState := w.state.GetBtcHead() + if btcState.Syncing { + log.Debugf("WalletServer initRbfWithdrawSig ignore, btc is catching up") + return + } + + epochVoter := w.state.GetEpochVoter() + if epochVoter.Proposer != config.AppConfig.RelayerAddress { + log.Debugf("WalletServer initRbfWithdrawSig ignore, self is not proposer") + return + } + + w.sigMu.Lock() + if w.sigStatus { + w.sigMu.Unlock() + log.Debug("WalletServer initRbfWithdrawSig ignore, there is a sig in progress") + return + } + w.sigMu.Unlock() + + // Get orders that need RBF + rbfOrders, err := w.state.GetSendOrdersNeedRbf() + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig GetSendOrdersNeedRbf error: %v", err) + return + } + if len(rbfOrders) == 0 { + log.Debug("WalletServer initRbfWithdrawSig no orders need RBF") + return + } + + log.Infof("WalletServer initRbfWithdrawSig found %d orders need RBF", len(rbfOrders)) + + // Process first RBF order (one at a time) + order := rbfOrders[0] + if order.Pid == 0 { + log.Warnf("WalletServer initRbfWithdrawSig order %s has no Pid, cannot submit ReplaceWithdrawalV2", order.OrderId) + return + } + + // Get the original withdrawals for this order + withdraws, err := w.state.GetWithdrawsByOrderId(order.OrderId) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig GetWithdrawsByOrderId error: %v, orderId: %s", err, order.OrderId) + return + } + if len(withdraws) == 0 { + log.Warnf("WalletServer initRbfWithdrawSig no withdraws found for order %s", order.OrderId) + return + } + + // Calculate total withdrawal amount + var totalWithdrawAmount int64 + for _, withdraw := range withdraws { + totalWithdrawAmount += int64(withdraw.Amount) + } + + // Get available UTXOs + utxos, err := w.state.GetUtxoCanSpend() + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig GetUtxoCanSpend error: %v", err) + return + } + if len(utxos) == 0 { + log.Warn("WalletServer initRbfWithdrawSig no utxos can spend") + return + } + + // Get network and change address + network := types.GetBTCNetwork(config.AppConfig.BTCNetworkType) + pubkey, err := w.state.GetDepositKeyByBtcBlock(0) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig GetDepositKeyByBtcBlock error: %v", err) + return + } + pubkeyBytes, err := base64.StdEncoding.DecodeString(pubkey.PubKey) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig decode pubkey error: %v", err) + return + } + p2wpkhAddress, err := types.GenerateP2WPKHAddress(pubkeyBytes, network) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig GenerateP2WPKHAddress error: %v", err) + return + } + + // Extract receiver types from withdraws and find minimum MaxTxPrice + receiverTypes := make([]string, len(withdraws)) + var minMaxTxPrice uint64 = ^uint64(0) // max uint64 + for i, withdraw := range withdraws { + receiverTypes[i], _ = types.GetAddressType(withdraw.To, network) + if withdraw.TxPrice < minMaxTxPrice { + minMaxTxPrice = withdraw.TxPrice + } + } + + oldTxFee := order.TxFee + + // Use actual network fee for UTXO selection + networkFeeRate := int64(btcState.NetworkFee.FastestFee) + if networkFeeRate == 0 { + networkFeeRate = 10 // fallback to 10 sat/vB + } + + // Select UTXOs for the new transaction + selectOptimalUTXOs, totalSelectedAmount, _, changeAmount, estimateFee, witnessSize, err := SelectOptimalUTXOs( + utxos, receiverTypes, totalWithdrawAmount, 0, networkFeeRate, len(withdraws)) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig SelectOptimalUTXOs error: %v", err) + return + } + + log.Infof("WalletServer initRbfWithdrawSig SelectOptimalUTXOs: totalSelectedAmount: %d, withdrawAmount: %d, changeAmount: %d, selectedUtxos: %d", + totalSelectedAmount, totalWithdrawAmount, changeAmount, len(selectOptimalUTXOs)) + + // Create new transaction + withdrawParams := &TransactionParams{ + UTXOs: selectOptimalUTXOs, + Withdrawals: withdraws, + Tasks: nil, + ChangeAddress: p2wpkhAddress.EncodeAddress(), + ChangeAmount: changeAmount, + EstimatedFee: estimateFee, + WitnessSize: witnessSize, + NetworkFee: networkFeeRate, + Net: network, + UtxoAmount: totalSelectedAmount, + WithdrawAmount: totalWithdrawAmount, + } + tx, actualFee, err := CreateRawTransaction(withdrawParams) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig CreateRawTransaction error: %v", err) + return + } + + // Calculate vbytes for txPrice calculation + // vbytes = stripped_size + witness_size / 4 + vbytes := float64(tx.SerializeSizeStripped()) + float64(witnessSize)/4.0 + + // Calculate maximum allowed fee based on minimum MaxTxPrice + // txPrice = fee / vbytes, so maxFee = minMaxTxPrice * vbytes + maxAllowedFee := uint64(float64(minMaxTxPrice) * vbytes) + + // Calculate minimum required fee (just 1 satoshi more than old fee) + minRequiredFee := oldTxFee + 1 + + // Check if RBF is possible within MaxTxPrice constraint + if minRequiredFee > maxAllowedFee { + log.Warnf("WalletServer initRbfWithdrawSig cannot proceed: minRequiredFee(%d) > maxAllowedFee(%d), minMaxTxPrice: %d, vbytes: %.2f", + minRequiredFee, maxAllowedFee, minMaxTxPrice, vbytes) + return + } + + // Smart fee calculation: + // 1. Use network fee rate if available + // 2. Cap at maxAllowedFee + // 3. Ensure > oldTxFee + networkBasedFee := uint64(float64(networkFeeRate) * vbytes) + + if actualFee <= oldTxFee { + // If CreateRawTransaction's fee is too low, recalculate + if networkBasedFee > oldTxFee && networkBasedFee <= maxAllowedFee { + // Use network-based fee + actualFee = networkBasedFee + } else if networkBasedFee > maxAllowedFee { + // Network fee exceeds max allowed, use max allowed + actualFee = maxAllowedFee + } else { + // Network fee is still too low, use minimum increment + actualFee = minRequiredFee + } + } else if actualFee > maxAllowedFee { + // Cap at max allowed fee + actualFee = maxAllowedFee + } + + // Final sanity check + if actualFee <= oldTxFee { + log.Errorf("WalletServer initRbfWithdrawSig fee calculation error: actualFee(%d) <= oldTxFee(%d)", actualFee, oldTxFee) + return + } + + newTxPrice := float64(actualFee) / vbytes + log.Infof("WalletServer initRbfWithdrawSig CreateRawTransaction: tx: %s, actualFee: %d, oldTxFee: %d, witnessSize: %d, vbytes: %.2f, newTxPrice: %.2f, minMaxTxPrice: %d, networkFeeRate: %d", + tx.TxID(), actualFee, oldTxFee, witnessSize, vbytes, newTxPrice, minMaxTxPrice, networkFeeRate) + + // Create RBF order message using existing createSendOrder with pid > 0 + // This will be recognized as RBF order in aggSigSendOrder and submitted via ReplaceWithdrawalV2 + msgSignSendOrder, err := w.createSendOrder(tx, db.ORDER_TYPE_WITHDRAWAL, selectOptimalUTXOs, withdraws, nil, + totalSelectedAmount, actualFee, uint64(networkFeeRate), uint64(witnessSize), epochVoter, network, order.Pid) + if err != nil { + log.Errorf("WalletServer initRbfWithdrawSig createSendOrder error: %v", err) + return + } + + w.sigMu.Lock() + w.sigStatus = true + w.sigMu.Unlock() + + // Publish to event bus for BLS signing (same as normal withdrawal) + w.state.EventBus.Publish(state.SigStart, *msgSignSendOrder) + log.Infof("WalletServer initRbfWithdrawSig send MsgSignSendOrder to bus, requestId: %s, Pid: %d (RBF)", msgSignSendOrder.MsgSign.RequestId, order.Pid) +} + diff --git a/internal/wallet/withdraw_broadcast.go b/internal/wallet/withdraw_broadcast.go index 10d80a3..347e8d1 100644 --- a/internal/wallet/withdraw_broadcast.go +++ b/internal/wallet/withdraw_broadcast.go @@ -51,8 +51,9 @@ type BaseOrderBroadcaster struct { remoteClient RemoteClient state *state.State - txBroadcastMu sync.Mutex - txBroadcastCh chan interface{} + txBroadcastMu sync.Mutex + txBroadcastCh chan interface{} + sendOrderRbfCh chan interface{} // txBroadcastStatus bool // txBroadcastFinishBtcHeight uint64 } @@ -312,6 +313,63 @@ func (c *FireblocksClient) CheckPending(txid string, externalTxId string, update log.Warnf("Transaction signature verification failed, error reason: %v, txid: %s", rpcErr, txid) return false, 0, 0, fmt.Errorf("send raw transaction error: %v, txid: %s", err, txid) } + case btcjson.ErrRPCVerify: + // Handle bad-txns-inputs-missingorspent (-25) and similar errors + // Error -25 means the transaction was REJECTED by the node (UTXOs already spent) + // Our transaction never went on chain - UTXOs were spent by a different tx + if strings.Contains(rpcErr.Message, "bad-txns-inputs-missingorspent") || + strings.Contains(rpcErr.Message, "txn-mempool-conflict") || + strings.Contains(rpcErr.Message, "missing-inputs") { + log.Warnf("UTXO conflict detected (tx rejected), cleaning up UTXOs: %v, txid: %s", rpcErr, txid) + + // Get the send order to find orderId + sendOrder, orderErr := c.state.GetSendOrderByTxIdOrExternalId(txid) + if orderErr != nil { + log.Errorf("Failed to get send order for cleanup: %v, txid: %s", orderErr, txid) + return true, 0, 0, nil + } + + // Use CleanInitializedNeedRbfWithdrawByOrderId to check UTXOs and handle based on order type + orderType, cleanupErr := c.state.CleanInitializedNeedRbfWithdrawByOrderId(sendOrder.OrderId, func(utxoTxid string, outIndex int) (bool, error) { + txHash, err := chainhash.NewHashFromStr(utxoTxid) + if err != nil { + return false, err + } + // GetTxOut returns nil if UTXO is spent + txOut, err := c.btcRpc.GetTxOut(txHash, uint32(outIndex), true) + if err != nil { + return false, err + } + // If txOut is nil, UTXO is spent. + // Return true to indicate UTXO should be cleaned up. + return txOut == nil, nil + }) + if cleanupErr != nil { + log.Errorf("Failed to cleanup UTXOs: %v, txid: %s, orderId: %s", cleanupErr, txid, sendOrder.OrderId) + } else { + // Broadcast RBF cleanup to other nodes + p2p.PublishMessage(context.Background(), p2p.Message[any]{ + MessageType: p2p.MessageTypeSendOrderRbf, + RequestId: fmt.Sprintf("RBF:%s:%s", config.AppConfig.RelayerAddress, sendOrder.OrderId), + DataType: "MsgSendOrderRbf", + Data: types.MsgSendOrderRbf{ + Txid: txid, + OrderId: sendOrder.OrderId, + OrderType: orderType, + Reason: "utxo-conflict", + }, + }) + log.Infof("Broadcasted RBF message for order %s, txid: %s, type: %s", sendOrder.OrderId, txid, orderType) + } + + // For withdrawal orders, the order is now marked as RBF_REQUEST + // The RBF processor will pick it up and submit ReplaceWithdrawalV2 + if orderType == db.ORDER_TYPE_WITHDRAWAL { + log.Infof("Withdrawal order %s marked for RBF, Pid: %d", sendOrder.OrderId, sendOrder.Pid) + } + + return true, 0, 0, nil + } } } return false, 0, 0, fmt.Errorf("send raw transaction error: %v, txid: %s", err, txid) @@ -345,8 +403,9 @@ func (c *FireblocksClient) CheckPending(txid string, externalTxId string, update func NewOrderBroadcaster(btcClient *btc.BTCRPCService, state *state.State) OrderBroadcaster { orderBroadcaster := &BaseOrderBroadcaster{ - state: state, - txBroadcastCh: make(chan interface{}, 100), + state: state, + txBroadcastCh: make(chan interface{}, 100), + sendOrderRbfCh: make(chan interface{}, 100), } if config.AppConfig.BTCNetworkType == "regtest" { orderBroadcaster.remoteClient = &BtcClient{ @@ -367,6 +426,7 @@ func NewOrderBroadcaster(btcClient *btc.BTCRPCService, state *state.State) Order func (b *BaseOrderBroadcaster) Start(ctx context.Context) { log.Debug("BaseOrderBroadcaster start") b.state.EventBus.Subscribe(state.SendOrderBroadcasted, b.txBroadcastCh) + b.state.EventBus.Subscribe(state.SendOrderRbf, b.sendOrderRbfCh) ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -376,6 +436,8 @@ func (b *BaseOrderBroadcaster) Start(ctx context.Context) { case <-ctx.Done(): b.Stop() return + case msg := <-b.sendOrderRbfCh: + b.handleSendOrderRbf(msg) case msg := <-b.txBroadcastCh: msgOrder, ok := msg.(types.MsgSendOrderBroadcasted) if !ok { @@ -458,6 +520,53 @@ func (b *BaseOrderBroadcaster) Start(ctx context.Context) { func (b *BaseOrderBroadcaster) Stop() { } +// handleSendOrderRbf processes RBF messages from other nodes +// When a node detects UTXO conflict, it broadcasts RBF cleanup to other nodes +// Non-proposer nodes use this to clean up their local state +func (b *BaseOrderBroadcaster) handleSendOrderRbf(msg interface{}) { + rbfMsg, ok := msg.(types.MsgSendOrderRbf) + if !ok { + log.Errorf("Invalid send order RBF message type") + return + } + + log.Infof("Received send order RBF message: txid=%s, orderId=%s, orderType=%s, reason=%s", + rbfMsg.Txid, rbfMsg.OrderId, rbfMsg.OrderType, rbfMsg.Reason) + + // Find the order by txid + sendOrder, err := b.state.GetSendOrderByTxIdOrExternalId(rbfMsg.Txid) + if err != nil { + log.Warnf("handleSendOrderRbf: failed to find order by txid %s: %v", rbfMsg.Txid, err) + return + } + if sendOrder == nil { + log.Warnf("handleSendOrderRbf: order not found for txid %s", rbfMsg.Txid) + return + } + + // Verify orderId matches + if sendOrder.OrderId != rbfMsg.OrderId { + log.Warnf("handleSendOrderRbf: orderId mismatch, local=%s, received=%s", sendOrder.OrderId, rbfMsg.OrderId) + return + } + + // Non-proposer nodes don't have BTC RPC access, so we do a simpler cleanup + // Trust the proposer's cleanup and mark UTXOs as spent + _, cleanupErr := b.state.CleanInitializedNeedRbfWithdrawByOrderId(sendOrder.OrderId, func(utxoTxid string, outIndex int) (bool, error) { + // For non-proposer nodes, assume all UTXOs in conflicting order are spent + // This is safe because: + // 1. The proposer already verified spent status via BTC RPC + // 2. If UTXO is not actually spent, it will be restored when we try to use it again + return true, nil + }) + if cleanupErr != nil { + log.Errorf("handleSendOrderRbf: failed to cleanup order %s: %v", sendOrder.OrderId, cleanupErr) + return + } + + log.Infof("handleSendOrderRbf: successfully cleaned up order %s, txid=%s", sendOrder.OrderId, rbfMsg.Txid) +} + // broadcastOrders is a function that broadcasts withdrawal and consolidation orders to the network func (b *BaseOrderBroadcaster) broadcastOrders() { l2Info := b.state.GetL2Info()