Skip to content

Commit 834bbd7

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 41763a4 commit 834bbd7

File tree

3 files changed

+196
-6
lines changed

3 files changed

+196
-6
lines changed

src/electrum.rs

+51-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, encode::serialize_hex},
44
hashes::hex::FromHex,
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use crossbeam_channel::Receiver;
88
use rayon::prelude::*;
@@ -21,7 +21,7 @@ use crate::{
2121
merkle::Proof,
2222
metrics::{self, Histogram, Metrics},
2323
signals::Signal,
24-
status::ScriptHashStatus,
24+
status::{OutPointStatus, ScriptHashStatus},
2525
tracker::Tracker,
2626
types::ScriptHash,
2727
};
@@ -36,6 +36,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
3636
pub struct Client {
3737
tip: Option<BlockHash>,
3838
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
39+
outpoints: HashMap<OutPoint, OutPointStatus>,
3940
}
4041

4142
#[derive(Deserialize)]
@@ -185,7 +186,25 @@ impl Rpc {
185186
}
186187
})
187188
.collect::<Result<Vec<Value>>>()
188-
.context("failed to update status")?;
189+
.context("failed to update scripthash status")?;
190+
191+
notifications.extend(
192+
client
193+
.outpoints
194+
.par_iter_mut()
195+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
196+
match self.tracker.update_outpoint_status(status, &self.daemon) {
197+
Ok(true) => Some(Ok(notification(
198+
"blockchain.outpoint.subscribe",
199+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
200+
))),
201+
Ok(false) => None, // outpoint status is the same
202+
Err(e) => Some(Err(e)),
203+
}
204+
})
205+
.collect::<Result<Vec<Value>>>()
206+
.context("failed to update scripthash status")?,
207+
);
189208

190209
if let Some(old_tip) = client.tip {
191210
let new_tip = self.tracker.chain().tip();
@@ -350,6 +369,28 @@ impl Rpc {
350369
})
351370
}
352371

372+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
373+
let outpoint = OutPoint::new(txid, vout);
374+
Ok(match client.outpoints.entry(outpoint) {
375+
Entry::Occupied(e) => json!(e.get()),
376+
Entry::Vacant(e) => {
377+
let outpoint = OutPoint::new(txid, vout);
378+
let mut status = OutPointStatus::new(outpoint);
379+
self.tracker
380+
.update_outpoint_status(&mut status, &self.daemon)?;
381+
json!(e.insert(status))
382+
}
383+
})
384+
}
385+
386+
fn outpoint_unsubscribe(
387+
&self,
388+
client: &mut Client,
389+
(txid, vout): (Txid, u32),
390+
) -> Result<Value> {
391+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
392+
}
393+
353394
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
354395
let mut status = ScriptHashStatus::new(scripthash);
355396
self.tracker
@@ -548,6 +589,8 @@ impl Rpc {
548589
Params::Features => self.features(),
549590
Params::HeadersSubscribe => self.headers_subscribe(client),
550591
Params::MempoolFeeHistogram => self.get_fee_histogram(),
592+
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
593+
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
551594
Params::PeersSubscribe => Ok(json!([])),
552595
Params::Ping => Ok(Value::Null),
553596
Params::RelayFee => self.relayfee(),
@@ -572,12 +615,13 @@ enum Params {
572615
Banner,
573616
BlockHeader((usize,)),
574617
BlockHeaders((usize, usize)),
575-
TransactionBroadcast((String,)),
576618
Donation,
577619
EstimateFee((u16,)),
578620
Features,
579621
HeadersSubscribe,
580622
MempoolFeeHistogram,
623+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
624+
OutPointUnsubscribe((Txid, u32)),
581625
PeersSubscribe,
582626
Ping,
583627
RelayFee,
@@ -586,6 +630,7 @@ enum Params {
586630
ScriptHashListUnspent((ScriptHash,)),
587631
ScriptHashSubscribe((ScriptHash,)),
588632
ScriptHashUnsubscribe((ScriptHash,)),
633+
TransactionBroadcast((String,)),
589634
TransactionGet(TxGetArgs),
590635
TransactionGetMerkle((Txid, usize)),
591636
TransactionFromPosition((usize, usize, bool)),
@@ -599,6 +644,8 @@ impl Params {
599644
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
600645
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
601646
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
647+
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
648+
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
602649
"blockchain.relayfee" => Params::RelayFee,
603650
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
604651
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),

src/status.rs

+136-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -48,12 +48,26 @@ impl TxEntry {
4848
// Confirmation height of a transaction or its mempool state:
4949
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
51+
#[derive(Copy, Clone, Eq, PartialEq)]
5152
enum Height {
5253
Confirmed { height: usize },
5354
Unconfirmed { has_unconfirmed_inputs: bool },
5455
}
5556

5657
impl Height {
58+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
59+
let height = chain
60+
.get_block_height(&blockhash)
61+
.expect("missing block in chain");
62+
Self::Confirmed { height }
63+
}
64+
65+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
66+
Self::Unconfirmed {
67+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
68+
}
69+
}
70+
5771
fn as_i64(&self) -> i64 {
5872
match self {
5973
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -538,6 +552,127 @@ fn filter_block_txs<T: Send>(
538552
.into_iter()
539553
}
540554

555+
pub(crate) struct OutPointStatus {
556+
outpoint: OutPoint,
557+
funding: Option<Height>,
558+
spending: Option<(Txid, Height)>,
559+
tip: BlockHash,
560+
}
561+
562+
impl Serialize for OutPointStatus {
563+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
564+
where
565+
S: Serializer,
566+
{
567+
let mut map = serializer.serialize_map(None)?;
568+
if let Some(funding) = &self.funding {
569+
map.serialize_entry("height", &funding)?;
570+
}
571+
if let Some((txid, height)) = &self.spending {
572+
map.serialize_entry("spender_txhash", &txid)?;
573+
map.serialize_entry("spender_height", &height)?;
574+
}
575+
map.end()
576+
}
577+
}
578+
579+
impl OutPointStatus {
580+
pub(crate) fn new(outpoint: OutPoint) -> Self {
581+
Self {
582+
outpoint,
583+
funding: None,
584+
spending: None,
585+
tip: BlockHash::all_zeros(),
586+
}
587+
}
588+
589+
pub(crate) fn sync(
590+
&mut self,
591+
index: &Index,
592+
mempool: &Mempool,
593+
daemon: &Daemon,
594+
) -> Result<bool> {
595+
let funding = self.sync_funding(index, daemon, mempool)?;
596+
let spending = self.sync_spending(index, daemon, mempool)?;
597+
let same_status = (self.funding == funding) && (self.spending == spending);
598+
self.funding = funding;
599+
self.spending = spending;
600+
self.tip = index.chain().tip();
601+
Ok(!same_status)
602+
}
603+
604+
/// Return true iff current tip became unconfirmed
605+
fn is_reorg(&self, chain: &Chain) -> bool {
606+
chain.get_block_height(&self.tip).is_none()
607+
}
608+
609+
fn sync_funding(
610+
&self,
611+
index: &Index,
612+
daemon: &Daemon,
613+
mempool: &Mempool,
614+
) -> Result<Option<Height>> {
615+
let chain = index.chain();
616+
if !self.is_reorg(chain) {
617+
if let Some(Height::Confirmed { .. }) = &self.funding {
618+
return Ok(self.funding);
619+
}
620+
}
621+
let mut confirmed = None;
622+
daemon.for_blocks(
623+
index.filter_by_txid(self.outpoint.txid),
624+
|blockhash, block| {
625+
if confirmed.is_none() {
626+
for tx in block.txdata {
627+
let txid = tx.txid();
628+
let output_len = u32::try_from(tx.output.len()).unwrap();
629+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
630+
confirmed = Some(Height::from_blockhash(blockhash, chain));
631+
return;
632+
}
633+
}
634+
}
635+
},
636+
)?;
637+
Ok(confirmed.or_else(|| mempool.get(&self.outpoint.txid).map(Height::unconfirmed)))
638+
}
639+
640+
fn sync_spending(
641+
&self,
642+
index: &Index,
643+
daemon: &Daemon,
644+
mempool: &Mempool,
645+
) -> Result<Option<(Txid, Height)>> {
646+
let chain = index.chain();
647+
if !self.is_reorg(chain) {
648+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
649+
return Ok(self.spending);
650+
}
651+
}
652+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
653+
let mut confirmed = None;
654+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
655+
for tx in block.txdata {
656+
for txi in &tx.input {
657+
if txi.previous_output == self.outpoint {
658+
// TODO: there should be only one spending input
659+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
660+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
661+
return;
662+
}
663+
}
664+
}
665+
})?;
666+
Ok(confirmed.or_else(|| {
667+
let entries = mempool.filter_by_spending(&self.outpoint);
668+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
669+
entries
670+
.first()
671+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
672+
}))
673+
}
674+
}
675+
541676
#[cfg(test)]
542677
mod tests {
543678
use super::HistoryEntry;

src/tracker.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{FeeHistogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -114,4 +114,12 @@ impl Tracker {
114114
})?;
115115
Ok(result)
116116
}
117+
118+
pub(crate) fn update_outpoint_status(
119+
&self,
120+
status: &mut OutPointStatus,
121+
daemon: &Daemon,
122+
) -> Result<bool> {
123+
status.sync(&self.index, &self.mempool, daemon)
124+
}
117125
}

0 commit comments

Comments
 (0)