Skip to content

Commit

Permalink
Generalize parser so that it's independent of stream
Browse files Browse the repository at this point in the history
  • Loading branch information
fabi321 committed May 30, 2024
1 parent 07d827b commit ddf9e16
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 70 deletions.
7 changes: 4 additions & 3 deletions breakwater-parser/benches/parsing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::sync::mpsc::channel;

use breakwater_core::{framebuffer::FrameBuffer, test_helpers::DevNullTcpStream};
use breakwater_core::{framebuffer::FrameBuffer};
#[cfg(target_arch = "x86_64")]
use breakwater_parser::assembler::AssemblerParser;
use breakwater_parser::{
Expand Down Expand Up @@ -82,6 +83,7 @@ fn invoke_benchmark(
c_group.bench_with_input(parse_name, &commands, |b, input| {
b.to_async(tokio::runtime::Runtime::new().expect("Failed to start tokio runtime"))
.iter(|| async {
let (message_sender, _) = channel();
let mut parser = match parse_name {
"original" => {
ParserImplementation::Original(OriginalParser::new(fb.clone()))
Expand All @@ -96,8 +98,7 @@ fn invoke_benchmark(
};

parser
.parse(input, DevNullTcpStream::default())
.await
.parse(input, &message_sender)
.expect("Failed to parse commands");
});
});
Expand Down
7 changes: 3 additions & 4 deletions breakwater-parser/src/assembler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::arch::asm;

use tokio::io::AsyncWriteExt;
use std::sync::mpsc::Sender;

use crate::{Parser, ParserError};

Expand All @@ -10,10 +9,10 @@ const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest poss
pub struct AssemblerParser {}

impl Parser for AssemblerParser {
async fn parse(
fn parse(
&mut self,
buffer: &[u8],
_stream: impl AsyncWriteExt + Send + Unpin,
_message_sender: &Sender<Box<[u8]>>,
) -> Result<usize, ParserError> {
let mut last_byte_parsed = 0;

Expand Down
8 changes: 4 additions & 4 deletions breakwater-parser/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Needed for simple implementation
#![feature(portable_simd)]

use std::sync::mpsc::Sender;
use enum_dispatch::enum_dispatch;
use snafu::Snafu;
use tokio::io::AsyncWriteExt;

#[cfg(target_arch = "x86_64")]
pub mod assembler;
Expand All @@ -14,17 +14,17 @@ pub mod refactored;
#[derive(Debug, Snafu)]
pub enum ParserError {
#[snafu(display("Failed to write to TCP socket"))]
WriteToTcpSocket { source: std::io::Error },
WriteToTcpSocket { source: std::sync::mpsc::SendError<Box<[u8]>> },
}

#[enum_dispatch(ParserImplementation)]
// According to https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html
#[trait_variant::make(SendParser: Send)]
pub trait Parser {
async fn parse(
fn parse(
&mut self,
buffer: &[u8],
stream: impl AsyncWriteExt + Send + Unpin,
message_sender: &Sender<Box<[u8]>>,
) -> Result<usize, ParserError>;

// Sadly this cant be const (yet?) (https://github.com/rust-lang/rust/issues/71971 and https://github.com/rust-lang/rfcs/pull/2632)
Expand Down
6 changes: 3 additions & 3 deletions breakwater-parser/src/memchr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::mpsc::Sender;

use breakwater_core::framebuffer::FrameBuffer;
use tokio::io::AsyncWriteExt;

use crate::{Parser, ParserError};

Expand All @@ -16,10 +16,10 @@ impl MemchrParser {
}

impl Parser for MemchrParser {
async fn parse(
fn parse(
&mut self,
buffer: &[u8],
_stream: impl AsyncWriteExt + Send + Unpin,
_message_sender: &Sender<Box<[u8]>>,
) -> Result<usize, ParserError> {
let mut last_char_after_newline = 0;
for newline in memchr::memchr_iter(b'\n', buffer) {
Expand Down
45 changes: 17 additions & 28 deletions breakwater-parser/src/original.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{
simd::{num::SimdUint, u32x8, Simd},
sync::Arc,
};
use std::sync::mpsc::Sender;

use breakwater_core::{framebuffer::FrameBuffer, HELP_TEXT};
use tokio::io::AsyncWriteExt;

use crate::{Parser, ParserError};

Expand Down Expand Up @@ -32,10 +32,10 @@ impl OriginalParser {
}

impl Parser for OriginalParser {
async fn parse(
fn parse(
&mut self,
buffer: &[u8],
mut stream: impl AsyncWriteExt + Send + Unpin,
message_sender: &Sender<Box<[u8]>>,
) -> Result<usize, ParserError> {
let mut last_byte_parsed = 0;

Expand Down Expand Up @@ -130,22 +130,15 @@ impl Parser for OriginalParser {
last_byte_parsed = i;
i += 1;
if let Some(rgb) = self.fb.get(x, y) {
match stream
.write_all(
format!(
"PX {} {} {:06x}\n",
// We don't want to return the actual (absolute) coordinates, the client should also get the result offseted
x - self.connection_x_offset,
y - self.connection_y_offset,
rgb.to_be() >> 8
)
.as_bytes(),
)
.await
{
Ok(_) => (),
Err(_) => continue,
}
message_sender.send(
format!(
"PX {} {} {:06x}\n",
// We don't want to return the actual (absolute) coordinates, the client should also get the result offseted
x - self.connection_x_offset,
y - self.connection_y_offset,
rgb.to_be() >> 8
).into_boxed_str().into_boxed_bytes()
).expect("Failed to write message");
}
continue;
}
Expand All @@ -166,22 +159,18 @@ impl Parser for OriginalParser {
i += 4;
last_byte_parsed = i - 1;

stream
.write_all(
message_sender.send(
format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height())
.as_bytes(),
.into_boxed_str().into_boxed_bytes()
)
.await
.expect("Failed to write bytes to tcp socket");
.expect("Failed to write message");
continue;
} else if current_command & 0xffff_ffff == HELP_PATTERN {
i += 4;
last_byte_parsed = i - 1;

stream
.write_all(HELP_TEXT)
.await
.expect("Failed to write bytes to tcp socket");
message_sender.send(HELP_TEXT.into())
.expect("Failed to write message");
continue;
}

Expand Down
49 changes: 23 additions & 26 deletions breakwater-parser/src/refactored.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use breakwater_core::{framebuffer::FrameBuffer, HELP_TEXT};
use snafu::ResultExt;
use tokio::io::AsyncWriteExt;
use std::sync::mpsc::Sender;

use crate::{
original::{
Expand All @@ -29,11 +29,11 @@ impl RefactoredParser {
}

#[inline(always)]
async fn handle_pixel(
fn handle_pixel(
&self,
buffer: &[u8],
mut idx: usize,
stream: &mut (impl AsyncWriteExt + Send + Unpin),
message_sender: &Sender<Box<[u8]>>,
) -> Result<(usize, usize), ParserError> {
let previous = idx;
idx += 3;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl RefactoredParser {
// End of command to read Pixel value
else if unsafe { *buffer.get_unchecked(idx) } == b'\n' {
idx += 1;
self.handle_get_pixel(stream, x, y).await?;
self.handle_get_pixel(message_sender, x, y)?;
Ok((idx, idx))
} else {
Ok((idx, previous))
Expand All @@ -97,27 +97,25 @@ impl RefactoredParser {
}

#[inline(always)]
async fn handle_size(
fn handle_size(
&self,
stream: &mut (impl AsyncWriteExt + Send + Unpin),
message_sender: &Sender<Box<[u8]>>,
) -> Result<(), ParserError> {
stream
.write_all(
format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(),
message_sender
.send(
format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).into_boxed_str().into_boxed_bytes(),
)
.await
.context(crate::WriteToTcpSocketSnafu)?;
Ok(())
}

#[inline(always)]
async fn handle_help(
fn handle_help(
&self,
stream: &mut (impl AsyncWriteExt + Send + Unpin),
message_sender: &Sender<Box<[u8]>>,
) -> Result<(), ParserError> {
stream
.write_all(HELP_TEXT)
.await
message_sender
.send(HELP_TEXT.into())
.context(crate::WriteToTcpSocketSnafu)?;
Ok(())
}
Expand Down Expand Up @@ -173,36 +171,35 @@ impl RefactoredParser {
}

#[inline(always)]
async fn handle_get_pixel(
fn handle_get_pixel(
&self,
stream: &mut (impl AsyncWriteExt + Send + Unpin),
message_sender: &Sender<Box<[u8]>>,
x: usize,
y: usize,
) -> Result<(), ParserError> {
if let Some(rgb) = self.fb.get(x, y) {
stream
.write_all(
message_sender
.send(
format!(
"PX {} {} {:06x}\n",
// We don't want to return the actual (absolute) coordinates, the client should also get the result offseted
x - self.connection_x_offset,
y - self.connection_y_offset,
rgb.to_be() >> 8
)
.as_bytes(),
.into_boxed_str().into_boxed_bytes(),
)
.await
.context(crate::WriteToTcpSocketSnafu)?;
}
Ok(())
}
}

impl Parser for RefactoredParser {
async fn parse(
fn parse(
&mut self,
buffer: &[u8],
mut stream: impl AsyncWriteExt + Send + Unpin,
message_sender: &Sender<Box<[u8]>>,
) -> Result<usize, ParserError> {
let mut last_byte_parsed = 0;

Expand All @@ -213,19 +210,19 @@ impl Parser for RefactoredParser {
let current_command =
unsafe { (buffer.as_ptr().add(i) as *const u64).read_unaligned() };
if current_command & 0x00ff_ffff == PX_PATTERN {
(i, last_byte_parsed) = self.handle_pixel(buffer, i, &mut stream).await?;
(i, last_byte_parsed) = self.handle_pixel(buffer, i, message_sender)?;
} else if current_command & 0x00ff_ffff_ffff_ffff == OFFSET_PATTERN {
i += 7;
self.handle_offset(&mut i, buffer);
last_byte_parsed = i;
} else if current_command & 0xffff_ffff == SIZE_PATTERN {
i += 4;
last_byte_parsed = i;
self.handle_size(&mut stream).await?;
self.handle_size(message_sender)?;
} else if current_command & 0xffff_ffff == HELP_PATTERN {
i += 4;
last_byte_parsed = i;
self.handle_help(&mut stream).await?;
self.handle_help(message_sender)?;
} else {
i += 1;
}
Expand Down
10 changes: 8 additions & 2 deletions breakwater/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::{cmp::min, net::IpAddr, sync::Arc, time::Duration};
use std::sync::mpsc::channel;

use breakwater_core::framebuffer::FrameBuffer;
use breakwater_parser::{original::OriginalParser, Parser, ParserError};
Expand Down Expand Up @@ -155,6 +156,8 @@ pub async fn handle_connection(
// Instead we bulk the statistics and send them pre-aggregated.
let mut last_statistics = Instant::now();
let mut statistics_bytes_read: u64 = 0;

let (message_sender, message_receiver) = channel::<Box<[u8]>>();

loop {
// Fill the buffer up with new data from the socket
Expand Down Expand Up @@ -204,8 +207,7 @@ pub async fn handle_connection(
}

let last_byte_parsed = parser
.parse(&buffer[..data_end + parser_lookahead], &mut stream)
.await
.parse(&buffer[..data_end + parser_lookahead], &message_sender)
.context(ParsePixelflutCommandsSnafu)?;

// IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6.
Expand All @@ -223,6 +225,10 @@ pub async fn handle_connection(
0,
);
}

while let Ok(message) = message_receiver.try_recv() {
stream.write_all(message.as_ref()).await.expect("Failed to write to tcp stream")
}
}
}

Expand Down

0 comments on commit ddf9e16

Please sign in to comment.