Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Simplify Parse trait by using Vec<u8> instead of AyncWriteExt #30

Merged
merged 2 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions breakwater-parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ breakwater-core.workspace = true

enum_dispatch.workspace = true
memchr.workspace = true
snafu.workspace = true
tokio.workspace = true
trait-variant.workspace = true

[dev-dependencies]
criterion.workspace = true
Expand Down
34 changes: 14 additions & 20 deletions breakwater-parser/benches/parsing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use breakwater_core::{framebuffer::FrameBuffer, test_helpers::DevNullTcpStream};
use breakwater_core::framebuffer::FrameBuffer;
#[cfg(target_arch = "x86_64")]
use breakwater_parser::assembler::AssemblerParser;
use breakwater_parser::{
Expand Down Expand Up @@ -80,26 +80,20 @@ fn invoke_benchmark(

for parse_name in parser_names {
c_group.bench_with_input(parse_name, &commands, |b, input| {
b.to_async(tokio::runtime::Runtime::new().expect("Failed to start tokio runtime"))
.iter(|| async {
let mut parser = match parse_name {
"original" => {
ParserImplementation::Original(OriginalParser::new(fb.clone()))
}
"refactored" => {
ParserImplementation::Refactored(RefactoredParser::new(fb.clone()))
}
"memchr" => ParserImplementation::Naive(MemchrParser::new(fb.clone())),
#[cfg(target_arch = "x86_64")]
"assembler" => ParserImplementation::Assembler(AssemblerParser::default()),
_ => panic!("Parser implementation {parse_name} not known"),
};
b.iter(|| {
let mut parser = match parse_name {
"original" => ParserImplementation::Original(OriginalParser::new(fb.clone())),
"refactored" => {
ParserImplementation::Refactored(RefactoredParser::new(fb.clone()))
}
"memchr" => ParserImplementation::Naive(MemchrParser::new(fb.clone())),
#[cfg(target_arch = "x86_64")]
"assembler" => ParserImplementation::Assembler(AssemblerParser::default()),
_ => panic!("Parser implementation {parse_name} not known"),
};

parser
.parse(input, DevNullTcpStream::default())
.await
.expect("Failed to parse commands");
});
parser.parse(input, &mut Vec::new());
});
});
}
}
Expand Down
12 changes: 3 additions & 9 deletions breakwater-parser/src/assembler.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use std::arch::asm;

use tokio::io::AsyncWriteExt;

use crate::{Parser, ParserError};
use crate::Parser;

const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command

#[derive(Default)]
pub struct AssemblerParser {}

impl Parser for AssemblerParser {
async fn parse(
&mut self,
buffer: &[u8],
_stream: impl AsyncWriteExt + Send + Unpin,
) -> Result<usize, ParserError> {
fn parse(&mut self, buffer: &[u8], _response: &mut Vec<u8>) -> usize {
let mut last_byte_parsed = 0;

// This loop does nothing and should be seen as a placeholder
Expand All @@ -33,7 +27,7 @@ impl Parser for AssemblerParser {
)
}

Ok(last_byte_parsed)
last_byte_parsed
}

fn parser_lookahead(&self) -> usize {
Expand Down
17 changes: 2 additions & 15 deletions breakwater-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,17 @@
#![feature(portable_simd)]

use enum_dispatch::enum_dispatch;
use snafu::Snafu;
use tokio::io::AsyncWriteExt;

#[cfg(target_arch = "x86_64")]
pub mod assembler;
pub mod memchr;
pub mod original;
pub mod refactored;

#[derive(Debug, Snafu)]
pub enum ParserError {
#[snafu(display("Failed to write to TCP socket"))]
WriteToTcpSocket { source: std::io::Error },
}

#[enum_dispatch(ParserImplementation)]
// According to https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html
#[trait_variant::make(SendParser: Send)]
pub trait Parser {
async fn parse(
&mut self,
buffer: &[u8],
stream: impl AsyncWriteExt + Send + Unpin,
) -> Result<usize, ParserError>;
/// Returns the last byte parsed. The next parsing loop will again contain all data that was not parsed.
fn parse(&mut self, buffer: &[u8], response: &mut Vec<u8>) -> usize;

// Sadly this cant be const (yet?) (https://github.com/rust-lang/rust/issues/71971 and https://github.com/rust-lang/rfcs/pull/2632)
fn parser_lookahead(&self) -> usize;
Expand Down
11 changes: 3 additions & 8 deletions breakwater-parser/src/memchr.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::sync::Arc;

use breakwater_core::framebuffer::FrameBuffer;
use tokio::io::AsyncWriteExt;

use crate::{Parser, ParserError};
use crate::Parser;

pub struct MemchrParser {
fb: Arc<FrameBuffer>,
Expand All @@ -16,11 +15,7 @@ impl MemchrParser {
}

impl Parser for MemchrParser {
async fn parse(
&mut self,
buffer: &[u8],
_stream: impl AsyncWriteExt + Send + Unpin,
) -> Result<usize, ParserError> {
fn parse(&mut self, buffer: &[u8], _response: &mut Vec<u8>) -> usize {
let mut last_char_after_newline = 0;
for newline in memchr::memchr_iter(b'\n', buffer) {
// TODO Use get_unchecked everywhere
Expand Down Expand Up @@ -68,7 +63,7 @@ impl Parser for MemchrParser {
}
}

Ok(last_char_after_newline.saturating_sub(1))
last_char_after_newline.saturating_sub(1)
}

fn parser_lookahead(&self) -> usize {
Expand Down
70 changes: 27 additions & 43 deletions breakwater-parser/src/original.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use std::{
};

use breakwater_core::{framebuffer::FrameBuffer, ALT_HELP_TEXT, HELP_TEXT};
use tokio::io::AsyncWriteExt;

use crate::{Parser, ParserError};
use crate::Parser;

pub const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command

Expand All @@ -32,11 +31,7 @@ impl OriginalParser {
}

impl Parser for OriginalParser {
async fn parse(
&mut self,
buffer: &[u8],
mut stream: impl AsyncWriteExt + Send + Unpin,
) -> Result<usize, ParserError> {
fn parse(&mut self, buffer: &[u8], response: &mut Vec<u8>) -> usize {
let mut last_byte_parsed = 0;
let mut help_count = 0;

Expand Down Expand Up @@ -131,22 +126,16 @@ impl Parser for OriginalParser {
last_byte_parsed = i;
i += 1;
if let Some(rgb) = self.fb.get(x, y) {
match stream
.write_all(
format!(
"PX {} {} {:06x}\n",
// We don't want to return the actual (absolute) coordinates, the client should also get the result offseted
x - self.connection_x_offset,
y - self.connection_y_offset,
rgb.to_be() >> 8
)
.as_bytes(),
response.extend_from_slice(
format!(
"PX {} {} {:06x}\n",
// We don't want to return the actual (absolute) coordinates, the client should also get the result offseted
x - self.connection_x_offset,
y - self.connection_y_offset,
rgb.to_be() >> 8
)
.await
{
Ok(_) => (),
Err(_) => continue,
}
.as_bytes(),
);
}
continue;
}
Expand All @@ -167,39 +156,34 @@ impl Parser for OriginalParser {
i += 4;
last_byte_parsed = i - 1;

stream
.write_all(
format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height())
.as_bytes(),
)
.await
.expect("Failed to write bytes to tcp socket");
response.extend_from_slice(
format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(),
);
continue;
} else if current_command & 0xffff_ffff == HELP_PATTERN {
i += 4;
last_byte_parsed = i - 1;

#[allow(clippy::comparison_chain)]
if help_count < 3 {
stream
.write_all(HELP_TEXT)
.await
.expect("Failed to write bytes to tcp socket");
help_count += 1;
} else if help_count == 3 {
stream
.write_all(ALT_HELP_TEXT)
.await
.expect("Failed to write bytes to tcp socket");
help_count += 1;
match help_count {
0..=2 => {
response.extend_from_slice(HELP_TEXT);
help_count += 1;
}
3 => {
response.extend_from_slice(ALT_HELP_TEXT);
help_count += 1;
}
_ => {
// The client has requested the help to often, let's just ignore it
}
}
continue;
}

i += 1;
}

Ok(last_byte_parsed.wrapping_sub(1))
last_byte_parsed.wrapping_sub(1)
}

fn parser_lookahead(&self) -> usize {
Expand Down
Loading
Loading