From f53ff6224af30650f62b511be75884f7f9d4040a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sun, 9 Jun 2024 21:29:27 +0200 Subject: [PATCH] refactor: Simplify Parse trait by using Vec instead of AyncWriteExt (#30) --- Cargo.lock | 14 ---- breakwater-parser/Cargo.toml | 3 - breakwater-parser/benches/parsing.rs | 34 ++++------ breakwater-parser/src/assembler.rs | 12 +--- breakwater-parser/src/lib.rs | 17 +---- breakwater-parser/src/memchr.rs | 11 +--- breakwater-parser/src/original.rs | 70 ++++++++------------ breakwater-parser/src/refactored.rs | 95 ++++++++++------------------ breakwater/src/server.rs | 26 +++++--- 9 files changed, 98 insertions(+), 184 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85d35eb..284746d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,9 +213,6 @@ dependencies = [ "enum_dispatch", "memchr", "pixelbomber", - "snafu", - "tokio", - "trait-variant", ] [[package]] @@ -1805,17 +1802,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "trait-variant" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70977707304198400eb4835a78f6a9f928bf41bba420deb8fdb175cd965d77a7" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "ttf-parser" version = "0.15.2" diff --git a/breakwater-parser/Cargo.toml b/breakwater-parser/Cargo.toml index 9277730..e300282 100644 --- a/breakwater-parser/Cargo.toml +++ b/breakwater-parser/Cargo.toml @@ -16,9 +16,6 @@ breakwater-core.workspace = true enum_dispatch.workspace = true memchr.workspace = true -snafu.workspace = true -tokio.workspace = true -trait-variant.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/breakwater-parser/benches/parsing.rs b/breakwater-parser/benches/parsing.rs index 22153a7..4cd7591 100644 --- a/breakwater-parser/benches/parsing.rs +++ b/breakwater-parser/benches/parsing.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use breakwater_core::{framebuffer::FrameBuffer, test_helpers::DevNullTcpStream}; +use breakwater_core::framebuffer::FrameBuffer; #[cfg(target_arch = "x86_64")] use breakwater_parser::assembler::AssemblerParser; use breakwater_parser::{ @@ -80,26 +80,20 @@ fn invoke_benchmark( for parse_name in parser_names { c_group.bench_with_input(parse_name, &commands, |b, input| { - b.to_async(tokio::runtime::Runtime::new().expect("Failed to start tokio runtime")) - .iter(|| async { - let mut parser = match parse_name { - "original" => { - ParserImplementation::Original(OriginalParser::new(fb.clone())) - } - "refactored" => { - ParserImplementation::Refactored(RefactoredParser::new(fb.clone())) - } - "memchr" => ParserImplementation::Naive(MemchrParser::new(fb.clone())), - #[cfg(target_arch = "x86_64")] - "assembler" => ParserImplementation::Assembler(AssemblerParser::default()), - _ => panic!("Parser implementation {parse_name} not known"), - }; + b.iter(|| { + let mut parser = match parse_name { + "original" => ParserImplementation::Original(OriginalParser::new(fb.clone())), + "refactored" => { + ParserImplementation::Refactored(RefactoredParser::new(fb.clone())) + } + "memchr" => ParserImplementation::Naive(MemchrParser::new(fb.clone())), + #[cfg(target_arch = "x86_64")] + "assembler" => ParserImplementation::Assembler(AssemblerParser::default()), + _ => panic!("Parser implementation {parse_name} not known"), + }; - parser - .parse(input, DevNullTcpStream::default()) - .await - .expect("Failed to parse commands"); - }); + parser.parse(input, &mut Vec::new()); + }); }); } } diff --git a/breakwater-parser/src/assembler.rs b/breakwater-parser/src/assembler.rs index eb1484c..0947fe4 100644 --- a/breakwater-parser/src/assembler.rs +++ b/breakwater-parser/src/assembler.rs @@ -1,8 +1,6 @@ use std::arch::asm; -use tokio::io::AsyncWriteExt; - -use crate::{Parser, ParserError}; +use crate::Parser; const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command @@ -10,11 +8,7 @@ const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest poss pub struct AssemblerParser {} impl Parser for AssemblerParser { - async fn parse( - &mut self, - buffer: &[u8], - _stream: impl AsyncWriteExt + Send + Unpin, - ) -> Result { + fn parse(&mut self, buffer: &[u8], _response: &mut Vec) -> usize { let mut last_byte_parsed = 0; // This loop does nothing and should be seen as a placeholder @@ -33,7 +27,7 @@ impl Parser for AssemblerParser { ) } - Ok(last_byte_parsed) + last_byte_parsed } fn parser_lookahead(&self) -> usize { diff --git a/breakwater-parser/src/lib.rs b/breakwater-parser/src/lib.rs index 797def6..638eb00 100644 --- a/breakwater-parser/src/lib.rs +++ b/breakwater-parser/src/lib.rs @@ -2,8 +2,6 @@ #![feature(portable_simd)] use enum_dispatch::enum_dispatch; -use snafu::Snafu; -use tokio::io::AsyncWriteExt; #[cfg(target_arch = "x86_64")] pub mod assembler; @@ -11,21 +9,10 @@ pub mod memchr; pub mod original; pub mod refactored; -#[derive(Debug, Snafu)] -pub enum ParserError { - #[snafu(display("Failed to write to TCP socket"))] - WriteToTcpSocket { source: std::io::Error }, -} - #[enum_dispatch(ParserImplementation)] -// According to https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html -#[trait_variant::make(SendParser: Send)] pub trait Parser { - async fn parse( - &mut self, - buffer: &[u8], - stream: impl AsyncWriteExt + Send + Unpin, - ) -> Result; + /// Returns the last byte parsed. The next parsing loop will again contain all data that was not parsed. + fn parse(&mut self, buffer: &[u8], response: &mut Vec) -> usize; // Sadly this cant be const (yet?) (https://github.com/rust-lang/rust/issues/71971 and https://github.com/rust-lang/rfcs/pull/2632) fn parser_lookahead(&self) -> usize; diff --git a/breakwater-parser/src/memchr.rs b/breakwater-parser/src/memchr.rs index 8cb8a7c..6233133 100644 --- a/breakwater-parser/src/memchr.rs +++ b/breakwater-parser/src/memchr.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use breakwater_core::framebuffer::FrameBuffer; -use tokio::io::AsyncWriteExt; -use crate::{Parser, ParserError}; +use crate::Parser; pub struct MemchrParser { fb: Arc, @@ -16,11 +15,7 @@ impl MemchrParser { } impl Parser for MemchrParser { - async fn parse( - &mut self, - buffer: &[u8], - _stream: impl AsyncWriteExt + Send + Unpin, - ) -> Result { + fn parse(&mut self, buffer: &[u8], _response: &mut Vec) -> usize { let mut last_char_after_newline = 0; for newline in memchr::memchr_iter(b'\n', buffer) { // TODO Use get_unchecked everywhere @@ -68,7 +63,7 @@ impl Parser for MemchrParser { } } - Ok(last_char_after_newline.saturating_sub(1)) + last_char_after_newline.saturating_sub(1) } fn parser_lookahead(&self) -> usize { diff --git a/breakwater-parser/src/original.rs b/breakwater-parser/src/original.rs index 39c5c33..e35b38a 100644 --- a/breakwater-parser/src/original.rs +++ b/breakwater-parser/src/original.rs @@ -4,9 +4,8 @@ use std::{ }; use breakwater_core::{framebuffer::FrameBuffer, ALT_HELP_TEXT, HELP_TEXT}; -use tokio::io::AsyncWriteExt; -use crate::{Parser, ParserError}; +use crate::Parser; pub const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command @@ -32,11 +31,7 @@ impl OriginalParser { } impl Parser for OriginalParser { - async fn parse( - &mut self, - buffer: &[u8], - mut stream: impl AsyncWriteExt + Send + Unpin, - ) -> Result { + fn parse(&mut self, buffer: &[u8], response: &mut Vec) -> usize { let mut last_byte_parsed = 0; let mut help_count = 0; @@ -131,22 +126,16 @@ impl Parser for OriginalParser { last_byte_parsed = i; i += 1; if let Some(rgb) = self.fb.get(x, y) { - match stream - .write_all( - format!( - "PX {} {} {:06x}\n", - // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted - x - self.connection_x_offset, - y - self.connection_y_offset, - rgb.to_be() >> 8 - ) - .as_bytes(), + response.extend_from_slice( + format!( + "PX {} {} {:06x}\n", + // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted + x - self.connection_x_offset, + y - self.connection_y_offset, + rgb.to_be() >> 8 ) - .await - { - Ok(_) => (), - Err(_) => continue, - } + .as_bytes(), + ); } continue; } @@ -167,31 +156,26 @@ impl Parser for OriginalParser { i += 4; last_byte_parsed = i - 1; - stream - .write_all( - format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()) - .as_bytes(), - ) - .await - .expect("Failed to write bytes to tcp socket"); + response.extend_from_slice( + format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(), + ); continue; } else if current_command & 0xffff_ffff == HELP_PATTERN { i += 4; last_byte_parsed = i - 1; - #[allow(clippy::comparison_chain)] - if help_count < 3 { - stream - .write_all(HELP_TEXT) - .await - .expect("Failed to write bytes to tcp socket"); - help_count += 1; - } else if help_count == 3 { - stream - .write_all(ALT_HELP_TEXT) - .await - .expect("Failed to write bytes to tcp socket"); - help_count += 1; + match help_count { + 0..=2 => { + response.extend_from_slice(HELP_TEXT); + help_count += 1; + } + 3 => { + response.extend_from_slice(ALT_HELP_TEXT); + help_count += 1; + } + _ => { + // The client has requested the help to often, let's just ignore it + } } continue; } @@ -199,7 +183,7 @@ impl Parser for OriginalParser { i += 1; } - Ok(last_byte_parsed.wrapping_sub(1)) + last_byte_parsed.wrapping_sub(1) } fn parser_lookahead(&self) -> usize { diff --git a/breakwater-parser/src/refactored.rs b/breakwater-parser/src/refactored.rs index ced033c..2761145 100644 --- a/breakwater-parser/src/refactored.rs +++ b/breakwater-parser/src/refactored.rs @@ -1,14 +1,12 @@ use std::sync::Arc; use breakwater_core::{framebuffer::FrameBuffer, HELP_TEXT}; -use snafu::ResultExt; -use tokio::io::AsyncWriteExt; use crate::{ original::{ parse_pixel_coordinates, simd_unhex, HELP_PATTERN, OFFSET_PATTERN, PX_PATTERN, SIZE_PATTERN, }, - Parser, ParserError, + Parser, }; const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command @@ -29,12 +27,12 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_pixel( + fn handle_pixel( &self, buffer: &[u8], mut idx: usize, - stream: &mut (impl AsyncWriteExt + Send + Unpin), - ) -> Result<(usize, usize), ParserError> { + response: &mut Vec, + ) -> (usize, usize) { let previous = idx; idx += 3; @@ -55,33 +53,33 @@ impl RefactoredParser { if unsafe { *buffer.get_unchecked(idx + 6) } == b'\n' { idx += 7; self.handle_rgb(idx, buffer, x, y); - Ok((idx, idx)) + (idx, idx) } // ... or must be followed by 8 bytes RGBA and newline else if unsafe { *buffer.get_unchecked(idx + 8) } == b'\n' { idx += 9; self.handle_rgba(idx, buffer, x, y); - Ok((idx, idx)) + (idx, idx) } // ... for the efficient/lazy clients else if unsafe { *buffer.get_unchecked(idx + 2) } == b'\n' { idx += 3; self.handle_gray(idx, buffer, x, y); - Ok((idx, idx)) + (idx, idx) } else { - Ok((idx, previous)) + (idx, previous) } } // End of command to read Pixel value else if unsafe { *buffer.get_unchecked(idx) } == b'\n' { idx += 1; - self.handle_get_pixel(stream, x, y).await?; - Ok((idx, idx)) + self.handle_get_pixel(response, x, y); + (idx, idx) } else { - Ok((idx, previous)) + (idx, previous) } } else { - Ok((idx, previous)) + (idx, previous) } } @@ -97,29 +95,15 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_size( - &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), - ) -> Result<(), ParserError> { - stream - .write_all( - format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(), - ) - .await - .context(crate::WriteToTcpSocketSnafu)?; - Ok(()) + fn handle_size(&self, response: &mut Vec) { + response.extend_from_slice( + format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(), + ); } #[inline(always)] - async fn handle_help( - &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), - ) -> Result<(), ParserError> { - stream - .write_all(HELP_TEXT) - .await - .context(crate::WriteToTcpSocketSnafu)?; - Ok(()) + fn handle_help(&self, response: &mut Vec) { + response.extend_from_slice(HELP_TEXT); } #[inline(always)] @@ -173,37 +157,24 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_get_pixel( - &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), - x: usize, - y: usize, - ) -> Result<(), ParserError> { + fn handle_get_pixel(&self, response: &mut Vec, x: usize, y: usize) { if let Some(rgb) = self.fb.get(x, y) { - stream - .write_all( - format!( - "PX {} {} {:06x}\n", - // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted - x - self.connection_x_offset, - y - self.connection_y_offset, - rgb.to_be() >> 8 - ) - .as_bytes(), + response.extend_from_slice( + format!( + "PX {} {} {:06x}\n", + // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted + x - self.connection_x_offset, + y - self.connection_y_offset, + rgb.to_be() >> 8 ) - .await - .context(crate::WriteToTcpSocketSnafu)?; + .as_bytes(), + ); } - Ok(()) } } impl Parser for RefactoredParser { - async fn parse( - &mut self, - buffer: &[u8], - mut stream: impl AsyncWriteExt + Send + Unpin, - ) -> Result { + fn parse(&mut self, buffer: &[u8], response: &mut Vec) -> usize { let mut last_byte_parsed = 0; let mut i = 0; // We can't use a for loop here because Rust don't lets use skip characters by incrementing i @@ -213,7 +184,7 @@ impl Parser for RefactoredParser { let current_command = unsafe { (buffer.as_ptr().add(i) as *const u64).read_unaligned() }; if current_command & 0x00ff_ffff == PX_PATTERN { - (i, last_byte_parsed) = self.handle_pixel(buffer, i, &mut stream).await?; + (i, last_byte_parsed) = self.handle_pixel(buffer, i, response); } else if current_command & 0x00ff_ffff_ffff_ffff == OFFSET_PATTERN { i += 7; self.handle_offset(&mut i, buffer); @@ -221,17 +192,17 @@ impl Parser for RefactoredParser { } else if current_command & 0xffff_ffff == SIZE_PATTERN { i += 4; last_byte_parsed = i; - self.handle_size(&mut stream).await?; + self.handle_size(response); } else if current_command & 0xffff_ffff == HELP_PATTERN { i += 4; last_byte_parsed = i; - self.handle_help(&mut stream).await?; + self.handle_help(response); } else { i += 1; } } - Ok(last_byte_parsed.wrapping_sub(1)) + last_byte_parsed.wrapping_sub(1) } fn parser_lookahead(&self) -> usize { diff --git a/breakwater/src/server.rs b/breakwater/src/server.rs index 99b0025..d378228 100644 --- a/breakwater/src/server.rs +++ b/breakwater/src/server.rs @@ -3,9 +3,8 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::{cmp::min, net::IpAddr, sync::Arc, time::Duration}; -use breakwater_core::framebuffer::FrameBuffer; -use breakwater_core::CONNECTION_DENIED_TEXT; -use breakwater_parser::{original::OriginalParser, Parser, ParserError}; +use breakwater_core::{framebuffer::FrameBuffer, CONNECTION_DENIED_TEXT}; +use breakwater_parser::{original::OriginalParser, Parser}; use log::{debug, info, warn}; use memadvise::{Advice, MemAdviseError}; use snafu::{ResultExt, Snafu}; @@ -32,13 +31,13 @@ pub enum Error { #[snafu(display("Failed to accept new client connection"))] AcceptNewClientConnection { source: std::io::Error }, + #[snafu(display("Failed to write to client connection"))] + WriteToClientConnection { source: std::io::Error }, + #[snafu(display("Failed to write to statistics channel"))] WriteToStatisticsChannel { source: mpsc::error::SendError, }, - - #[snafu(display("Failed to parse Pixelflut commands"))] - ParsePixelflutCommands { source: ParserError }, } pub struct Server { @@ -161,6 +160,7 @@ pub async fn handle_connection( let layout = alloc::Layout::from_size_align(network_buffer_size, page_size).unwrap(); let ptr = unsafe { alloc::alloc(layout) }; let buffer = unsafe { std::slice::from_raw_parts_mut(ptr, network_buffer_size) }; + let mut response_buf = Vec::new(); if let Err(err) = memadvise::advise(buffer.as_ptr() as _, buffer.len(), Advice::Sequential) { // [`MemAdviseError`] does not implement Debug... @@ -233,10 +233,16 @@ pub async fn handle_connection( *i = 0; } - let last_byte_parsed = parser - .parse(&buffer[..data_end + parser_lookahead], &mut stream) - .await - .context(ParsePixelflutCommandsSnafu)?; + let last_byte_parsed = + parser.parse(&buffer[..data_end + parser_lookahead], &mut response_buf); + + if !response_buf.is_empty() { + stream + .write_all(&response_buf) + .await + .context(WriteToClientConnectionSnafu)?; + response_buf.clear(); + } // IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6. // This happens, because last_byte_parsed is an index starting at 0, so index 6 is from an array of length 7