Skip to content

Commit

Permalink
as much as it hurts, async is holding us back
Browse files Browse the repository at this point in the history
About my testing: for some reason, I can't push loopback further than ~40Gbit/s on my machine, no matter the connections.

On the other hand, if I only flood with a single connection, I get ~10Gbit/s with sturmflut with the async version, with breakwater running at 102% cpu. With the sync version, I get ~14Gbit/s, still at 102% cpu usage.

I'm not saying that I like the sync version more than the async version, but it is definitely noticeably faster. If I hammer them with 10 connections, both versions perform similar in terms of bandwidth.
  • Loading branch information
fabi321 committed Jun 17, 2023
1 parent 74e95c9 commit 2f2adf8
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 31 deletions.
11 changes: 6 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use breakwater::{
use clap::Parser;
use env_logger::Env;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use std::sync::mpsc;
use tokio::sync::{broadcast};
#[cfg(feature = "vnc")]
use {
breakwater::sinks::vnc::VncServer,
Expand All @@ -27,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// If we make the channel to big, stats will start to lag behind
// TODO: Check performance impact in real-world scenario. Maybe the statistics thread blocks the other threads
let (statistics_tx, statistics_rx) = mpsc::channel::<StatisticsEvent>(100);
let (statistics_tx, statistics_rx) = mpsc::channel::<StatisticsEvent>();
let (statistics_information_tx, statistics_information_rx_for_prometheus_exporter) =
broadcast::channel::<StatisticsInformationEvent>(2);
#[cfg(feature = "vnc")]
Expand All @@ -48,8 +49,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)?;

let network = Network::new(&args.listen_address, Arc::clone(&fb), statistics_tx.clone());
let network_listener_thread = tokio::spawn(async move {
network.listen().await.unwrap();
let network_listener_thread = std::thread::spawn(move || {
network.listen().unwrap();
});

let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb));
Expand Down Expand Up @@ -94,7 +95,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

prometheus_exporter_thread.await?;
network_listener_thread.await?;
network_listener_thread.join().unwrap();
if let Some(ffmpeg_thread) = ffmpeg_thread {
ffmpeg_thread.await?;
}
Expand Down
25 changes: 9 additions & 16 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use std::{
sync::Arc,
time::Duration,
};
use std::{net::TcpListener, io::{Read, Write}, sync::mpsc::Sender};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::mpsc::Sender,
time::Instant,
};

Expand All @@ -40,27 +38,27 @@ impl Network {
}
}

pub async fn listen(&self) -> tokio::io::Result<()> {
let listener = TcpListener::bind(&self.listen_address).await?;
pub fn listen(&self) -> std::io::Result<()> {
let listener = TcpListener::bind(&self.listen_address)?;
info!("Started Pixelflut server on {}", self.listen_address);

loop {
let (socket, socket_addr) = listener.accept().await?;
let (socket, socket_addr) = listener.accept()?;
// If you connect via IPv4 you often show up as embedded inside an IPv6 address
// Extracting the embedded information here, so we get the real (TM) address
let ip = ip_to_canonical(socket_addr.ip());

let fb_for_thread = Arc::clone(&self.fb);
let statistics_tx_for_thread = self.statistics_tx.clone();
tokio::spawn(async move {
handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread).await;
std::thread::spawn(move || {
handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread);
});
}
}
}

pub async fn handle_connection(
mut stream: impl AsyncReadExt + AsyncWriteExt + Unpin,
pub fn handle_connection(
mut stream: impl Read + Write + Unpin,
ip: IpAddr,
fb: Arc<FrameBuffer>,
statistics_tx: Sender<StatisticsEvent>,
Expand All @@ -69,7 +67,6 @@ pub async fn handle_connection(

statistics_tx
.send(StatisticsEvent::ConnectionCreated { ip })
.await
.expect("Statistics channel disconnected");

// TODO: Try performance of Vec<> on heap instead of stack. Also bigger buffer
Expand All @@ -90,7 +87,6 @@ pub async fn handle_connection(
// If there are any bytes left over from the previous loop iteration leave them as is and but the new data behind
let bytes_read = match stream
.read(&mut buffer[leftover_bytes_in_buffer..NETWORK_BUFFER_SIZE - PARSER_LOOKAHEAD])
.await
{
Ok(bytes_read) => bytes_read,
Err(_) => {
Expand All @@ -108,7 +104,6 @@ pub async fn handle_connection(
ip,
bytes: statistics_bytes_read,
})
.await
.expect("Statistics channel disconnected");
last_statistics = Instant::now();
statistics_bytes_read = 0;
Expand Down Expand Up @@ -137,8 +132,7 @@ pub async fn handle_connection(
&fb,
&mut stream,
parser_state,
)
.await;
);

// IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6.
// This happens, because last_byte_parsed is an index starting at 0, so index 6 is from an array of length 7
Expand All @@ -161,7 +155,6 @@ pub async fn handle_connection(

statistics_tx
.send(StatisticsEvent::ConnectionClosed { ip })
.await
.expect("Statistics channel disconnected");
}

Expand Down
9 changes: 3 additions & 6 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use const_format::formatcp;
use log::{info, warn};
use std::simd::{u32x8, Simd, SimdUint};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use std::io::Write;

pub const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command
pub const HELP_TEXT: &[u8] = formatcp!("\
Expand Down Expand Up @@ -53,10 +53,10 @@ const fn string_to_number(input: &[u8]) -> u64 {
/// TODO: Implement support for 16K (15360 × 8640).
/// Currently the parser only can read up to 4 digits of x or y coordinates.
/// If you buy me a big enough screen I will kindly implement this feature.
pub async fn parse_pixelflut_commands(
pub fn parse_pixelflut_commands(
buffer: &[u8],
fb: &Arc<FrameBuffer>,
mut stream: impl AsyncWriteExt + Unpin,
mut stream: impl Write + Unpin,
// We don't pass this as mutual reference but instead hand it around - I guess on the stack?
// I don't know what I'm doing, hoping for best performance anyway ;)
parser_state: ParserState,
Expand Down Expand Up @@ -223,7 +223,6 @@ pub async fn parse_pixelflut_commands(
)
.as_bytes(),
)
.await
{
Ok(_) => (),
Err(_) => continue,
Expand Down Expand Up @@ -302,7 +301,6 @@ pub async fn parse_pixelflut_commands(

stream
.write_all(format!("SIZE {} {}\n", fb.get_width(), fb.get_height()).as_bytes())
.await
.expect("Failed to write bytes to tcp socket");
continue;
} else if current_command & 0xffff_ffff == string_to_number(b"HELP\0\0\0\0") {
Expand All @@ -311,7 +309,6 @@ pub async fn parse_pixelflut_commands(

stream
.write_all(HELP_TEXT)
.await
.expect("Failed to write bytes to tcp socket");
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/vnc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rusttype::{point, Font, Scale};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use std::sync::mpsc::Sender;
use vncserver::{
rfb_framebuffer_malloc, rfb_get_screen, rfb_init_server, rfb_mark_rect_as_modified,
rfb_run_event_loop, RfbScreenInfoPtr,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<'a> VncServer<'a> {
height_up_to_stats_text as i32,
);
self.statistics_tx
.blocking_send(StatisticsEvent::FrameRendered)
.send(StatisticsEvent::FrameRendered)
.unwrap();

if !self.statistics_information_rx.is_empty() {
Expand Down
5 changes: 3 additions & 2 deletions src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
cmp::max,
collections::{hash_map::Entry, HashMap},
fs::File,
sync::mpsc::Receiver,
net::IpAddr,
time::{Duration, Instant},
};
use tokio::sync::{broadcast, mpsc::Receiver};
use tokio::sync::{broadcast};

pub const STATS_REPORT_INTERVAL: Duration = Duration::from_millis(1000);
pub const STATS_SLIDING_WINDOW_SIZE: usize = 5;
Expand Down Expand Up @@ -106,7 +107,7 @@ impl Statistics {
let mut last_save_file_written = Instant::now();
let mut statistics_information_event = StatisticsInformationEvent::default();

while let Some(statistics_update) = self.statistics_rx.recv().await {
while let Ok(statistics_update) = self.statistics_rx.recv() {
self.statistic_events += 1;
match statistics_update {
StatisticsEvent::ConnectionCreated { ip } => {
Expand Down

0 comments on commit 2f2adf8

Please sign in to comment.