Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

as much as it hurts, async is holding us back #9

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use std::{sync::Arc, time::Duration};
const FRAMEBUFFER_WIDTH: usize = 1920;
const FRAMEBUFFER_HEIGHT: usize = 1080;

async fn invoke_parse_pixelflut_commands(
fn invoke_parse_pixelflut_commands(
input: &[u8],
fb: &Arc<FrameBuffer>,
parser_state: ParserState,
) {
let mut stream = DevNullTcpStream::default();
parse_pixelflut_commands(input, fb, &mut stream, parser_state).await;
parse_pixelflut_commands(input, fb, &mut stream, parser_state);
}

fn from_elem(c: &mut Criterion) {
Expand All @@ -33,8 +33,7 @@ fn from_elem(c: &mut Criterion) {
|b, input| {
let fb = Arc::new(FrameBuffer::new(FRAMEBUFFER_WIDTH, FRAMEBUFFER_HEIGHT));
let parser_state = ParserState::default();
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| invoke_parse_pixelflut_commands(input, &fb, parser_state.clone()));
b.iter(|| invoke_parse_pixelflut_commands(input, &fb, parser_state.clone()));
},
);

Expand Down
11 changes: 6 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use breakwater::{
use clap::Parser;
use env_logger::Env;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use std::sync::mpsc;
use tokio::sync::{broadcast};
#[cfg(feature = "vnc")]
use {
breakwater::sinks::vnc::VncServer,
Expand All @@ -27,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// If we make the channel to big, stats will start to lag behind
// TODO: Check performance impact in real-world scenario. Maybe the statistics thread blocks the other threads
let (statistics_tx, statistics_rx) = mpsc::channel::<StatisticsEvent>(100);
let (statistics_tx, statistics_rx) = mpsc::channel::<StatisticsEvent>();
let (statistics_information_tx, statistics_information_rx_for_prometheus_exporter) =
broadcast::channel::<StatisticsInformationEvent>(2);
#[cfg(feature = "vnc")]
Expand All @@ -48,8 +49,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)?;

let network = Network::new(&args.listen_address, Arc::clone(&fb), statistics_tx.clone());
let network_listener_thread = tokio::spawn(async move {
network.listen().await.unwrap();
let network_listener_thread = std::thread::spawn(move || {
network.listen().unwrap();
});

let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb));
Expand Down Expand Up @@ -94,7 +95,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

prometheus_exporter_thread.await?;
network_listener_thread.await?;
network_listener_thread.join().unwrap();
if let Some(ffmpeg_thread) = ffmpeg_thread {
ffmpeg_thread.await?;
}
Expand Down
63 changes: 26 additions & 37 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use std::{
sync::Arc,
time::Duration,
};
use std::{net::TcpListener, io::{Read, Write}, sync::mpsc::Sender};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::mpsc::Sender,
time::Instant,
};

Expand All @@ -40,27 +38,27 @@ impl Network {
}
}

pub async fn listen(&self) -> tokio::io::Result<()> {
let listener = TcpListener::bind(&self.listen_address).await?;
pub fn listen(&self) -> std::io::Result<()> {
let listener = TcpListener::bind(&self.listen_address)?;
info!("Started Pixelflut server on {}", self.listen_address);

loop {
let (socket, socket_addr) = listener.accept().await?;
let (socket, socket_addr) = listener.accept()?;
// If you connect via IPv4 you often show up as embedded inside an IPv6 address
// Extracting the embedded information here, so we get the real (TM) address
let ip = ip_to_canonical(socket_addr.ip());

let fb_for_thread = Arc::clone(&self.fb);
let statistics_tx_for_thread = self.statistics_tx.clone();
tokio::spawn(async move {
handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread).await;
std::thread::spawn(move || {
handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread);
});
}
}
}

pub async fn handle_connection(
mut stream: impl AsyncReadExt + AsyncWriteExt + Unpin,
pub fn handle_connection(
mut stream: impl Read + Write + Unpin,
ip: IpAddr,
fb: Arc<FrameBuffer>,
statistics_tx: Sender<StatisticsEvent>,
Expand All @@ -69,7 +67,6 @@ pub async fn handle_connection(

statistics_tx
.send(StatisticsEvent::ConnectionCreated { ip })
.await
.expect("Statistics channel disconnected");

// TODO: Try performance of Vec<> on heap instead of stack. Also bigger buffer
Expand All @@ -90,7 +87,6 @@ pub async fn handle_connection(
// If there are any bytes left over from the previous loop iteration leave them as is and but the new data behind
let bytes_read = match stream
.read(&mut buffer[leftover_bytes_in_buffer..NETWORK_BUFFER_SIZE - PARSER_LOOKAHEAD])
.await
{
Ok(bytes_read) => bytes_read,
Err(_) => {
Expand All @@ -108,7 +104,6 @@ pub async fn handle_connection(
ip,
bytes: statistics_bytes_read,
})
.await
.expect("Statistics channel disconnected");
last_statistics = Instant::now();
statistics_bytes_read = 0;
Expand Down Expand Up @@ -137,8 +132,7 @@ pub async fn handle_connection(
&fb,
&mut stream,
parser_state,
)
.await;
);

// IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6.
// This happens, because last_byte_parsed is an index starting at 0, so index 6 is from an array of length 7
Expand All @@ -161,7 +155,6 @@ pub async fn handle_connection(

statistics_tx
.send(StatisticsEvent::ConnectionClosed { ip })
.await
.expect("Statistics channel disconnected");
}

Expand All @@ -185,7 +178,7 @@ mod test {
use crate::test::helpers::MockTcpStream;
use rstest::{fixture, rstest};
use std::time::Duration;
use tokio::sync::mpsc::{self, Receiver};
use std::sync::mpsc::{self, Receiver};

#[fixture]
fn ip() -> IpAddr {
Expand All @@ -199,7 +192,7 @@ mod test {

#[fixture]
fn statistics_channel() -> (Sender<StatisticsEvent>, Receiver<StatisticsEvent>) {
mpsc::channel(10000)
mpsc::channel()
}

#[rstest]
Expand All @@ -215,16 +208,16 @@ mod test {
#[case("HELP", std::str::from_utf8(crate::parser::HELP_TEXT).unwrap())]
#[case("HELP\n", std::str::from_utf8(crate::parser::HELP_TEXT).unwrap())]
#[case("bla bla bla\nSIZE\nblub\nbla", "SIZE 1920 1080\n")]
#[tokio::test]
async fn test_correct_responses_to_general_commands(
#[test]
fn test_correct_responses_to_general_commands(
#[case] input: &str,
#[case] expected: &str,
ip: IpAddr,
fb: Arc<FrameBuffer>,
statistics_channel: (Sender<StatisticsEvent>, Receiver<StatisticsEvent>),
) {
let mut stream = MockTcpStream::from_input(input);
handle_connection(&mut stream, ip, fb, statistics_channel.0).await;
handle_connection(&mut stream, ip, fb, statistics_channel.0);

assert_eq!(expected, stream.get_output());
}
Expand Down Expand Up @@ -269,32 +262,32 @@ mod test {
"PX 0 0 ffffff\nPX 42 42 000000\n"
)] // The get pixel result is also offseted
#[case("OFFSET 0 0\nPX 0 42 abcdef\nPX 0 42\n", "PX 0 42 abcdef\n")]
#[tokio::test]
async fn test_setting_pixel(
#[test]
fn test_setting_pixel(
#[case] input: &str,
#[case] expected: &str,
ip: IpAddr,
fb: Arc<FrameBuffer>,
statistics_channel: (Sender<StatisticsEvent>, Receiver<StatisticsEvent>),
) {
let mut stream = MockTcpStream::from_input(input);
handle_connection(&mut stream, ip, fb, statistics_channel.0).await;
handle_connection(&mut stream, ip, fb, statistics_channel.0);

assert_eq!(expected, stream.get_output());
}

#[rstest]
#[case("PX 0 0 aaaaaa\n")]
#[case("PX 0 0 aa\n")]
#[tokio::test]
async fn test_safe(
#[test]
fn test_safe(
#[case] input: &str,
ip: IpAddr,
fb: Arc<FrameBuffer>,
statistics_channel: (Sender<StatisticsEvent>, Receiver<StatisticsEvent>),
) {
let mut stream = MockTcpStream::from_input(input);
handle_connection(&mut stream, ip, fb.clone(), statistics_channel.0).await;
handle_connection(&mut stream, ip, fb.clone(), statistics_channel.0);
// Test if it panics
assert_eq!(fb.get(0, 0).unwrap() & 0x00ff_ffff, 0xaaaaaa);
}
Expand All @@ -315,8 +308,8 @@ mod test {
#[case(500, 500, 300, 400)]
#[case(fb().get_width(), fb().get_height(), 0, 0)]
#[case(fb().get_width() - 1, fb().get_height() - 1, 1, 1)]
#[tokio::test]
async fn test_drawing_rect(
#[test]
fn test_drawing_rect(
#[case] width: usize,
#[case] height: usize,
#[case] offset_x: usize,
Expand Down Expand Up @@ -361,8 +354,7 @@ mod test {
ip,
Arc::clone(&fb),
statistics_channel.0.clone(),
)
.await;
);
assert_eq!("", stream.get_output());

// Read the pixels again
Expand All @@ -372,8 +364,7 @@ mod test {
ip,
Arc::clone(&fb),
statistics_channel.0.clone(),
)
.await;
);
assert_eq!(fill_commands, stream.get_output());

// We can also do coloring and reading in a single connection
Expand All @@ -383,8 +374,7 @@ mod test {
ip,
Arc::clone(&fb),
statistics_channel.0.clone(),
)
.await;
);
assert_eq!(combined_commands_expected, stream.get_output());

// Check that nothing else was colored
Expand All @@ -394,8 +384,7 @@ mod test {
ip,
Arc::clone(&fb),
statistics_channel.0.clone(),
)
.await;
);
assert_eq!(read_other_pixels_commands_expected, stream.get_output());
}
}
9 changes: 3 additions & 6 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use const_format::formatcp;
use log::{info, warn};
use std::simd::{u32x8, Simd, SimdUint};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use std::io::Write;

pub const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command
pub const HELP_TEXT: &[u8] = formatcp!("\
Expand Down Expand Up @@ -53,10 +53,10 @@ const fn string_to_number(input: &[u8]) -> u64 {
/// TODO: Implement support for 16K (15360 × 8640).
/// Currently the parser only can read up to 4 digits of x or y coordinates.
/// If you buy me a big enough screen I will kindly implement this feature.
pub async fn parse_pixelflut_commands(
pub fn parse_pixelflut_commands(
buffer: &[u8],
fb: &Arc<FrameBuffer>,
mut stream: impl AsyncWriteExt + Unpin,
mut stream: impl Write + Unpin,
// We don't pass this as mutual reference but instead hand it around - I guess on the stack?
// I don't know what I'm doing, hoping for best performance anyway ;)
parser_state: ParserState,
Expand Down Expand Up @@ -223,7 +223,6 @@ pub async fn parse_pixelflut_commands(
)
.as_bytes(),
)
.await
{
Ok(_) => (),
Err(_) => continue,
Expand Down Expand Up @@ -302,7 +301,6 @@ pub async fn parse_pixelflut_commands(

stream
.write_all(format!("SIZE {} {}\n", fb.get_width(), fb.get_height()).as_bytes())
.await
.expect("Failed to write bytes to tcp socket");
continue;
} else if current_command & 0xffff_ffff == string_to_number(b"HELP\0\0\0\0") {
Expand All @@ -311,7 +309,6 @@ pub async fn parse_pixelflut_commands(

stream
.write_all(HELP_TEXT)
.await
.expect("Failed to write bytes to tcp socket");
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/vnc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rusttype::{point, Font, Scale};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use std::sync::mpsc::Sender;
use vncserver::{
rfb_framebuffer_malloc, rfb_get_screen, rfb_init_server, rfb_mark_rect_as_modified,
rfb_run_event_loop, RfbScreenInfoPtr,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<'a> VncServer<'a> {
height_up_to_stats_text as i32,
);
self.statistics_tx
.blocking_send(StatisticsEvent::FrameRendered)
.send(StatisticsEvent::FrameRendered)
.unwrap();

if !self.statistics_information_rx.is_empty() {
Expand Down
5 changes: 3 additions & 2 deletions src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
cmp::max,
collections::{hash_map::Entry, HashMap},
fs::File,
sync::mpsc::Receiver,
net::IpAddr,
time::{Duration, Instant},
};
use tokio::sync::{broadcast, mpsc::Receiver};
use tokio::sync::{broadcast};

pub const STATS_REPORT_INTERVAL: Duration = Duration::from_millis(1000);
pub const STATS_SLIDING_WINDOW_SIZE: usize = 5;
Expand Down Expand Up @@ -106,7 +107,7 @@ impl Statistics {
let mut last_save_file_written = Instant::now();
let mut statistics_information_event = StatisticsInformationEvent::default();

while let Some(statistics_update) = self.statistics_rx.recv().await {
while let Ok(statistics_update) = self.statistics_rx.recv() {
self.statistic_events += 1;
match statistics_update {
StatisticsEvent::ConnectionCreated { ip } => {
Expand Down
Loading