From 64d6555865a19ccabde220f469ee11517d6143aa Mon Sep 17 00:00:00 2001 From: Troy Kohler Date: Sat, 4 Jan 2025 10:57:01 +0100 Subject: [PATCH 1/3] feat: add NMEA stream parser with message chunking --- src/stream.rs | 159 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 src/stream.rs diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..729a28d --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,159 @@ +use std::io::Read; + +// an iterator-based parser that will parse new messages +// from e.g. streamed/received bytes when \r\n ending is present. +// +// we should 'chunkinate' the bytes based on the \r\n ending +// https://www.perplexity.ai/search/what-is-the-core-difference-be-P9yWefFfTP6DlB_f5vEl4w#12 + +struct NmeaStreamParser { + buffer: Vec, + separator: Vec, +} + +impl NmeaStreamParser { + fn new() -> Self { + NmeaStreamParser { + buffer: Vec::new(), + separator: b"\r\n".to_vec(), + } + } + + fn process_chunk(&mut self, chunk: &[u8]) -> Vec> { + self.buffer.extend_from_slice(chunk); + let mut messages = Vec::new(); + + while let Some(pos) = self + .buffer + .windows(self.separator.len()) + .position(|window| window == self.separator.as_slice()) + { + let message = self.buffer.drain(..pos).collect(); + messages.push(message); + self.buffer.drain(..self.separator.len()); + } + + messages + } +} + +#[allow(dead_code)] +struct MessageStream { + reader: R, + parser: NmeaStreamParser, + chunk_size: usize, +} + +impl MessageStream { + #[allow(dead_code)] + fn new(reader: R, chunk_size: usize) -> Self { + MessageStream { + reader, + parser: NmeaStreamParser::new(), + chunk_size, + } + } +} + +impl Iterator for MessageStream { + type Item = Result, std::io::Error>; + + fn next(&mut self) -> Option { + let mut chunk = vec![0; self.chunk_size]; + loop { + match self.reader.read(&mut chunk) { + Ok(0) => return None, // End of stream + Ok(n) => { + let mut messages = self.parser.process_chunk(&chunk[..n]); + if !messages.is_empty() { + return Some(Ok(messages.remove(0))); + } + } + Err(e) => return Some(Err(e)), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stream_parser() { + let mut parser = NmeaStreamParser::new(); + let chunk = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let messages = parser.process_chunk(chunk); + assert_eq!(messages.len(), 2); + } + + #[test] + fn test_stream_parser_partial_message() { + let mut parser = NmeaStreamParser::new(); + let chunk1 = b"$GPGLL,4916.45,N,123"; + let chunk2 = b"11.12,W,225444,A,*1D\r\n"; + + let messages = parser.process_chunk(chunk1); + assert_eq!(messages.len(), 0); + + let messages = parser.process_chunk(chunk2); + assert_eq!(messages.len(), 1); + } + + #[test] + fn test_stream_parser_multiple_chunks() { + let mut parser = NmeaStreamParser::new(); + let chunks = [ + b"$GPGLL,4916.45,N,12311.12,W,2254", + b"44,A,*1D\r\n$GPGLL,4916.45,N,12300", + b"11.12,W,225444,A,*1D\r\n$GPGLL,4..", + b"916.45,N,12311.12,W,225444,A,*1D", + b"\r\n$GPGLL,4916.45,N,12311.12,W,..", + b"225444,A,*1D\r\n..................", + b"$GPGLL,4916.45,N,12311.12,W,2254", + b"44,A,*1D\r\n$GPGLL,4916.45,N,123..", + b"11.12,W,225444,A,*1D\r\n..........", + ]; + + let mut total_messages = 0; + for chunk in chunks { + let messages = parser.process_chunk(chunk); + total_messages += messages.len(); + } + assert_eq!(total_messages, 6); + } + + #[test] + fn test_stream_parser_invalid_data() { + let mut parser = NmeaStreamParser::new(); + let chunk = b"Invalid data without separators"; + let messages = parser.process_chunk(chunk); + assert_eq!(messages.len(), 0); + } + + #[test] + fn test_stream_parser_empty_chunk() { + let mut parser = NmeaStreamParser::new(); + let messages = parser.process_chunk(b""); + assert_eq!(messages.len(), 0); + } + + #[test] + fn test_message_stream() { + let data = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let stream = MessageStream::new(&data[..], 10); + let messages: Vec, std::io::Error>> = stream.collect(); + assert_eq!(messages.len(), 2); + } + + #[test] + fn test_empty_message_stream() { + let data = b""; + let stream = MessageStream::new(&data[..], 10); + + // check that result is ok + for message in stream { + assert!(message.is_ok()); + } + } +} From ad4adbe015d53df4ff77f07c4becc1f913371130 Mon Sep 17 00:00:00 2001 From: Troy Kohler Date: Sat, 4 Jan 2025 10:57:32 +0100 Subject: [PATCH 2/3] refactor: remove unused perplexity.ai link from comments --- src/stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream.rs b/src/stream.rs index 729a28d..a54e3fd 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -4,7 +4,6 @@ use std::io::Read; // from e.g. streamed/received bytes when \r\n ending is present. // // we should 'chunkinate' the bytes based on the \r\n ending -// https://www.perplexity.ai/search/what-is-the-core-difference-be-P9yWefFfTP6DlB_f5vEl4w#12 struct NmeaStreamParser { buffer: Vec, From 190d9983bb9a5bebbd3c459533cc4bbc7456d25d Mon Sep 17 00:00:00 2001 From: Troy Kohler Date: Sat, 4 Jan 2025 16:13:27 +0100 Subject: [PATCH 3/3] feat: add stream parser with example --- examples/parse_stream.rs | 33 +++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/parser.rs | 1 + src/stream.rs | 6 +++--- 4 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 examples/parse_stream.rs diff --git a/examples/parse_stream.rs b/examples/parse_stream.rs new file mode 100644 index 0000000..354acda --- /dev/null +++ b/examples/parse_stream.rs @@ -0,0 +1,33 @@ +use std::io::{Cursor, Read}; +use nmea::stream::NmeaStreamParser; + +fn main() { + // Example data source: a Cursor over a byte slice + let data = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let mut reader = Cursor::new(data); + + let mut parser = NmeaStreamParser::new(); + + let chunk_size = 10; + let mut buffer = vec![0; chunk_size]; + + // Read and process data in chunks + while let Ok(n) = reader.read(&mut buffer) { + if n == 0 { + break; // End of stream + } + + // Process the chunk and get complete messages + let messages = parser.process_chunk(&buffer[..n]); + + // Handle each complete message + for message in messages { + // Convert the message from Vec to a string for display + if let Ok(message_str) = String::from_utf8(message) { + println!("Parsed message: {}", message_str); + } else { + eprintln!("Failed to convert message to string"); + } + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 66adebe..7e7787b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,7 @@ mod error; pub(crate) mod parse; mod parser; +pub mod stream; pub mod sentences; diff --git a/src/parser.rs b/src/parser.rs index 0c1409b..f321ffc 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -245,6 +245,7 @@ impl<'a> Nmea { /// - and other /// /// The type of sentence is returned if implemented and valid. + // MARK: parse which currently works based on strings pub fn parse(&mut self, sentence: &'a str) -> Result> { match parse_str(sentence)? { ParseResult::VTG(vtg) => { diff --git a/src/stream.rs b/src/stream.rs index a54e3fd..ee9b03a 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,20 +5,20 @@ use std::io::Read; // // we should 'chunkinate' the bytes based on the \r\n ending -struct NmeaStreamParser { +pub struct NmeaStreamParser { buffer: Vec, separator: Vec, } impl NmeaStreamParser { - fn new() -> Self { + pub fn new() -> Self { NmeaStreamParser { buffer: Vec::new(), separator: b"\r\n".to_vec(), } } - fn process_chunk(&mut self, chunk: &[u8]) -> Vec> { + pub fn process_chunk(&mut self, chunk: &[u8]) -> Vec> { self.buffer.extend_from_slice(chunk); let mut messages = Vec::new();