Commit 6aab219

STR-1739: ASM Logs in CSM (#1078)
* feat(service): adapters for the listener service pattern.
* feat(csm): consume ASM logs in client_worker.
* chore(csm): cleanups.
* chore(csm): prettify the csm worker and move it into its own crate.
* chore(service): loosen up ServiceMonitor and define tokio input for watch.
* chore: remove the debug-utils feature check from checkpoint verification and always allow empty proofs (for now).
* chore: some review changes.
* fix: make the csm listener able to replay (missing) entries.
* fix lints
* chore: review and leave TODOs.
* chore: add some unit tests for the new csm-worker.
* fixes after rebase
* fix: add a debug log instead of a warn

Co-authored-by: Prajwol Gyawali <[email protected]>
Parent: 801db7b

48 files changed: +1604 / -878 lines


Cargo.lock

Lines changed: 28 additions & 3 deletions
(Generated file; diff not rendered by default.)

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ members = [
     "crates/common",
     "crates/config",
     "crates/consensus-logic",
+    "crates/csm-worker",
     "crates/crypto",
     "crates/csm-types",
     "crates/da-framework",
@@ -163,6 +164,7 @@ strata-config = { path = "crates/config" }
 strata-consensus-logic = { path = "crates/consensus-logic" }
 strata-crypto = { path = "crates/crypto", default-features = false }
 strata-csm-types = { path = "crates/csm-types" }
+strata-csm-worker = { path = "crates/csm-worker" }
 strata-da-framework = { path = "crates/da-framework" }
 strata-db = { path = "crates/db" }
 strata-db-store-rocksdb = { path = "crates/db-store-rocksdb" }

bin/strata-client/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -32,7 +32,6 @@ strata-rpc-api = { workspace = true, default-features = true }
 strata-rpc-types.workspace = true
 strata-rpc-utils.workspace = true
 strata-sequencer.workspace = true
-strata-state.workspace = true
 strata-status.workspace = true
 strata-storage.workspace = true
 strata-sync.workspace = true

bin/strata-client/src/main.rs

Lines changed: 3 additions & 7 deletions
@@ -47,7 +47,6 @@ use strata_sequencer::{
     block_template,
     checkpoint::{checkpoint_expiry_worker, checkpoint_worker, CheckpointHandle},
 };
-use strata_state::CombinedBlockSubmitter;
 use strata_status::StatusChannel;
 use strata_storage::{create_node_storage, NodeStorage};
 use strata_sync::{self, L2SyncContext, RpcSyncPeer};
@@ -343,11 +342,8 @@ fn start_core_tasks(
     )?
     .into();
 
-    // Creating a combined block consumer both for CSM and for ASM, so they are in-sync.
-    // TODO: Ideally, ASM service would use its own reader task with its own reader config.
-    let block_submitter =
-        CombinedBlockSubmitter::new(vec![sync_manager.get_csm_ctl(), sync_manager.get_asm_ctl()]);
-
+    // ASM processes L1 blocks from the bitcoin reader.
+    // CSM listens to ASM logs (via the service framework listener pattern).
     // Start the L1 tasks to get that going.
     executor.spawn_critical_async(
         "bitcoin_data_reader_task",
@@ -357,7 +353,7 @@ fn start_core_tasks(
             Arc::new(config.btcio.reader.clone()),
             sync_manager.get_params(),
             status_channel.clone(),
-            Arc::new(block_submitter),
+            sync_manager.get_asm_ctl(),
         ),
     );
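The rewiring above removes the CombinedBlockSubmitter fan-out: the bitcoin reader now feeds only the ASM handle, and the CSM follows the ASM's log output instead of consuming L1 blocks directly. A minimal sketch of that listener shape, assuming a tokio watch channel as the transport (the commit message mentions defining a tokio watch input); every name below is an illustrative stand-in, not the actual strata-service API:

// Sketch only: all types and names here are hypothetical stand-ins.
use tokio::sync::watch;

#[derive(Clone, Debug, Default)]
struct AsmLogEntry {
    height: u64,
    payload: Vec<u8>,
}

#[tokio::main]
async fn main() {
    // ASM side: publish the log entry for each processed L1 block.
    let (tx, mut rx) = watch::channel(AsmLogEntry::default());

    // CSM side: wake on every change and consume the newest entry.
    let listener = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            let entry = rx.borrow_and_update().clone();
            println!("csm: consuming ASM log at height {}", entry.height);
        }
    });

    for height in 1..=3 {
        tx.send(AsmLogEntry { height, payload: vec![] }).unwrap();
        tokio::task::yield_now().await;
    }
    drop(tx); // closing the sender ends the listener loop
    listener.await.unwrap();
}

Note that a watch channel retains only the newest value, so a consumer that falls behind can miss intermediate entries; this is presumably why the commit also makes the CSM listener able to replay missing entries.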

bin/strata-dbtool/src/cmd/chainstate.rs

Lines changed: 88 additions & 98 deletions
@@ -1,17 +1,14 @@
 use argh::FromArgs;
-use strata_checkpoint_types::Checkpoint;
 use strata_cli_common::errors::{DisplayableError, DisplayedError};
 use strata_consensus_logic::chain_worker_context::conv_blkid_to_slot_wb_id;
 use strata_db::{
     chainstate::ChainstateDatabase,
     traits::{
-        BlockStatus, CheckpointDatabase, DatabaseBackend, L1BroadcastDatabase, L1WriterDatabase,
-        L2BlockDatabase,
+        BlockStatus, CheckpointDatabase, ClientStateDatabase, DatabaseBackend, L2BlockDatabase,
     },
-    types::IntentStatus,
 };
 use strata_ol_chainstate_types::WriteBatch;
-use strata_primitives::l2::L2BlockId;
+use strata_primitives::{l1::L1BlockCommitment, l2::L2BlockId};
 
 use super::{
     checkpoint::get_latest_checkpoint_entry,
@@ -80,71 +77,41 @@ pub(crate) fn get_latest_l2_write_batch(
     })
 }
 
-/// Deletes writer database entries and broadcast database entries associated with a specific
-/// checkpoint
-fn delete_l1_entries_for_checkpoint(
+/// Deletes ClientState entries from a given L1 block onwards
+fn delete_client_states_from(
     db: &impl DatabaseBackend,
-    epoch: u64,
-    checkpoint: &Checkpoint,
-) -> Result<(), DisplayedError> {
-    let writer_db = db.writer_db();
-    let broadcast_db = db.broadcast_db();
-    // Compute the checkpoint hash (same way as in complete_checkpoint_signature)
-    let checkpoint_hash = checkpoint.hash();
-
-    // Find the intent entry by this hash
-    if let Some(intent_entry) = writer_db
-        .get_intent_by_id(checkpoint_hash)
-        .internal_error("Failed to get intent entry")?
-    {
-        // Delete based on status
-        match intent_entry.status {
-            IntentStatus::Bundled(bundle_idx) => {
-                // Get the payload entry to find commit and reveal txids
-                if let Some(payload_entry) = writer_db
-                    .get_payload_entry_by_idx(bundle_idx)
-                    .internal_error("Failed to get payload entry")?
-                {
-                    // Delete commit transaction entry from broadcast DB
-                    if broadcast_db.del_tx_entry(payload_entry.commit_txid).is_ok() {
-                        println!("Deleted commit tx entry for checkpoint epoch {}", epoch);
-                    }
-
-                    // Delete reveal transaction entry from broadcast DB
-                    if broadcast_db.del_tx_entry(payload_entry.reveal_txid).is_ok() {
-                        println!("Deleted reveal tx entry for checkpoint epoch {}", epoch);
-                    }
-                }
-
-                // Delete the bundled payload entry from writer DB
-                writer_db
-                    .del_payload_entry(bundle_idx)
-                    .internal_error("Failed to delete payload entry")?;
-                // Delete the intent entry from writer DB
-                writer_db
-                    .del_intent_entry(checkpoint_hash)
-                    .internal_error("Failed to delete intent entry")?;
-                println!(
-                    "Deleted bundled L1 entries for checkpoint epoch {} (bundle_idx: {})",
-                    epoch, bundle_idx
-                );
-            }
-            IntentStatus::Unbundled => {
-                // Just delete the intent entry from writer DB
-                writer_db
-                    .del_intent_entry(checkpoint_hash)
-                    .internal_error("Failed to delete intent entry")?;
-                println!(
-                    "Deleted unbundled L1 writer entry for checkpoint epoch {}",
-                    epoch
-                );
-            }
+    from_l1_block: L1BlockCommitment,
+) -> Result<usize, DisplayedError> {
+    let client_state_db = db.client_state_db();
+    let mut deleted_count = 0;
+    let mut current_block = from_l1_block;
+
+    // Get all ClientState updates from the specified L1 block onwards.
+    // We fetch in batches to avoid loading too many at once.
+    const BATCH_SIZE: usize = 100;
+    loop {
+        let updates = client_state_db
+            .get_client_updates_from(current_block, BATCH_SIZE)
+            .internal_error("Failed to get client state updates")?;
+
+        let batch_size = updates.len();
+
+        // Delete each ClientState update and track the last one for the next iteration
+        for (l1_block, _) in updates {
+            client_state_db
+                .del_client_update(l1_block)
+                .internal_error("Failed to delete client state update")?;
+            deleted_count += 1;
+            current_block = l1_block;
+        }
+
+        // If we got fewer than BATCH_SIZE, we've reached the end
+        if batch_size < BATCH_SIZE {
+            break;
         }
-    } else {
-        println!("No L1 writer entry found for checkpoint epoch {}", epoch);
     }
 
-    Ok(())
+    Ok(deleted_count)
 }
 
 /// Get chainstate at specified block.
@@ -204,9 +171,9 @@
     let latest_slot = get_highest_l2_slot(db)?;
 
     // Get latest write batch to check finalized epoch constraints
-    let write_batch = get_latest_l2_write_batch(db)?;
-    let top_level_state = write_batch.new_toplevel_state();
-    let finalized_slot = top_level_state.finalized_epoch().last_slot();
+    let latest_write_batch = get_latest_l2_write_batch(db)?;
+    let latest_top_level_state = latest_write_batch.new_toplevel_state();
+    let finalized_slot = latest_top_level_state.finalized_epoch().last_slot();
 
     if target_slot < finalized_slot {
         return Err(DisplayedError::UserError(
@@ -231,10 +198,25 @@
         ));
     }
 
+    // Get the target block's write batch to find the L1 safe block at that point
+    let target_write_batch = get_l2_write_batch(db, target_block_id)?.ok_or_else(|| {
+        DisplayedError::UserError(
+            "Target L2 write batch not found".to_string(),
+            Box::new(target_block_id),
+        )
+    })?;
+    let target_top_level_state = target_write_batch.new_toplevel_state();
+    let target_l1_safe_block = target_top_level_state.l1_view().get_safe_block();
+
     println!("Chainstate latest slot {latest_slot}");
     println!("Chainstate finalized slot {finalized_slot}");
     println!("Latest checkpointed slot {checkpoint_last_slot}");
     println!("Revert chainstate target slot {target_slot}");
+    println!(
+        "Target L1 safe block {}@{}",
+        target_l1_safe_block.height_u64(),
+        target_l1_safe_block.blkid()
+    );
 
     // Now delete write batches and optionally blocks
     for slot in target_slot + 1..=latest_slot {
@@ -282,49 +264,57 @@
         }
     }
 
-    let target_epoch = top_level_state.cur_epoch();
+    let target_epoch = target_top_level_state.cur_epoch();
     let latest_checkpoint_epoch = latest_checkpoint_entry.checkpoint.batch_info().epoch;
+    println!(
+        "target epoch = {:?}, latest_checkpoint_epoch = {:?}",
+        target_epoch, latest_checkpoint_epoch
+    );
     if target_epoch <= latest_checkpoint_epoch {
-        // Clean up checkpoints and related data from target epoch onwards
+        // Delete ClientState entries AFTER the target L1 safe block
+        let next_l1_height = target_l1_safe_block.height_u64() + 1;
+        let next_l1_block = L1BlockCommitment::from_height_u64(next_l1_height, Default::default())
+            .ok_or_else(|| {
+                DisplayedError::InternalError(
+                    "Failed to create next L1 block commitment".to_string(),
+                    Box::new(next_l1_height),
+                )
+            })?;
+
         println!(
-            "Revert chainstate cleaning up checkpoints and L1 entries from epoch {target_epoch}"
+            "Revert chainstate deleting ClientState entries from L1 height {} onwards",
+            next_l1_height
         );
-
-        // First, clean up L1 entries (writer and broadcast) for checkpoints that will be deleted
-        // We need to do this before deleting checkpoints since we need the checkpoint data
-        let mut deleted_l1_entries = 0;
-        for epoch in target_epoch..=latest_checkpoint_epoch {
-            if let Some(checkpoint_entry) = db
-                .checkpoint_db()
-                .get_checkpoint(epoch)
-                .internal_error("Failed to get checkpoint")?
-            {
-                // Get the actual checkpoint data
-                let checkpoint = checkpoint_entry.checkpoint;
-
-                // Delete associated L1 entries
-                if let Err(e) = delete_l1_entries_for_checkpoint(db, epoch, &checkpoint) {
-                    println!(
-                        "Warning: Failed to delete L1 entries for checkpoint epoch {}: {}",
-                        epoch, e
-                    );
-                } else {
-                    deleted_l1_entries += 1;
-                }
+        match delete_client_states_from(db, next_l1_block) {
+            Ok(count) => {
+                println!("Deleted {} ClientState entries", count);
+            }
+            Err(e) => {
+                println!("Warning: Failed to delete ClientState entries: {}", e);
             }
         }
 
-        println!("Deleted L1 entries for {deleted_l1_entries} checkpoints");
+        // Clean up checkpoints and related data AFTER the target epoch.
+        // We keep the target epoch's data intact since we're reverting TO that epoch, not FROM it.
+        let first_epoch_to_delete = target_epoch + 1;
+        println!(
+            "Revert chainstate cleaning up checkpoints from epoch {first_epoch_to_delete} onwards"
+        );
+
+        // Note: We intentionally do NOT delete L1-related data (writer entries such as
+        // intent/payload, broadcast entries, or ASM-related data). The reason is twofold:
+        // 1. These L1 entries don't affect L2 chain state correctness after a revert.
+        // 2. The L1 transactions may already be on Bitcoin, so keeping the records is appropriate.
 
-        // Now use bulk deletion methods for efficiency
+        // Use bulk deletion methods for efficiency
         let deleted_checkpoints = db
             .checkpoint_db()
-            .del_checkpoints_from_epoch(target_epoch)
+            .del_checkpoints_from_epoch(first_epoch_to_delete)
            .internal_error("Failed to delete checkpoints")?;
 
         let deleted_summaries = db
             .checkpoint_db()
-            .del_epoch_summaries_from_epoch(target_epoch)
+            .del_epoch_summaries_from_epoch(first_epoch_to_delete)
            .internal_error("Failed to delete epoch summaries")?;
 
         println!("Deleted checkpoints at epochs: {:?}", deleted_checkpoints);

crates/asm/subprotocols/checkpoint-v0/src/verification.rs

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ fn verify_checkpoint_proof(
     );
 
     if is_empty_proof {
-        if allow_empty && cfg!(feature = "debug-utils") {
+        if allow_empty {
             logging::warn!(
                 epoch = checkpoint.batch_info().epoch(),
                 "Accepting empty checkpoint proof"

crates/asm/worker/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -21,5 +21,4 @@ async-trait.workspace = true
 bitcoin.workspace = true
 serde.workspace = true
 thiserror.workspace = true
-tokio.workspace = true
 tracing.workspace = true
