diff --git a/crates/dt-cli/src/export.rs b/crates/dt-cli/src/export.rs index ecb2146..44ceda8 100644 --- a/crates/dt-cli/src/export.rs +++ b/crates/dt-cli/src/export.rs @@ -11,7 +11,7 @@ use std::cmp::Ordering; /// Write a second export script which outputs the data to some dt-json style format (making this a /// non-issue). Or just add these fields in and demand people ignore them. -use std::collections::HashMap; +use std::collections::{BinaryHeap, HashMap}; use std::ffi::OsString; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -260,21 +260,36 @@ fn safe_assignments_needed_for_agent(oplog: &ListOpLog, agent: AgentId) -> (RleV let mut last_lv = vec![]; let mut map = RleVec::new(); - for (seq, lv, len) in oplog.cg.agent_assignment.iter_lv_map_for_agent(agent) { - // Find the first item in last_lv which is strictly before lv. - let slot = if let Some(slot) = last_lv.iter().position(|other_lv| { - oplog.cg.graph.version_cmp(*other_lv, lv) == Some(Ordering::Less) - }) { - last_lv[slot] = lv + len - 1; - slot - } else { - let slot = last_lv.len(); - last_lv.push(lv + len - 1); - slot - }; + for (_seq, outer_lv, outer_len) in oplog.cg.agent_assignment.iter_lv_map_for_agent(agent) { + // There's a potential bug with just using (seq,lv,len) as-is: The chunk described from + // lv..lv+len may contain a series of individual graph entries, which each need to be + // assigned to a different agent. + // + // So we'll split up the span by iterating through the graph... + for entry in oplog.cg.iter_range((outer_lv..outer_lv + outer_len).into()) { + let lv = entry.start; + // let len = entry.len(); + // let seq = entry.span.seq_range.start; + assert_eq!(entry.span.agent, agent); + + let last_lv_in_span = lv + entry.len() - 1; + + // Find the first item in last_lv which is strictly before lv. + let slot = if let Some(slot) = last_lv.iter().position(|other_lv| { + oplog.cg.graph.version_cmp(*other_lv, lv) == Some(Ordering::Less) + }) { + last_lv[slot] = last_lv_in_span; + slot + } else { + let slot = last_lv.len(); + last_lv.push(last_lv_in_span); + slot + }; + + // map.push(KVPair(lv, RleRun::new(slot, len))); + map.push(KVPair(entry.span.seq_range.start, RleRun::new(slot, entry.len()))); + } - // map.push(KVPair(lv, RleRun::new(slot, len))); - map.push(KVPair(seq, RleRun::new(slot, len))); } // dbg!(map); @@ -284,6 +299,47 @@ fn safe_assignments_needed_for_agent(oplog: &ListOpLog, agent: AgentId) -> (RleV } +fn check_history(num_agents: usize, txns: &[TraceExportTxn]) { + // Each entry in the history must come causally after all other entries with the same agent. + // Let's check thats actually true! + + let mut last_idx_for_agent = vec![usize::MAX; num_agents]; + for (i, e) in txns.iter().enumerate() { + let agent = e.agent; + let prev = last_idx_for_agent[agent]; + + if prev != usize::MAX { + // Check that prev comes causally before i. The first item with the same agent that + // we run into in the BFS expansion must be prev. + let mut queue = BinaryHeap::new(); + for parent in e.parents.iter() { + queue.push(*parent); + } + + while let Some(p_i) = queue.pop() { + let p_e = &txns[p_i]; + + while let Some(peek_i) = queue.peek() { // Handle graph merging. + if *peek_i != p_i { break; } + queue.pop(); + } + + if p_e.agent == agent { + assert_eq!(p_i, prev, "Nonlinear edits from agent {agent}: {i} should come after {prev} but instead we found {p_i} / {:?} || {:?}", e._dt_span, p_e._dt_span); + break; + } + + for parent in p_e.parents.iter() { + queue.push(*parent); + } + } + + } + + last_idx_for_agent[agent] = i; + } +} + pub fn export_trace_to_json(oplog: &ListOpLog, timestamp_filename: Option, shatter: bool) -> TraceExportData { let timestamps = timestamp_filename.map(Timestamps::from_file); @@ -323,7 +379,8 @@ pub fn export_trace_to_json(oplog: &ListOpLog, timestamp_filename: Option