Commit 9a0383b

Merge pull request Blockstream#35 from mempool/junderw/fix-popular-scripts-script
Fix popular scripts script
2 parents 61763fc + f6a4c4d commit 9a0383b

2 files changed: +177 −42 lines changed

src/bin/popular-scripts.rs

Lines changed: 155 additions & 24 deletions
@@ -1,50 +1,181 @@
 extern crate electrs;

-use electrs::{
-    config::Config,
-    new_index::{Store, TxHistoryKey},
-    util::bincode_util,
-};
+use std::{convert::TryInto, thread::ThreadId, time::Instant};

+use electrs::{config::Config, new_index::db::open_raw_db};
+use lazy_static::lazy_static;
+
+/*
+// How to run:
+export ELECTRS_DATA=/path/to/electrs
+cargo run \
+    -q --release --bin popular-scripts -- \
+    --db-dir $ELECTRS_DATA/db \
+    > ./contrib/popular-scripts.txt
+*/
+
+type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
+lazy_static! {
+    static ref HISTORY_DB: DB = {
+        let config = Config::from_args();
+        open_raw_db(&config.db_path.join("newindex").join("history"))
+    };
+}
+
+// Dev note:
+// Only use println for file output (lines for output)
+// Use eprintln to print to stderr for dev notifications
 fn main() {
-    let config = Config::from_args();
-    let store = Store::open(&config.db_path.join("newindex"), &config);
+    let high_usage_threshold = std::env::var("HIGH_USAGE_THRESHOLD")
+        .ok()
+        .and_then(|s| s.parse::<u32>().ok())
+        .unwrap_or(4000);
+    let thread_count = std::env::var("JOB_THREAD_COUNT")
+        .ok()
+        .and_then(|s| s.parse::<usize>().ok())
+        .unwrap_or(4);
+    eprintln!(
+        "Seaching for scripts with history rows of {} or more...",
+        high_usage_threshold
+    );
+
+    let thread_pool = rayon::ThreadPoolBuilder::new()
+        .num_threads(thread_count)
+        .build()
+        .expect("Built threadpool");
+
+    let (sender, receiver) = crossbeam_channel::unbounded::<[u8; 32]>();
+
+    let increment = 256 / thread_count;
+    let bytes: Vec<u8> = (0u8..=255u8)
+        .filter(|n| *n % increment as u8 == 0)
+        .collect();
+
+    let now = Instant::now();
+    for i in 0..bytes.len() {
+        let sender = sender.clone();
+        let first_byte = bytes[i];
+        let second_byte = bytes.get(i + 1).copied();
+
+        thread_pool.spawn(move || {
+            let id = std::thread::current().id();
+            run_iterator(
+                id,
+                &HISTORY_DB,
+                high_usage_threshold,
+                first_byte,
+                second_byte,
+                sender,
+                now,
+            );
+            eprintln!("{id:?} Finished its job!");
+        })
+    }
+    // If we don't drop this sender
+    // the receiver will hang forever
+    drop(sender);
+
+    while let Ok(script) = receiver.recv() {
+        println!("{}", hex::encode(script));
+    }
+    eprintln!("Finished!!!!");
+}
+
+fn run_iterator(
+    thread_id: ThreadId,
+    db: &DB,
+    high_usage_threshold: u32,
+    first_byte: u8,
+    next_byte: Option<u8>,
+    sender: crossbeam_channel::Sender<[u8; 32]>,
+    now: Instant,
+) {
+    let mut iter = db.raw_iterator();
+    eprintln!(
+        "Thread ({thread_id:?}) Seeking DB to beginning of tx histories for b'H' + {}",
+        hex::encode([first_byte])
+    );
+    // H = 72
+    let mut compare_vec: Vec<u8> = vec![72, first_byte];
+    iter.seek(&compare_vec); // Seek to beginning of our section

-    let mut iter = store.history_db().raw_iterator();
-    iter.seek(b"H");
+    // Insert the byte of the next section for comparing
+    // This will tell us when to stop with a closure
+    type Checker<'a> = Box<dyn Fn(&[u8]) -> bool + 'a>;
+    let is_finished: Checker<'_> = if let Some(next) = next_byte {
+        // Modify the vec to what we're looking for next
+        // to indicate we left our section
+        compare_vec[1] = next;
+        Box::new(|key: &[u8]| -> bool { key.starts_with(&compare_vec) })
+    } else {
+        // Modify the vec to only have H so we know when we left H
+        compare_vec.remove(1);
+        Box::new(|key: &[u8]| -> bool { !key.starts_with(&compare_vec) })
+    };
+
+    eprintln!("Thread ({thread_id:?}) Seeking done");

     let mut curr_scripthash = [0u8; 32];
-    let mut total_entries = 0;
+    let mut total_entries: usize = 0;
+    let mut iter_index: usize = 1;

     while iter.valid() {
         let key = iter.key().unwrap();

-        if !key.starts_with(b"H") {
+        if is_finished(key) {
+            // We have left the txhistory section,
+            // but we need to check the final scripthash
+            send_if_popular(
+                high_usage_threshold,
+                total_entries,
+                curr_scripthash,
+                &sender,
+            );
             break;
         }

-        let entry: TxHistoryKey =
-            bincode_util::deserialize_big(key).expect("failed to deserialize TxHistoryKey");
+        if iter_index % 10_000_000 == 0 {
+            let duration = now.elapsed().as_secs();
+            eprintln!(
+                "Thread ({thread_id:?}) Processing row #{iter_index}... {duration} seconds elapsed"
+            );
+        }
+
+        // We know that the TxHistory key is 1 byte "H" followed by
+        // 32 byte scripthash
+        let entry_hash: [u8; 32] = key[1..33].try_into().unwrap();

-        if curr_scripthash != entry.hash {
-            if total_entries > 100 {
-                println!("{} {}", hex::encode(curr_scripthash), total_entries);
-            }
+        if curr_scripthash != entry_hash {
+            // We have rolled on to a new scripthash
+            // If the last scripthash was popular
+            // Collect for sorting
+            send_if_popular(
+                high_usage_threshold,
+                total_entries,
+                curr_scripthash,
+                &sender,
+            );

-            curr_scripthash = entry.hash;
+            // After collecting, reset values for next scripthash
+            curr_scripthash = entry_hash;
             total_entries = 0;
         }

         total_entries += 1;
+        iter_index += 1;

         iter.next();
     }
+}

-    if total_entries >= 4000 {
-        println!(
-            "scripthash,{},{}",
-            hex::encode(curr_scripthash),
-            total_entries
-        );
+#[inline]
+fn send_if_popular(
+    high_usage_threshold: u32,
+    total_entries: usize,
+    curr_scripthash: [u8; 32],
+    sender: &crossbeam_channel::Sender<[u8; 32]>,
+) {
+    if total_entries >= high_usage_threshold as usize {
+        sender.send(curr_scripthash).unwrap();
     }
 }
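For orientation (this note is not part of the commit): the rewritten binary shards the b"H" tx-history keyspace by the byte that follows the 'H' prefix, giving each rayon worker its own seek target and stop prefix. A minimal, self-contained sketch of those per-worker bounds, assuming the default JOB_THREAD_COUNT of 4:

fn main() {
    // Default JOB_THREAD_COUNT used by popular-scripts.
    let thread_count: usize = 4;
    let increment = 256 / thread_count;
    let bytes: Vec<u8> = (0u8..=255u8)
        .filter(|n| *n % increment as u8 == 0)
        .collect();

    for (i, &first) in bytes.iter().enumerate() {
        // Each worker seeks to b'H' (0x48) followed by its first byte...
        let start = [b'H', first];
        // ...and stops once it reaches the next worker's prefix; the last
        // worker has no stop prefix and runs until keys leave the b"H" section.
        let stop = bytes.get(i + 1).map(|&next| [b'H', next]);
        println!("worker {i}: seek {start:02x?}, stop at {stop:02x?}");
    }
}

With 4 workers this prints prefixes 0x4800, 0x4840, 0x4880 and 0x48c0, and a stop of None for the last one, which is why run_iterator flips the logic of its is_finished closure when next_byte is None.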

src/new_index/db.rs

Lines changed: 22 additions & 18 deletions
@@ -82,25 +82,8 @@ pub enum DBFlush {

 impl DB {
     pub fn open(path: &Path, config: &Config) -> DB {
-        debug!("opening DB at {:?}", path);
-        let mut db_opts = rocksdb::Options::default();
-        db_opts.create_if_missing(true);
-        db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly
-        db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
-        db_opts.set_compression_type(rocksdb::DBCompressionType::None);
-        db_opts.set_target_file_size_base(1_073_741_824);
-        db_opts.set_write_buffer_size(256 << 20);
-        db_opts.set_disable_auto_compactions(true); // for initial bulk load
-
-        // db_opts.set_advise_random_on_open(???);
-        db_opts.set_compaction_readahead_size(1 << 20);
-        db_opts.increase_parallelism(2);
-
-        // let mut block_opts = rocksdb::BlockBasedOptions::default();
-        // block_opts.set_block_size(???);
-
         let db = DB {
-            db: rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"),
+            db: open_raw_db(path),
         };
         db.verify_compatibility(config);
         db
@@ -213,3 +196,24 @@ impl DB {
         }
     }
 }
+
+pub fn open_raw_db<T: rocksdb::ThreadMode>(path: &Path) -> rocksdb::DBWithThreadMode<T> {
+    debug!("opening DB at {:?}", path);
+    let mut db_opts = rocksdb::Options::default();
+    db_opts.create_if_missing(true);
+    db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly
+    db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
+    db_opts.set_compression_type(rocksdb::DBCompressionType::None);
+    db_opts.set_target_file_size_base(1_073_741_824);
+    db_opts.set_write_buffer_size(256 << 20);
+    db_opts.set_disable_auto_compactions(true); // for initial bulk load
+
+    // db_opts.set_advise_random_on_open(???);
+    db_opts.set_compaction_readahead_size(1 << 20);
+    db_opts.increase_parallelism(2);
+
+    // let mut block_opts = rocksdb::BlockBasedOptions::default();
+    // block_opts.set_block_size(???);
+
+    rocksdb::DBWithThreadMode::<T>::open(&db_opts, path).expect("failed to open RocksDB")
+}
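For context (again, not part of the commit): open_raw_db is generic over rocksdb's thread mode, so the DB wrapper in db.rs keeps its existing rocksdb::DB handle while popular-scripts opens the same options with MultiThreaded and shares one handle across its workers. A rough usage sketch under that assumption, with a placeholder path:

use std::path::Path;

use electrs::new_index::db::open_raw_db;

// MultiThreaded lets several threads borrow the same handle, matching how
// popular-scripts uses it behind a lazy_static.
type HistoryDB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

fn main() {
    // Placeholder path; the real binary derives db_path/newindex/history
    // from Config::from_args().
    let db: HistoryDB = open_raw_db(Path::new("/path/to/electrs/db/newindex/history"));

    // Jump to the start of the "H"-prefixed tx-history rows and peek at one key.
    let mut iter = db.raw_iterator();
    iter.seek(b"H");
    if iter.valid() {
        eprintln!("first history key is {} bytes", iter.key().unwrap().len());
    }
}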
