From 5374663a56742518083bd38e5dddc5cb8e1f2116 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 10:30:16 +1100 Subject: [PATCH 1/8] um --- rust/examples/conformance_writer.rs | 69 ++++----- rust/src/lib.rs | 2 + rust/src/read.rs | 2 + rust/src/sans_io/read.rs | 4 +- rust/src/write.rs | 221 +++++++++++++-------------- rust/tests/handles_time0_messages.rs | 11 +- rust/tests/message.rs | 4 + 7 files changed, 152 insertions(+), 161 deletions(-) diff --git a/rust/examples/conformance_writer.rs b/rust/examples/conformance_writer.rs index 9716f64dd5..3c5bac5e7c 100644 --- a/rust/examples/conformance_writer.rs +++ b/rust/examples/conformance_writer.rs @@ -1,20 +1,22 @@ -use std::{borrow::Cow, collections::HashMap, env, io::Write, sync::Arc}; +use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap}, + env, + io::Write, +}; #[path = "common/conformance_writer_spec.rs"] mod conformance_writer_spec; fn write_file(spec: &conformance_writer_spec::WriterSpec) { - let mut tmp = tempfile::NamedTempFile::new().expect("Couldn't open file"); - let tmp_path = tmp.path().to_owned(); - let out_buffer = std::io::BufWriter::new(&mut tmp); let mut writer = mcap::WriteOptions::new() .compression(None) .profile("") .create(out_buffer) .expect("Couldn't create writer"); - let mut channels = HashMap::::new(); - let mut schemas = HashMap::::new(); + let mut channel_ids = HashMap::new(); + let mut schema_ids = HashMap::new(); for record in &spec.records { match record.record_type.as_str() { @@ -38,17 +40,10 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { let schema_id = record.get_field_u64("schema_id"); let topic = record.get_field_str("topic"); let message_encoding = record.get_field_str("message_encoding"); - let schema = schemas.get(&schema_id).expect("Missing schema"); - let channel = mcap::Channel { - schema: Some(Arc::new(schema.to_owned())), - topic: topic.to_string(), - message_encoding: message_encoding.to_string(), - metadata: 
std::collections::BTreeMap::new(), - }; - writer - .add_channel(&channel) + let returned_id = writer + .add_channel(schema_id as u16, topic, message_encoding, &BTreeMap::new()) .expect("Couldn't write channel"); - channels.insert(id, channel); + channel_ids.insert(returned_id, id); } "ChunkIndex" => { // written automatically @@ -82,15 +77,23 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { } "Message" => { let channel_id = record.get_field_u16("channel_id"); - let channel = channels.get(&channel_id).expect("Unknown channel"); - let message = mcap::Message { - channel: Arc::new(channel.to_owned()), - data: Cow::from(record.get_field_data("data")), - log_time: record.get_field_u64("log_time"), - publish_time: record.get_field_u64("publish_time"), - sequence: record.get_field_u32("sequence"), - }; - writer.write(&message).expect("Write message failed"); + let data = record.get_field_data("data"); + let log_time = record.get_field_u64("log_time"); + let publish_time = record.get_field_u64("publish_time"); + let sequence = record.get_field_u32("sequence"); + writer + .write_to_known_channel( + &mcap::records::MessageHeader { + channel_id: *channel_ids + .get(&channel_id) + .expect("message on unexpected channel ID"), + log_time, + publish_time, + sequence, + }, + &data, + ) + .expect("Write message failed"); } "Metadata" => { let name = record.get_field_str("name"); @@ -104,14 +107,12 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { "Schema" => { let name = record.get_field_str("name"); let encoding = record.get_field_str("encoding"); - let id = record.get_field_u64("id"); - let data: Vec = record.get_field_data(&"data"); - let schema = mcap::Schema { - name: name.to_owned(), - encoding: encoding.to_owned(), - data: Cow::from(data), - }; - schemas.insert(id, schema); + let id = record.get_field_u16("id"); + let data: Vec = record.get_field_data("data"); + let returned_id = writer + .add_schema(name, encoding, &data) + .expect("cannot write 
schema"); + schema_ids.insert(returned_id, id); } "Statistics" => { // written automatically @@ -128,7 +129,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { let contents = std::fs::read(tmp_path).expect("Couldn't read output"); std::io::stdout() - .write(&contents) + .write_all(&contents) .expect("Couldn't write output"); } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 5a6207865d..94ba78e357 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -169,6 +169,7 @@ pub enum Compression { /// or hold its own buffer if it was decompressed from a chunk. #[derive(Clone, PartialEq, Eq, Hash)] pub struct Schema<'a> { + pub id: u16, pub name: String, pub encoding: String, pub data: Cow<'a, [u8]>, @@ -186,6 +187,7 @@ impl fmt::Debug for Schema<'_> { /// Describes a channel which [Message]s are published to in an MCAP file #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Channel<'a> { + pub id: u16, pub topic: String, pub schema: Option>>, diff --git a/rust/src/read.rs b/rust/src/read.rs index a5496cf47a..f5472a318d 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -299,6 +299,7 @@ impl<'a> ChannelAccumulator<'a> { } let schema = Arc::new(Schema { + id: header.id, name: header.name.clone(), encoding: header.encoding, data, @@ -329,6 +330,7 @@ impl<'a> ChannelAccumulator<'a> { }; let channel = Arc::new(Channel { + id: chan.id, topic: chan.topic.clone(), schema, message_encoding: chan.message_encoding, diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index cb5f04c09a..a36061feed 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -743,12 +743,12 @@ mod tests { .chunk_size(None) .create(&mut buf)?; let channel = std::sync::Arc::new(crate::Channel { + id: 0, topic: "chat".to_owned(), schema: None, message_encoding: "json".to_owned(), metadata: BTreeMap::new(), }); - writer.add_channel(&channel)?; for n in 0..3 { writer.write(&crate::Message { channel: channel.clone(), @@ -774,12 +774,12 @@ mod tests { 
.use_chunks(false) .create(&mut buf)?; let channel = std::sync::Arc::new(crate::Channel { + id: 0, topic: "chat".to_owned(), schema: None, message_encoding: "json".to_owned(), metadata: BTreeMap::new(), }); - writer.add_channel(&channel)?; writer.write(&crate::Message { channel, sequence: 0, diff --git a/rust/src/write.rs b/rust/src/write.rs index 386858f266..e614bad5c4 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, - collections::{BTreeMap, HashMap}, + collections::BTreeMap, io::{self, prelude::*, Cursor, SeekFrom}, mem::size_of, }; @@ -14,7 +14,7 @@ use crate::{ chunk_sink::{ChunkMode, ChunkSink}, io_utils::CountingCrcWriter, records::{self, op, AttachmentHeader, AttachmentIndex, MessageHeader, Record}, - Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC, + Attachment, Compression, McapError, McapResult, Message, Schema, MAGIC, }; // re-export to help with linear writing @@ -187,8 +187,8 @@ impl WriteOptions { pub struct Writer<'a, W: Write + Seek> { writer: Option>, options: WriteOptions, - schemas: HashMap, u16>, - channels: HashMap, u16>, + schemas: Vec>, + channels: Vec, chunk_indexes: Vec, attachment_indexes: Vec, metadata_indexes: Vec, @@ -233,8 +233,8 @@ impl<'a, W: Write + Seek> Writer<'a, W> { Ok(Self { writer: Some(WriteMode::Raw(writer, chunk_mode)), options: opts, - schemas: HashMap::new(), - channels: HashMap::new(), + schemas: Vec::new(), + channels: Vec::new(), chunk_indexes: Vec::new(), attachment_indexes: Vec::new(), metadata_indexes: Vec::new(), @@ -243,75 +243,102 @@ impl<'a, W: Write + Seek> Writer<'a, W> { }) } - /// Adds a channel (and its provided schema, if any), returning its ID. 
- /// - /// Useful with subequent calls to [`write_to_known_channel()`](Self::write_to_known_channel) - pub fn add_channel(&mut self, chan: &Channel<'a>) -> McapResult { - let schema_id = match &chan.schema { - Some(s) => self.add_schema(s)?, - None => 0, - }; - - if let Some(id) = self.channels.get(chan) { - return Ok(*id); + // adds a schema, returning its ID. If a schema with the same content has been added already, + // its ID is returned. + pub fn add_schema(&mut self, name: &str, encoding: &str, data: &[u8]) -> McapResult { + let mut max_schema_id: u16 = 0; + for schema in &self.schemas { + max_schema_id = std::cmp::max(schema.id, max_schema_id); + if schema.name == name && schema.encoding == encoding && schema.data == data { + return Ok(schema.id); + } } + let next_schema_id = max_schema_id + 1; - let next_channel_id = self.channels.len() as u16; - assert!(self - .channels - .insert(chan.clone(), next_channel_id) - .is_none()); if self.options.use_chunks { - self.chunkin_time()? - .write_channel(next_channel_id, schema_id, chan)?; + let schema = Schema { + id: next_schema_id, + name: name.into(), + encoding: encoding.into(), + data: data.into(), + }; + self.chunkin_time()?.write_schema(schema)?; } else { + let header = records::SchemaHeader { + id: next_schema_id, + name: name.into(), + encoding: encoding.into(), + }; write_record( - self.finish_chunk()?, - &Record::Channel(records::Channel { - id: next_channel_id, - schema_id, - topic: chan.topic.clone(), - message_encoding: chan.message_encoding.clone(), - metadata: chan.metadata.clone(), - }), + &mut self.finish_chunk()?, + &Record::Schema { + header, + data: Cow::Borrowed(data), + }, )?; } - Ok(next_channel_id) + Ok(next_schema_id) } - fn add_schema(&mut self, schema: &Schema<'a>) -> McapResult { - if let Some(id) = self.schemas.get(schema) { - return Ok(*id); + /// Adds a channel, returning its ID. If a channel with equivalent content was added previously, + /// its ID is returned. 
+ /// + /// Provide a schema_id returned from [`Self::add_schema`], or 0 if the channel is schemaless. + /// + /// Useful with subsequent calls to [`write_to_known_channel()`](Self::write_to_known_channel) + pub fn add_channel( + &mut self, + schema_id: u16, + topic: &str, + message_encoding: &str, + metadata: &BTreeMap, + ) -> McapResult { + let mut max_channel_id: u16 = 0; + for channel in &self.channels { + max_channel_id = std::cmp::max(channel.id, max_channel_id); + if channel.schema_id == schema_id + && channel.topic == topic + && channel.message_encoding == message_encoding + && channel.metadata == *metadata + { + return Ok(channel.id); + } + } + let next_channel_id = max_channel_id + 1; + if schema_id != 0 && !self.schemas.iter().any(|schema| schema.id == schema_id) { + return Err(McapError::UnknownSchema(topic.into(), schema_id)); } - // Schema IDs cannot be zero, that's the sentinel value in a channel - // for "no schema" - let next_schema_id = self.schemas.len() as u16 + 1; - assert!(self - .schemas - .insert(schema.clone(), next_schema_id) - .is_none()); + let rec = records::Channel { + id: next_channel_id, + schema_id, + topic: topic.into(), + message_encoding: message_encoding.into(), + metadata: metadata.clone(), + }; + if self.options.use_chunks { - self.chunkin_time()?.write_schema(next_schema_id, schema)?; + self.chunkin_time()?.write_channel(rec)? } else { - write_record( - self.finish_chunk()?, - &Record::Schema { - header: records::SchemaHeader { - id: next_schema_id, - name: schema.name.clone(), - encoding: schema.encoding.clone(), - }, - data: Cow::Borrowed(&schema.data), - }, - )?; + write_record(self.finish_chunk()?, &Record::Channel(rec))?; } - Ok(next_schema_id) + Ok(next_channel_id) } /// Write the given message (and its provided channel, if needed). + /// The provided channel ID and schema ID are ignored and new ones will be assigned in the + /// resulting MCAP records. 
pub fn write(&mut self, message: &Message<'a>) -> McapResult<()> { - let channel_id = self.add_channel(&message.channel)?; + let schema_id: u16 = match message.channel.schema.as_ref() { + None => 0, + Some(schema) => self.add_schema(&schema.name, &schema.encoding, &schema.data)?, + }; + let channel_id = self.add_channel( + schema_id, + &message.channel.topic, + &message.channel.message_encoding, + &message.channel.metadata, + )?; let header = MessageHeader { channel_id, sequence: message.sequence, @@ -332,7 +359,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> { ) -> McapResult<()> { // The number of channels should be relatively small, // do a quick linear search to make sure we're not being given a bogus ID - if !self.channels.values().any(|id| *id == header.channel_id) { + if !self + .channels + .iter() + .any(|channel| channel.id == header.channel_id) + { return Err(McapError::UnknownChannel( header.sequence, header.channel_id, @@ -661,34 +692,9 @@ impl<'a, W: Write + Seek> Writer<'a, W> { let mut metadata_indexes = Vec::new(); std::mem::swap(&mut metadata_indexes, &mut self.metadata_indexes); - // Make some Schema and Channel lists for the summary section. - // Be sure to grab schema IDs for the channels from the schema hash map before we drain it! 
- struct ChannelSummary<'a> { - channel: Channel<'a>, - channel_id: u16, - schema_id: u16, - } + self.channels.sort_by_key(|c| c.id); - let mut all_channels: Vec> = self - .channels - .drain() - .map(|(channel, channel_id)| { - let schema_id = match &channel.schema { - Some(s) => *self.schemas.get(s).unwrap(), - None => 0, - }; - - ChannelSummary { - channel, - channel_id, - schema_id, - } - }) - .collect(); - all_channels.sort_unstable_by_key(|cs| cs.channel_id); - - let mut all_schemas: Vec<(Schema<'_>, u16)> = self.schemas.drain().collect(); - all_schemas.sort_unstable_by_key(|(_, v)| *v); + self.schemas.sort_by_key(|s| s.id); let mut offsets = Vec::new(); @@ -703,13 +709,13 @@ impl<'a, W: Write + Seek> Writer<'a, W> { // Write all schemas. let schemas_start = summary_start; - for (schema, id) in all_schemas { + for schema in self.schemas.iter() { let header = records::SchemaHeader { - id, - name: schema.name, - encoding: schema.encoding, + id: schema.id, + name: schema.name.clone(), + encoding: schema.encoding.clone(), }; - let data = schema.data; + let data = schema.data.clone(); write_record(&mut ccw, &Record::Schema { header, data })?; } @@ -724,15 +730,8 @@ impl<'a, W: Write + Seek> Writer<'a, W> { // Write all channels. 
let channels_start = schemas_end; - for cs in all_channels { - let rec = records::Channel { - id: cs.channel_id, - schema_id: cs.schema_id, - topic: cs.channel.topic, - message_encoding: cs.channel.message_encoding, - metadata: cs.channel.metadata, - }; - write_record(&mut ccw, &Record::Channel(rec))?; + for channel in self.channels.iter() { + write_record(&mut ccw, &Record::Channel(channel.clone()))?; } let channels_end = posit(&mut ccw)?; if channels_end - channels_start > 0 { @@ -960,34 +959,24 @@ impl ChunkWriter { }) } - fn write_schema(&mut self, id: u16, schema: &Schema) -> McapResult<()> { + fn write_schema(&mut self, schema: Schema) -> McapResult<()> { let header = records::SchemaHeader { - id, - name: schema.name.clone(), - encoding: schema.encoding.clone(), + id: schema.id, + name: schema.name, + encoding: schema.encoding, }; write_record( &mut self.compressor, &Record::Schema { header, - data: Cow::Borrowed(&schema.data), + data: schema.data, }, )?; Ok(()) } - fn write_channel(&mut self, id: u16, schema_id: u16, chan: &Channel) -> McapResult<()> { - assert_eq!(schema_id == 0, chan.schema.is_none()); - - let rec = records::Channel { - id, - schema_id, - topic: chan.topic.clone(), - message_encoding: chan.message_encoding.clone(), - metadata: chan.metadata.clone(), - }; - - write_record(&mut self.compressor, &Record::Channel(rec))?; + fn write_channel(&mut self, chan: records::Channel) -> McapResult<()> { + write_record(&mut self.compressor, &Record::Channel(chan))?; Ok(()) } diff --git a/rust/tests/handles_time0_messages.rs b/rust/tests/handles_time0_messages.rs index ce18b62142..3b2514bdc8 100644 --- a/rust/tests/handles_time0_messages.rs +++ b/rust/tests/handles_time0_messages.rs @@ -1,4 +1,4 @@ -use std::io::Cursor; +use std::{collections::BTreeMap, io::Cursor}; use anyhow::Result; @@ -9,14 +9,7 @@ fn handles_time0_messages() -> Result<()> { let mut buf = Vec::new(); let mut out = mcap::Writer::new(Cursor::new(&mut buf))?; - let my_channel = 
mcap::Channel { - topic: String::from("time"), - message_encoding: String::from("text/plain"), - metadata: Default::default(), - schema: None, - }; - - let channel_id = out.add_channel(&my_channel)?; + let channel_id = out.add_channel(0, "time", "text/plain", &BTreeMap::new())?; out.write_to_known_channel( &mcap::records::MessageHeader { diff --git a/rust/tests/message.rs b/rust/tests/message.rs index eb92a2b61c..6354604ecc 100644 --- a/rust/tests/message.rs +++ b/rust/tests/message.rs @@ -17,7 +17,9 @@ fn smoke() -> Result<()> { let expected = mcap::Message { channel: Arc::new(mcap::Channel { + id: 0, schema: Some(Arc::new(mcap::Schema { + id: 1, name: String::from("Example"), encoding: String::from("c"), data: Cow::Borrowed(&[4, 5, 6]), @@ -65,12 +67,14 @@ fn run_round_trip(use_chunks: bool) -> Result<()> { let summary = mcap::Summary::read(&ours)?.unwrap(); let schema = Arc::new(mcap::Schema { + id: 1, name: String::from("Example"), encoding: String::from("c"), data: Cow::Borrowed(&[4, 5, 6]), }); let channel = Arc::new(mcap::Channel { + id: 0, schema: Some(schema.clone()), topic: String::from("example"), message_encoding: String::from("a"), From d9fd0e3114ac848a10ee4a6e3831f2bba07f4775 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 10:37:01 +1100 Subject: [PATCH 2/8] clippied out --- .../common/conformance_writer_spec.rs | 48 ++++++++----------- rust/examples/conformance_writer.rs | 13 ++--- 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/rust/examples/common/conformance_writer_spec.rs b/rust/examples/common/conformance_writer_spec.rs index 1d03309f52..999ea5cf44 100644 --- a/rust/examples/common/conformance_writer_spec.rs +++ b/rust/examples/common/conformance_writer_spec.rs @@ -20,28 +20,26 @@ pub struct Record { } impl Record { - pub fn get_field(self: &Self, name: &str) -> &Value { - return &self + pub fn get_field(&self, name: &str) -> &Value { + &self .fields .iter() .find(|f| f.0 == name) .unwrap_or_else(|| 
panic!("Invalid: {}", name)) - .1; + .1 } - pub fn get_field_data(self: &Self, name: &str) -> Vec { - let data: Vec = self - .get_field(name) + pub fn get_field_data(&self, name: &str) -> Vec { + self.get_field(name) .as_array() .unwrap_or_else(|| panic!("Invalid: {}", name)) - .into_iter() + .iter() .filter_map(|v| v.as_u64()) .filter_map(|n| u8::try_from(n).ok()) - .collect(); - return data; + .collect() } - pub fn get_field_meta(self: &Self, name: &str) -> BTreeMap { + pub fn get_field_meta(&self, name: &str) -> BTreeMap { let data = self .get_field(name) .as_object() @@ -50,38 +48,34 @@ impl Record { for (key, value) in data.iter() { result.insert(key.to_string(), value.as_str().unwrap().to_string()); } - return result; + result } - pub fn get_field_str(self: &Self, name: &str) -> &str { - return self - .get_field(name) + pub fn get_field_str(&self, name: &str) -> &str { + self.get_field(name) .as_str() - .unwrap_or_else(|| panic!("Invalid: {}", name)); + .unwrap_or_else(|| panic!("Invalid: {}", name)) } - pub fn get_field_u16(self: &Self, name: &str) -> u16 { - return self - .get_field(name) + pub fn get_field_u16(&self, name: &str) -> u16 { + self.get_field(name) .as_str() .and_then(|s| s.parse::().ok()) - .unwrap_or_else(|| panic!("Invalid: {}", name)); + .unwrap_or_else(|| panic!("Invalid: {}", name)) } - pub fn get_field_u32(self: &Self, name: &str) -> u32 { - return self - .get_field(name) + pub fn get_field_u32(&self, name: &str) -> u32 { + self.get_field(name) .as_str() .and_then(|s| s.parse::().ok()) - .unwrap_or_else(|| panic!("Invalid: {}", name)); + .unwrap_or_else(|| panic!("Invalid: {}", name)) } - pub fn get_field_u64(self: &Self, name: &str) -> u64 { - return self - .get_field(name) + pub fn get_field_u64(&self, name: &str) -> u64 { + self.get_field(name) .as_str() .and_then(|s| s.parse::().ok()) - .unwrap_or_else(|| panic!("Invalid: {}", name)); + .unwrap_or_else(|| panic!("Invalid: {}", name)) } } diff --git 
a/rust/examples/conformance_writer.rs b/rust/examples/conformance_writer.rs index 3c5bac5e7c..37d94a8400 100644 --- a/rust/examples/conformance_writer.rs +++ b/rust/examples/conformance_writer.rs @@ -2,7 +2,6 @@ use std::{ borrow::Cow, collections::{BTreeMap, HashMap}, env, - io::Write, }; #[path = "common/conformance_writer_spec.rs"] @@ -12,7 +11,8 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { let mut writer = mcap::WriteOptions::new() .compression(None) .profile("") - .create(out_buffer) + .disable_seeking(true) + .create(binrw::io::NoSeek::new(std::io::stdout())) .expect("Couldn't create writer"); let mut channel_ids = HashMap::new(); @@ -50,9 +50,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { } "DataEnd" => { let data_section_crc = record.get_field_u32("data_section_crc"); - let _data_end = mcap::records::DataEnd { - data_section_crc: data_section_crc, - }; + let _data_end = mcap::records::DataEnd { data_section_crc }; // write data end } "Footer" => { @@ -126,11 +124,6 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { } writer.finish().expect("Couldn't finish"); - - let contents = std::fs::read(tmp_path).expect("Couldn't read output"); - std::io::stdout() - .write_all(&contents) - .expect("Couldn't write output"); } pub fn main() { From 68f6b7a760db6662bc232148efe002c0792fa366 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 11:53:58 +1100 Subject: [PATCH 3/8] most tests --- rust/Cargo.toml | 1 + rust/examples/conformance_writer.rs | 2 +- rust/src/sans_io/read.rs | 84 +++++++------- rust/src/write.rs | 167 +++++++++++++++++----------- 4 files changed, 150 insertions(+), 104 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 25c1ed61b0..bfd20926d4 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -25,6 +25,7 @@ thiserror = "1.0" lz4 = { version = "1.27", optional = true } tokio = { version = "1", features = ["io-util"] , optional = true } static_assertions = "1.1.0" +bimap = 
"0.6.3" [target.'cfg(target_arch = "wasm32")'.dependencies] zstd = { version = "0.11", features = ["wasm"], optional = true } diff --git a/rust/examples/conformance_writer.rs b/rust/examples/conformance_writer.rs index 37d94a8400..df0de0a1da 100644 --- a/rust/examples/conformance_writer.rs +++ b/rust/examples/conformance_writer.rs @@ -29,7 +29,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { media_type: record.get_field_str("media_type").to_owned(), }; writer - .attach(&attachment) + .attach(&attachment) .expect("Couldn't write attachment"); } "AttachmentIndex" => { diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index a36061feed..4b585f5fa5 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -735,13 +735,14 @@ mod tests { use std::collections::BTreeMap; use std::io::Read; - fn basic_chunked_file(compression: Option) -> McapResult> { + fn basic_chunked_file(compression: Option) -> Vec { let mut buf = std::io::Cursor::new(Vec::new()); { let mut writer = crate::WriteOptions::new() .compression(compression) .chunk_size(None) - .create(&mut buf)?; + .create(&mut buf) + .expect("could not construct writer"); let channel = std::sync::Arc::new(crate::Channel { id: 0, topic: "chat".to_owned(), @@ -750,20 +751,22 @@ mod tests { metadata: BTreeMap::new(), }); for n in 0..3 { - writer.write(&crate::Message { - channel: channel.clone(), - sequence: n, - log_time: n as u64, - publish_time: n as u64, - data: (&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).into(), - })?; + writer + .write(&crate::Message { + channel: channel.clone(), + sequence: n, + log_time: n as u64, + publish_time: n as u64, + data: (&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).into(), + }) + .expect("could not construct channel"); if n == 1 { - writer.flush()?; + writer.flush().expect("failed to flush"); } } - writer.finish()?; + writer.finish().expect("failed to finish"); } - Ok(buf.into_inner()) + buf.into_inner() } #[test] @@ -826,50 +829,50 @@ mod tests { } #[test] - fn 
test_file_data_validation() -> McapResult<()> { + fn test_file_data_validation() { let mut reader = LinearReader::new_with_options( LinearReaderOptions::default() .with_validate_data_section_crc(true) .with_validate_summary_section_crc(true), ); - let mut cursor = std::io::Cursor::new(basic_chunked_file(None)?); + let mut cursor = std::io::Cursor::new(basic_chunked_file(None)); let mut opcodes: Vec = Vec::new(); let mut iter_count = 0; while let Some(action) = reader.next_action() { - match action? { + match action.expect("failed to get next action") { ReadAction::NeedMore(n) => { - let written = cursor.read(reader.insert(n))?; + let written = cursor + .read(reader.insert(n)) + .expect("failed to read from buffer"); reader.set_written(written); } ReadAction::GetRecord { data, opcode } => { opcodes.push(opcode); - parse_record(opcode, data)?; + parse_record(opcode, data).expect("failed to parse record"); } } iter_count += 1; // guard against infinite loop assert!(iter_count < 10000); } - Ok(()) } - fn test_chunked( - compression: Option, - options: LinearReaderOptions, - ) -> McapResult<()> { + fn test_chunked(compression: Option, options: LinearReaderOptions) { let mut reader = LinearReader::new_with_options(options); - let mut cursor = std::io::Cursor::new(basic_chunked_file(compression)?); + let mut cursor = std::io::Cursor::new(basic_chunked_file(compression)); let mut opcodes: Vec = Vec::new(); let mut iter_count = 0; while let Some(action) = reader.next_action() { - match action? 
{ + match action.expect("failed to get next action") { ReadAction::NeedMore(n) => { - let written = cursor.read(reader.insert(n))?; + let written = cursor + .read(reader.insert(n)) + .expect("failed to read from buffer"); reader.set_written(written); } ReadAction::GetRecord { data, opcode } => { opcodes.push(opcode); - parse_record(opcode, data)?; + parse_record(opcode, data).expect("failed to parse record"); } } iter_count += 1; @@ -897,7 +900,6 @@ mod tests { op::FOOTER ] ); - Ok(()) } use paste::paste; @@ -906,7 +908,7 @@ mod tests { $( paste! { #[test] - fn [ ]() -> McapResult<()> { + fn [ ]() { test_chunked($compression, $options) } } @@ -928,12 +930,12 @@ mod tests { } #[test] - fn test_no_magic() -> McapResult<()> { + fn test_no_magic() { for options in [ LinearReaderOptions::default().with_skip_start_magic(true), LinearReaderOptions::default().with_skip_end_magic(true), ] { - let mcap = basic_chunked_file(None)?; + let mcap = basic_chunked_file(None); let input = if options.skip_start_magic { &mcap[8..] } else if options.skip_end_magic { @@ -946,14 +948,16 @@ mod tests { let mut opcodes: Vec = Vec::new(); let mut iter_count = 0; while let Some(action) = reader.next_action() { - match action? 
{ + match action.expect("failed to get next action") { ReadAction::NeedMore(n) => { - let written = cursor.read(reader.insert(n))?; + let written = cursor + .read(reader.insert(n)) + .expect("failed to read from buffer"); reader.set_written(written); } ReadAction::GetRecord { data, opcode } => { opcodes.push(opcode); - parse_record(opcode, data)?; + parse_record(opcode, data).expect("failed to parse record"); } } iter_count += 1; @@ -982,26 +986,27 @@ mod tests { ] ); } - Ok(()) } #[test] - fn test_emit_chunks() -> McapResult<()> { - let mcap = basic_chunked_file(None)?; + fn test_emit_chunks() { + let mcap = basic_chunked_file(None); let mut reader = LinearReader::new_with_options(LinearReaderOptions::default().with_emit_chunks(true)); let mut cursor = std::io::Cursor::new(mcap); let mut opcodes: Vec = Vec::new(); let mut iter_count = 0; while let Some(action) = reader.next_action() { - match action? { + match action.expect("failed to get next action") { ReadAction::NeedMore(n) => { - let written = cursor.read(reader.insert(n))?; + let written = cursor + .read(reader.insert(n)) + .expect("failed to read from buffer"); reader.set_written(written); } ReadAction::GetRecord { data, opcode } => { opcodes.push(opcode); - parse_record(opcode, data)?; + parse_record(opcode, data).expect("failed to parse record"); } } iter_count += 1; @@ -1027,6 +1032,5 @@ mod tests { op::FOOTER ] ); - Ok(()) } } diff --git a/rust/src/write.rs b/rust/src/write.rs index e614bad5c4..9643361dce 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -7,6 +7,7 @@ use std::{ mem::size_of, }; +use bimap::BiHashMap; use binrw::prelude::*; use byteorder::{WriteBytesExt, LE}; @@ -175,20 +176,35 @@ impl WriteOptions { } /// Creates a [`Writer`] whch writes to `w` using the given options - pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult> { + pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult> { Writer::with_options(w, self) } } +#[derive(Hash, PartialEq, Eq)] +struct 
ChannelContent<'a> { + topic: Cow<'a, str>, + schema_id: u16, + message_encoding: Cow<'a, str>, + metadata: Cow<'a, BTreeMap>, +} + +#[derive(Hash, PartialEq, Eq)] +struct SchemaContent<'a> { + name: Cow<'a, str>, + encoding: Cow<'a, str>, + data: Cow<'a, [u8]>, +} + /// Writes an MCAP file to the given [writer](Write). /// /// Users should call [`finish()`](Self::finish) to flush the stream /// and check for errors when done; otherwise the result will be unwrapped on drop. -pub struct Writer<'a, W: Write + Seek> { +pub struct Writer { writer: Option>, options: WriteOptions, - schemas: Vec>, - channels: Vec, + schemas: BiHashMap, u16>, + channels: BiHashMap, u16>, chunk_indexes: Vec, attachment_indexes: Vec, metadata_indexes: Vec, @@ -197,7 +213,7 @@ pub struct Writer<'a, W: Write + Seek> { channel_message_counts: BTreeMap, } -impl<'a, W: Write + Seek> Writer<'a, W> { +impl Writer { pub fn new(writer: W) -> McapResult { Self::with_options(writer, WriteOptions::default()) } @@ -233,11 +249,11 @@ impl<'a, W: Write + Seek> Writer<'a, W> { Ok(Self { writer: Some(WriteMode::Raw(writer, chunk_mode)), options: opts, - schemas: Vec::new(), - channels: Vec::new(), - chunk_indexes: Vec::new(), - attachment_indexes: Vec::new(), - metadata_indexes: Vec::new(), + schemas: Default::default(), + channels: Default::default(), + chunk_indexes: Default::default(), + attachment_indexes: Default::default(), + metadata_indexes: Default::default(), message_bounds: None, channel_message_counts: BTreeMap::new(), }) @@ -246,34 +262,41 @@ impl<'a, W: Write + Seek> Writer<'a, W> { // adds a schema, returning its ID. If a schema with the same content has been added already, // its ID is returned. 
pub fn add_schema(&mut self, name: &str, encoding: &str, data: &[u8]) -> McapResult { - let mut max_schema_id: u16 = 0; - for schema in &self.schemas { - max_schema_id = std::cmp::max(schema.id, max_schema_id); - if schema.name == name && schema.encoding == encoding && schema.data == data { - return Ok(schema.id); - } + if let Some(&id) = self.schemas.get_by_left(&SchemaContent { + name: name.into(), + encoding: encoding.into(), + data: data.into(), + }) { + return Ok(id); } - let next_schema_id = max_schema_id + 1; - + let next_schema_id = self.schemas.right_values().max().unwrap_or(&0) + 1; + let schema = Schema { + id: next_schema_id, + name: name.into(), + encoding: encoding.into(), + data: Cow::Owned(data.into()), + }; + self.schemas.insert( + SchemaContent { + name: Cow::Owned(name.into()), + encoding: Cow::Owned(encoding.into()), + data: Cow::Owned(data.into()), + }, + next_schema_id, + ); if self.options.use_chunks { - let schema = Schema { - id: next_schema_id, - name: name.into(), - encoding: encoding.into(), - data: data.into(), - }; self.chunkin_time()?.write_schema(schema)?; } else { let header = records::SchemaHeader { - id: next_schema_id, - name: name.into(), - encoding: encoding.into(), + id: schema.id, + name: schema.name, + encoding: schema.encoding, }; write_record( &mut self.finish_chunk()?, &Record::Schema { header, - data: Cow::Borrowed(data), + data: schema.data, }, )?; } @@ -293,19 +316,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> { message_encoding: &str, metadata: &BTreeMap, ) -> McapResult { - let mut max_channel_id: u16 = 0; - for channel in &self.channels { - max_channel_id = std::cmp::max(channel.id, max_channel_id); - if channel.schema_id == schema_id - && channel.topic == topic - && channel.message_encoding == message_encoding - && channel.metadata == *metadata - { - return Ok(channel.id); - } + if let Some(&id) = self.channels.get_by_left(&ChannelContent { + topic: Cow::Borrowed(topic), + schema_id: schema_id, + message_encoding: 
Cow::Borrowed(message_encoding), + metadata: Cow::Borrowed(metadata), + }) { + return Ok(id); } - let next_channel_id = max_channel_id + 1; - if schema_id != 0 && !self.schemas.iter().any(|schema| schema.id == schema_id) { + let next_channel_id = self + .channels + .right_values() + .max() + .map(|n| n + 1) + .unwrap_or(0); + if schema_id != 0 && self.schemas.get_by_right(&schema_id).is_none() { return Err(McapError::UnknownSchema(topic.into(), schema_id)); } @@ -316,6 +341,15 @@ impl<'a, W: Write + Seek> Writer<'a, W> { message_encoding: message_encoding.into(), metadata: metadata.clone(), }; + self.channels.insert( + ChannelContent { + topic: Cow::Owned(topic.into()), + schema_id: schema_id, + message_encoding: Cow::Owned(message_encoding.into()), + metadata: Cow::Owned(metadata.clone()), + }, + next_channel_id, + ); if self.options.use_chunks { self.chunkin_time()?.write_channel(rec)? @@ -328,7 +362,7 @@ impl<'a, W: Write + Seek> Writer<'a, W> { /// Write the given message (and its provided channel, if needed). /// The provided channel ID and schema ID are ignored and new ones will be assigned in the /// resulting MCAP records. 
- pub fn write(&mut self, message: &Message<'a>) -> McapResult<()> { + pub fn write(&mut self, message: &Message) -> McapResult<()> { let schema_id: u16 = match message.channel.schema.as_ref() { None => 0, Some(schema) => self.add_schema(&schema.name, &schema.encoding, &schema.data)?, @@ -357,13 +391,7 @@ impl<'a, W: Write + Seek> Writer<'a, W> { header: &MessageHeader, data: &[u8], ) -> McapResult<()> { - // The number of channels should be relatively small, - // do a quick linear search to make sure we're not being given a bogus ID - if !self - .channels - .iter() - .any(|channel| channel.id == header.channel_id) - { + if self.channels.get_by_right(&header.channel_id).is_none() { return Err(McapError::UnknownChannel( header.sequence, header.channel_id, @@ -692,9 +720,29 @@ impl<'a, W: Write + Seek> Writer<'a, W> { let mut metadata_indexes = Vec::new(); std::mem::swap(&mut metadata_indexes, &mut self.metadata_indexes); - self.channels.sort_by_key(|c| c.id); - - self.schemas.sort_by_key(|s| s.id); + let all_channels: Vec<_> = self + .channels + .iter() + .map(|(content, &id)| records::Channel { + id, + schema_id: content.schema_id, + topic: content.topic.clone().into(), + message_encoding: content.message_encoding.clone().into(), + metadata: content.metadata.clone().into_owned(), + }) + .collect(); + let all_schemas: Vec<_> = self + .schemas + .iter() + .map(|(content, &id)| Record::Schema { + header: records::SchemaHeader { + id, + name: content.name.clone().into(), + encoding: content.encoding.clone().into(), + }, + data: content.data.clone().into(), + }) + .collect(); let mut offsets = Vec::new(); @@ -709,15 +757,8 @@ impl<'a, W: Write + Seek> Writer<'a, W> { // Write all schemas. 
let schemas_start = summary_start; - for schema in self.schemas.iter() { - let header = records::SchemaHeader { - id: schema.id, - name: schema.name.clone(), - encoding: schema.encoding.clone(), - }; - let data = schema.data.clone(); - - write_record(&mut ccw, &Record::Schema { header, data })?; + for schema in all_schemas.iter() { + write_record(&mut ccw, schema)?; } let schemas_end = posit(&mut ccw)?; if schemas_end - schemas_start > 0 { @@ -730,8 +771,8 @@ impl<'a, W: Write + Seek> Writer<'a, W> { // Write all channels. let channels_start = schemas_end; - for channel in self.channels.iter() { - write_record(&mut ccw, &Record::Channel(channel.clone()))?; + for channel in all_channels { + write_record(&mut ccw, &Record::Channel(channel))?; } let channels_end = posit(&mut ccw)?; if channels_end - channels_start > 0 { @@ -820,7 +861,7 @@ impl<'a, W: Write + Seek> Writer<'a, W> { } } -impl<'a, W: Write + Seek> Drop for Writer<'a, W> { +impl<'a, W: Write + Seek> Drop for Writer { fn drop(&mut self) { self.finish().unwrap() } From 5308c465382b90bc5c2ccd2b667ef74dbf0a916d Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 12:32:10 +1100 Subject: [PATCH 4/8] clippy and all --- rust/examples/conformance_writer.rs | 2 +- rust/src/lib.rs | 9 +-- rust/src/write.rs | 115 +++++++++++++++++++--------- rust/tests/message.rs | 8 +- 4 files changed, 83 insertions(+), 51 deletions(-) diff --git a/rust/examples/conformance_writer.rs b/rust/examples/conformance_writer.rs index df0de0a1da..37d94a8400 100644 --- a/rust/examples/conformance_writer.rs +++ b/rust/examples/conformance_writer.rs @@ -29,7 +29,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { media_type: record.get_field_str("media_type").to_owned(), }; writer - .attach(&attachment) + .attach(&attachment) .expect("Couldn't write attachment"); } "AttachmentIndex" => { diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 94ba78e357..6b77507d06 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ 
-39,14 +39,7 @@ //! //! // Channels and schemas are automatically assigned ID as they're serialized, //! // and automatically deduplicated with `Arc` when deserialized. -//! let my_channel = Channel { -//! topic: String::from("cool stuff"), -//! schema: None, -//! message_encoding: String::from("application/octet-stream"), -//! metadata: BTreeMap::default() -//! }; -//! -//! let channel_id = out.add_channel(&my_channel)?; +//! let channel_id = out.add_channel(0, "cool stuff", "application/octet-stream", &BTreeMap::new())?; //! //! out.write_to_known_channel( //! &MessageHeader { diff --git a/rust/src/write.rs b/rust/src/write.rs index 9643361dce..abc2c50c03 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -176,7 +176,7 @@ impl WriteOptions { } /// Creates a [`Writer`] whch writes to `w` using the given options - pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult> { + pub fn create(self, w: W) -> McapResult> { Writer::with_options(w, self) } } @@ -270,37 +270,40 @@ impl Writer { return Ok(id); } let next_schema_id = self.schemas.right_values().max().unwrap_or(&0) + 1; - let schema = Schema { + self.write_schema(Schema { id: next_schema_id, name: name.into(), encoding: encoding.into(), data: Cow::Owned(data.into()), - }; + })?; + Ok(next_schema_id) + } + + fn write_schema(&mut self, schema: Schema) -> McapResult<()> { self.schemas.insert( SchemaContent { - name: Cow::Owned(name.into()), - encoding: Cow::Owned(encoding.into()), - data: Cow::Owned(data.into()), + name: Cow::Owned(schema.name.clone()), + encoding: Cow::Owned(schema.encoding.clone()), + data: Cow::Owned(schema.data.clone().into_owned()), }, - next_schema_id, + schema.id, ); if self.options.use_chunks { - self.chunkin_time()?.write_schema(schema)?; + self.chunkin_time()?.write_schema(schema) } else { let header = records::SchemaHeader { id: schema.id, name: schema.name, encoding: schema.encoding, }; - write_record( + Ok(write_record( &mut self.finish_chunk()?, &Record::Schema { header, 
data: schema.data, }, - )?; + )?) } - Ok(next_schema_id) } /// Adds a channel, returning its ID. If a channel with equivalent content was added previously, @@ -318,7 +321,7 @@ impl Writer { ) -> McapResult { if let Some(&id) = self.channels.get_by_left(&ChannelContent { topic: Cow::Borrowed(topic), - schema_id: schema_id, + schema_id, message_encoding: Cow::Borrowed(message_encoding), metadata: Cow::Borrowed(metadata), }) { @@ -334,47 +337,83 @@ impl Writer { return Err(McapError::UnknownSchema(topic.into(), schema_id)); } - let rec = records::Channel { + self.write_channel(records::Channel { id: next_channel_id, schema_id, topic: topic.into(), message_encoding: message_encoding.into(), metadata: metadata.clone(), - }; + })?; + Ok(next_channel_id) + } + + fn write_channel(&mut self, channel: records::Channel) -> McapResult<()> { self.channels.insert( ChannelContent { - topic: Cow::Owned(topic.into()), - schema_id: schema_id, - message_encoding: Cow::Owned(message_encoding.into()), - metadata: Cow::Owned(metadata.clone()), + topic: Cow::Owned(channel.topic.clone()), + schema_id: channel.schema_id, + message_encoding: Cow::Owned(channel.message_encoding.clone()), + metadata: Cow::Owned(channel.metadata.clone()), }, - next_channel_id, + channel.id, ); if self.options.use_chunks { - self.chunkin_time()?.write_channel(rec)? + self.chunkin_time()?.write_channel(channel) } else { - write_record(self.finish_chunk()?, &Record::Channel(rec))?; + Ok(write_record( + self.finish_chunk()?, + &Record::Channel(channel), + )?) } - Ok(next_channel_id) } - /// Write the given message (and its provided channel, if needed). - /// The provided channel ID and schema ID are ignored and new ones will be assigned in the - /// resulting MCAP records. + /// Write the given message (and its provided channel, if not already added). + /// The provided channel ID and schema ID will be used as IDs in the resulting MCAP. 
pub fn write(&mut self, message: &Message) -> McapResult<()> { - let schema_id: u16 = match message.channel.schema.as_ref() { - None => 0, - Some(schema) => self.add_schema(&schema.name, &schema.encoding, &schema.data)?, - }; - let channel_id = self.add_channel( - schema_id, - &message.channel.topic, - &message.channel.message_encoding, - &message.channel.metadata, - )?; + if let Some(schema) = message.channel.schema.as_ref() { + match self.schemas.get_by_right(&schema.id) { + Some(expected) => { + let actual = SchemaContent { + name: Cow::Borrowed(&schema.name), + encoding: Cow::Borrowed(&schema.encoding), + data: Cow::Borrowed(&schema.data), + }; + if *expected != actual { + return Err(McapError::ConflictingSchemas(schema.name.clone())); + } + } + None => { + self.write_schema(schema.as_ref().clone())?; + } + } + } + match self.channels.get_by_right(&message.channel.id) { + Some(expected) => { + let actual = ChannelContent { + topic: Cow::Borrowed(&message.channel.topic), + schema_id: message.channel.schema.as_ref().map(|s| s.id).unwrap_or(0), + message_encoding: Cow::Borrowed(&message.channel.message_encoding), + metadata: Cow::Borrowed(&message.channel.metadata), + }; + if *expected != actual { + return Err(McapError::ConflictingChannels( + message.channel.topic.clone(), + )); + } + } + None => { + self.write_channel(records::Channel { + id: message.channel.id, + schema_id: message.channel.schema.as_ref().map(|s| s.id).unwrap_or(0), + topic: message.channel.topic.clone(), + message_encoding: message.channel.message_encoding.clone(), + metadata: message.channel.metadata.clone(), + })?; + } + } let header = MessageHeader { - channel_id, + channel_id: message.channel.id, sequence: message.sequence, log_time: message.log_time, publish_time: message.publish_time, @@ -740,7 +779,7 @@ impl Writer { name: content.name.clone().into(), encoding: content.encoding.clone().into(), }, - data: content.data.clone().into(), + data: content.data.clone(), }) .collect(); @@ -861,7 
+900,7 @@ impl Writer { } } -impl<'a, W: Write + Seek> Drop for Writer { +impl Drop for Writer { fn drop(&mut self) { self.finish().unwrap() } diff --git a/rust/tests/message.rs b/rust/tests/message.rs index 6354604ecc..51eb772b2d 100644 --- a/rust/tests/message.rs +++ b/rust/tests/message.rs @@ -17,7 +17,7 @@ fn smoke() -> Result<()> { let expected = mcap::Message { channel: Arc::new(mcap::Channel { - id: 0, + id: 1, schema: Some(Arc::new(mcap::Schema { id: 1, name: String::from("Example"), @@ -74,7 +74,7 @@ fn run_round_trip(use_chunks: bool) -> Result<()> { }); let channel = Arc::new(mcap::Channel { - id: 0, + id: 1, schema: Some(schema.clone()), topic: String::from("example"), message_encoding: String::from("a"), @@ -89,10 +89,10 @@ fn run_round_trip(use_chunks: bool) -> Result<()> { chunk_count: if use_chunks { 1 } else { 0 }, message_start_time: 2, message_end_time: 2, - channel_message_counts: [(0, 1)].into(), + channel_message_counts: [(1, 1)].into(), ..Default::default() }), - channels: [(0, channel.clone())].into(), + channels: [(1, channel.clone())].into(), schemas: [(1, schema.clone())].into(), ..Default::default() }; From bd7c120be92c513ebf913fa2378de73a393b864d Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 13:35:56 +1100 Subject: [PATCH 5/8] switch to all targets for clippy --- .github/workflows/ci.yml | 2 +- rust/benches/reader.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8476a2b2d0..4e15e0439a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -493,7 +493,7 @@ jobs: components: "rustfmt, clippy" - run: rustup target add wasm32-unknown-unknown - run: cargo fmt --all -- --check - - run: cargo clippy -- --no-deps + - run: cargo clippy --all-targets -- --no-deps - run: cargo clippy --no-default-features -- --no-deps - run: cargo clippy --no-default-features --features lz4 -- --no-deps - run: cargo clippy --no-default-features 
--features zstd -- --no-deps diff --git a/rust/benches/reader.rs b/rust/benches/reader.rs index d234ccf6d3..2c1624a829 100644 --- a/rust/benches/reader.rs +++ b/rust/benches/reader.rs @@ -17,12 +17,14 @@ fn create_test_mcap(n: usize, compression: Option) -> Vec const MESSAGE_DATA: &[u8] = &[42; 10]; let schema = Arc::new(Schema { + id: 1, name: "TestSchema".to_string(), encoding: "raw".to_string(), data: Cow::Borrowed(b"{}"), }); let channel = Arc::new(Channel { + id: 0, topic: "test_topic".to_string(), message_encoding: "raw".to_string(), metadata: Default::default(), @@ -35,7 +37,7 @@ fn create_test_mcap(n: usize, compression: Option) -> Vec sequence: i as u32, log_time: i as u64, publish_time: i as u64, - data: Cow::Borrowed(&MESSAGE_DATA), + data: Cow::Borrowed(MESSAGE_DATA), }; writer.write(&message).unwrap(); } From 360595206618e33b7e5f8774a97921492c26b757 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 13:58:43 +1100 Subject: [PATCH 6/8] naming --- rust/src/write.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/rust/src/write.rs b/rust/src/write.rs index abc2c50c03..b44e24f977 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -259,7 +259,7 @@ impl Writer { }) } - // adds a schema, returning its ID. If a schema with the same content has been added already, + // Adds a schema, returning its ID. If a schema with the same content has been added already, // its ID is returned. pub fn add_schema(&mut self, name: &str, encoding: &str, data: &[u8]) -> McapResult { if let Some(&id) = self.schemas.get_by_left(&SchemaContent { @@ -307,9 +307,9 @@ impl Writer { } /// Adds a channel, returning its ID. If a channel with equivalent content was added previously, - /// its ID i sreturned. + /// its ID is returned. /// - /// Provide a schema_id returned from [`Self::add_schema`], or 0 if the channel is schemaless. 
+ /// Provide a schema_id returned from [`Self::add_schema`], or 0 if the channel has no schema. /// /// Useful with subequent calls to [`write_to_known_channel()`](Self::write_to_known_channel) pub fn add_channel( @@ -373,13 +373,14 @@ impl Writer { pub fn write(&mut self, message: &Message) -> McapResult<()> { if let Some(schema) = message.channel.schema.as_ref() { match self.schemas.get_by_right(&schema.id) { - Some(expected) => { - let actual = SchemaContent { + Some(previous) => { + // ensure that this message schema does not conflict with the existing one's content + let current = SchemaContent { name: Cow::Borrowed(&schema.name), encoding: Cow::Borrowed(&schema.encoding), data: Cow::Borrowed(&schema.data), }; - if *expected != actual { + if *previous != current { return Err(McapError::ConflictingSchemas(schema.name.clone())); } } @@ -388,15 +389,19 @@ impl Writer { } } } + let schema_id = match message.channel.schema.as_ref() { + None => 0, + Some(schema) => schema.id, + }; match self.channels.get_by_right(&message.channel.id) { - Some(expected) => { - let actual = ChannelContent { + Some(previous) => { + let current = ChannelContent { topic: Cow::Borrowed(&message.channel.topic), - schema_id: message.channel.schema.as_ref().map(|s| s.id).unwrap_or(0), + schema_id, message_encoding: Cow::Borrowed(&message.channel.message_encoding), metadata: Cow::Borrowed(&message.channel.metadata), }; - if *expected != actual { + if *previous != current { return Err(McapError::ConflictingChannels( message.channel.topic.clone(), )); From 166ae2b0a649811932baec20d390c6aee0b650c6 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 14:00:40 +1100 Subject: [PATCH 7/8] use len --- rust/src/write.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rust/src/write.rs b/rust/src/write.rs index b44e24f977..8bd1054d8b 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -269,7 +269,7 @@ impl Writer { }) { return Ok(id); } - let 
next_schema_id = self.schemas.right_values().max().unwrap_or(&0) + 1; + let next_schema_id = self.schemas.len() as u16 + 1; self.write_schema(Schema { id: next_schema_id, name: name.into(), @@ -327,12 +327,7 @@ impl Writer { }) { return Ok(id); } - let next_channel_id = self - .channels - .right_values() - .max() - .map(|n| n + 1) - .unwrap_or(0); + let next_channel_id = self.channels.len() as u16; if schema_id != 0 && self.schemas.get_by_right(&schema_id).is_none() { return Err(McapError::UnknownSchema(topic.into(), schema_id)); } From 0e5aef19cb4428c4a3790bddc2c8d87eb122e25f Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 19 Dec 2024 14:38:54 +1100 Subject: [PATCH 8/8] fix conformance writer --- rust/examples/conformance_writer.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rust/examples/conformance_writer.rs b/rust/examples/conformance_writer.rs index 37d94a8400..8fdc3fae54 100644 --- a/rust/examples/conformance_writer.rs +++ b/rust/examples/conformance_writer.rs @@ -37,13 +37,19 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { } "Channel" => { let id = record.get_field_u16("id"); - let schema_id = record.get_field_u64("schema_id"); + let schema_id = record.get_field_u16("schema_id"); + let output_schema_id = match schema_id { + 0 => 0, + input_schema_id => { + *schema_ids.get(&input_schema_id).expect("unknown schema ID") + } + }; let topic = record.get_field_str("topic"); let message_encoding = record.get_field_str("message_encoding"); let returned_id = writer - .add_channel(schema_id as u16, topic, message_encoding, &BTreeMap::new()) + .add_channel(output_schema_id, topic, message_encoding, &BTreeMap::new()) .expect("Couldn't write channel"); - channel_ids.insert(returned_id, id); + channel_ids.insert(id, returned_id); } "ChunkIndex" => { // written automatically @@ -110,7 +116,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) { let returned_id = writer .add_schema(name, encoding, &data) 
.expect("cannot write schema"); - schema_ids.insert(returned_id, id); + schema_ids.insert(id, returned_id); } "Statistics" => { // written automatically