Skip to content

Commit e0d42ac

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent fd8361b commit e0d42ac

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

Diff for: src/electrum.rs

+51-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use crossbeam_channel::Receiver;
88
use rayon::prelude::*;
@@ -19,7 +19,7 @@ use crate::{
1919
merkle::Proof,
2020
metrics::{self, Histogram, Metrics},
2121
signals::Signal,
22-
status::ScriptHashStatus,
22+
status::{OutPointStatus, ScriptHashStatus},
2323
tracker::Tracker,
2424
types::ScriptHash,
2525
};
@@ -34,6 +34,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
3434
pub struct Client {
3535
tip: Option<BlockHash>,
3636
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
37+
outpoints: HashMap<OutPoint, OutPointStatus>,
3738
}
3839

3940
#[derive(Deserialize)]
@@ -183,7 +184,25 @@ impl Rpc {
183184
}
184185
})
185186
.collect::<Result<Vec<Value>>>()
186-
.context("failed to update status")?;
187+
.context("failed to update scripthash status")?;
188+
189+
notifications.extend(
190+
client
191+
.outpoints
192+
.par_iter_mut()
193+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
194+
match self.tracker.update_outpoint_status(status, &self.daemon) {
195+
Ok(true) => Some(Ok(notification(
196+
"blockchain.outpoint.subscribe",
197+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
198+
))),
199+
Ok(false) => None, // outpoint status is the same
200+
Err(e) => Some(Err(e)),
201+
}
202+
})
203+
.collect::<Result<Vec<Value>>>()
204+
.context("failed to update scripthash status")?,
205+
);
187206

188207
if let Some(old_tip) = client.tip {
189208
let new_tip = self.tracker.chain().tip();
@@ -342,6 +361,28 @@ impl Rpc {
342361
})
343362
}
344363

364+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
365+
let outpoint = OutPoint::new(txid, vout);
366+
Ok(match client.outpoints.entry(outpoint) {
367+
Entry::Occupied(e) => json!(e.get()),
368+
Entry::Vacant(e) => {
369+
let outpoint = OutPoint::new(txid, vout);
370+
let mut status = OutPointStatus::new(outpoint);
371+
self.tracker
372+
.update_outpoint_status(&mut status, &self.daemon)?;
373+
json!(e.insert(status))
374+
}
375+
})
376+
}
377+
378+
fn outpoint_unsubscribe(
379+
&self,
380+
client: &mut Client,
381+
(txid, vout): (Txid, u32),
382+
) -> Result<Value> {
383+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
384+
}
385+
345386
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
346387
let mut status = ScriptHashStatus::new(scripthash);
347388
self.tracker
@@ -525,6 +566,8 @@ impl Rpc {
525566
Params::Features => self.features(),
526567
Params::HeadersSubscribe => self.headers_subscribe(client),
527568
Params::MempoolFeeHistogram => self.get_fee_histogram(),
569+
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
570+
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
528571
Params::PeersSubscribe => Ok(json!([])),
529572
Params::Ping => Ok(Value::Null),
530573
Params::RelayFee => self.relayfee(),
@@ -547,19 +590,21 @@ enum Params {
547590
Banner,
548591
BlockHeader((usize,)),
549592
BlockHeaders((usize, usize)),
550-
TransactionBroadcast((String,)),
551593
Donation,
552594
EstimateFee((u16,)),
553595
Features,
554596
HeadersSubscribe,
555597
MempoolFeeHistogram,
598+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
599+
OutPointUnsubscribe((Txid, u32)),
556600
PeersSubscribe,
557601
Ping,
558602
RelayFee,
559603
ScriptHashGetBalance((ScriptHash,)),
560604
ScriptHashGetHistory((ScriptHash,)),
561605
ScriptHashListUnspent((ScriptHash,)),
562606
ScriptHashSubscribe((ScriptHash,)),
607+
TransactionBroadcast((String,)),
563608
TransactionGet(TxGetArgs),
564609
TransactionGetMerkle((Txid, usize)),
565610
Version((String, Version)),
@@ -572,6 +617,8 @@ impl Params {
572617
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
573618
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
574619
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
620+
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
621+
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
575622
"blockchain.relayfee" => Params::RelayFee,
576623
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
577624
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),

Diff for: src/status.rs

+140-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -48,12 +48,26 @@ impl TxEntry {
4848
// Confirmation height of a transaction or its mempool state:
4949
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
51+
#[derive(Copy, Clone, Eq, PartialEq)]
5152
enum Height {
5253
Confirmed { height: usize },
5354
Unconfirmed { has_unconfirmed_inputs: bool },
5455
}
5556

5657
impl Height {
58+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
59+
let height = chain
60+
.get_block_height(&blockhash)
61+
.expect("missing block in chain");
62+
Self::Confirmed { height }
63+
}
64+
65+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
66+
Self::Unconfirmed {
67+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
68+
}
69+
}
70+
5771
fn as_i64(&self) -> i64 {
5872
match self {
5973
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -539,6 +553,131 @@ fn filter_block_txs<T: Send>(
539553
.into_iter()
540554
}
541555

556+
pub(crate) struct OutPointStatus {
557+
outpoint: OutPoint,
558+
funding: Option<Height>,
559+
spending: Option<(Txid, Height)>,
560+
tip: BlockHash,
561+
}
562+
563+
impl Serialize for OutPointStatus {
564+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
565+
where
566+
S: Serializer,
567+
{
568+
let mut map = serializer.serialize_map(None)?;
569+
if let Some(funding) = &self.funding {
570+
map.serialize_entry("height", &funding)?;
571+
}
572+
if let Some((txid, height)) = &self.spending {
573+
map.serialize_entry("spender_txhash", &txid)?;
574+
map.serialize_entry("spender_height", &height)?;
575+
}
576+
map.end()
577+
}
578+
}
579+
580+
impl OutPointStatus {
581+
pub(crate) fn new(outpoint: OutPoint) -> Self {
582+
Self {
583+
outpoint,
584+
funding: None,
585+
spending: None,
586+
tip: BlockHash::all_zeros(),
587+
}
588+
}
589+
590+
pub(crate) fn sync(
591+
&mut self,
592+
index: &Index,
593+
mempool: &Mempool,
594+
daemon: &Daemon,
595+
) -> Result<bool> {
596+
let funding = self.sync_funding(index, daemon, mempool)?;
597+
let spending = self.sync_spending(index, daemon, mempool)?;
598+
let same_status = (self.funding == funding) && (self.spending == spending);
599+
self.funding = funding;
600+
self.spending = spending;
601+
self.tip = index.chain().tip();
602+
Ok(!same_status)
603+
}
604+
605+
/// Return true iff current tip became unconfirmed
606+
fn is_reorg(&self, chain: &Chain) -> bool {
607+
chain.get_block_height(&self.tip).is_none()
608+
}
609+
610+
fn sync_funding(
611+
&self,
612+
index: &Index,
613+
daemon: &Daemon,
614+
mempool: &Mempool,
615+
) -> Result<Option<Height>> {
616+
let chain = index.chain();
617+
if !self.is_reorg(chain) {
618+
if let Some(Height::Confirmed { .. }) = &self.funding {
619+
return Ok(self.funding);
620+
}
621+
}
622+
let mut confirmed = None;
623+
daemon.for_blocks(
624+
index.filter_by_txid(self.outpoint.txid),
625+
|blockhash, block| {
626+
if confirmed.is_none() {
627+
for tx in block.txdata {
628+
let txid = tx.txid();
629+
let output_len = u32::try_from(tx.output.len()).unwrap();
630+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
631+
confirmed = Some(Height::from_blockhash(blockhash, chain));
632+
return;
633+
}
634+
}
635+
}
636+
},
637+
)?;
638+
Ok(confirmed.or_else(|| {
639+
mempool
640+
.get(&self.outpoint.txid)
641+
.map(|entry| Height::unconfirmed(entry))
642+
}))
643+
}
644+
645+
fn sync_spending(
646+
&self,
647+
index: &Index,
648+
daemon: &Daemon,
649+
mempool: &Mempool,
650+
) -> Result<Option<(Txid, Height)>> {
651+
let chain = index.chain();
652+
if !self.is_reorg(chain) {
653+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
654+
return Ok(self.spending);
655+
}
656+
}
657+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
658+
let mut confirmed = None;
659+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
660+
for tx in block.txdata {
661+
for txi in &tx.input {
662+
if txi.previous_output == self.outpoint {
663+
// TODO: there should be only one spending input
664+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
665+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
666+
return;
667+
}
668+
}
669+
}
670+
})?;
671+
Ok(confirmed.or_else(|| {
672+
let entries = mempool.filter_by_spending(&self.outpoint);
673+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
674+
entries
675+
.first()
676+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
677+
}))
678+
}
679+
}
680+
542681
#[cfg(test)]
543682
mod tests {
544683
use super::HistoryEntry;

Diff for: src/tracker.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{FeeHistogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -114,4 +114,12 @@ impl Tracker {
114114
})?;
115115
Ok(result)
116116
}
117+
118+
pub(crate) fn update_outpoint_status(
119+
&self,
120+
status: &mut OutPointStatus,
121+
daemon: &Daemon,
122+
) -> Result<bool> {
123+
status.sync(&self.index, &self.mempool, daemon)
124+
}
117125
}

0 commit comments

Comments
 (0)