diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index 4e1f080..d08a593 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -11,13 +11,13 @@ use std::{sync::Arc, time::Duration}; const FRAMEBUFFER_WIDTH: usize = 1920; const FRAMEBUFFER_HEIGHT: usize = 1080; -async fn invoke_parse_pixelflut_commands( +fn invoke_parse_pixelflut_commands( input: &[u8], fb: &Arc, parser_state: ParserState, ) { let mut stream = DevNullTcpStream::default(); - parse_pixelflut_commands(input, fb, &mut stream, parser_state).await; + parse_pixelflut_commands(input, fb, &mut stream, parser_state); } fn from_elem(c: &mut Criterion) { @@ -33,8 +33,7 @@ fn from_elem(c: &mut Criterion) { |b, input| { let fb = Arc::new(FrameBuffer::new(FRAMEBUFFER_WIDTH, FRAMEBUFFER_HEIGHT)); let parser_state = ParserState::default(); - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter(|| invoke_parse_pixelflut_commands(input, &fb, parser_state.clone())); + b.iter(|| invoke_parse_pixelflut_commands(input, &fb, parser_state.clone())); }, ); diff --git a/src/main.rs b/src/main.rs index 14d2339..f69f017 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,8 @@ use breakwater::{ use clap::Parser; use env_logger::Env; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use std::sync::mpsc; +use tokio::sync::{broadcast}; #[cfg(feature = "vnc")] use { breakwater::sinks::vnc::VncServer, @@ -27,7 +28,7 @@ async fn main() -> Result<(), Box> { // If we make the channel to big, stats will start to lag behind // TODO: Check performance impact in real-world scenario. Maybe the statistics thread blocks the other threads - let (statistics_tx, statistics_rx) = mpsc::channel::(100); + let (statistics_tx, statistics_rx) = mpsc::channel::(); let (statistics_information_tx, statistics_information_rx_for_prometheus_exporter) = broadcast::channel::(2); #[cfg(feature = "vnc")] @@ -48,8 +49,8 @@ async fn main() -> Result<(), Box> { )?; let network = Network::new(&args.listen_address, Arc::clone(&fb), statistics_tx.clone()); - let network_listener_thread = tokio::spawn(async move { - network.listen().await.unwrap(); + let network_listener_thread = std::thread::spawn(move || { + network.listen().unwrap(); }); let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb)); @@ -94,7 +95,7 @@ async fn main() -> Result<(), Box> { }); prometheus_exporter_thread.await?; - network_listener_thread.await?; + network_listener_thread.join().unwrap(); if let Some(ffmpeg_thread) = ffmpeg_thread { ffmpeg_thread.await?; } diff --git a/src/network.rs b/src/network.rs index e299931..3a6a8a8 100644 --- a/src/network.rs +++ b/src/network.rs @@ -10,10 +10,8 @@ use std::{ sync::Arc, time::Duration, }; +use std::{net::TcpListener, io::{Read, Write}, sync::mpsc::Sender}; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpListener, - sync::mpsc::Sender, time::Instant, }; @@ -40,27 +38,27 @@ impl Network { } } - pub async fn listen(&self) -> tokio::io::Result<()> { - let listener = TcpListener::bind(&self.listen_address).await?; + pub fn listen(&self) -> std::io::Result<()> { + let listener = TcpListener::bind(&self.listen_address)?; info!("Started Pixelflut server on {}", self.listen_address); loop { - let (socket, socket_addr) = listener.accept().await?; + let (socket, socket_addr) = listener.accept()?; // If you connect via IPv4 you often show up as embedded inside an IPv6 address // Extracting the embedded information here, so we get the real (TM) address let ip = ip_to_canonical(socket_addr.ip()); let fb_for_thread = Arc::clone(&self.fb); let statistics_tx_for_thread = self.statistics_tx.clone(); - tokio::spawn(async move { - handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread).await; + std::thread::spawn(move || { + handle_connection(socket, ip, fb_for_thread, statistics_tx_for_thread); }); } } } -pub async fn handle_connection( - mut stream: impl AsyncReadExt + AsyncWriteExt + Unpin, +pub fn handle_connection( + mut stream: impl Read + Write + Unpin, ip: IpAddr, fb: Arc, statistics_tx: Sender, @@ -69,7 +67,6 @@ pub async fn handle_connection( statistics_tx .send(StatisticsEvent::ConnectionCreated { ip }) - .await .expect("Statistics channel disconnected"); // TODO: Try performance of Vec<> on heap instead of stack. Also bigger buffer @@ -90,7 +87,6 @@ pub async fn handle_connection( // If there are any bytes left over from the previous loop iteration leave them as is and but the new data behind let bytes_read = match stream .read(&mut buffer[leftover_bytes_in_buffer..NETWORK_BUFFER_SIZE - PARSER_LOOKAHEAD]) - .await { Ok(bytes_read) => bytes_read, Err(_) => { @@ -108,7 +104,6 @@ pub async fn handle_connection( ip, bytes: statistics_bytes_read, }) - .await .expect("Statistics channel disconnected"); last_statistics = Instant::now(); statistics_bytes_read = 0; @@ -137,8 +132,7 @@ pub async fn handle_connection( &fb, &mut stream, parser_state, - ) - .await; + ); // IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6. // This happens, because last_byte_parsed is an index starting at 0, so index 6 is from an array of length 7 @@ -161,7 +155,6 @@ pub async fn handle_connection( statistics_tx .send(StatisticsEvent::ConnectionClosed { ip }) - .await .expect("Statistics channel disconnected"); } @@ -185,7 +178,7 @@ mod test { use crate::test::helpers::MockTcpStream; use rstest::{fixture, rstest}; use std::time::Duration; - use tokio::sync::mpsc::{self, Receiver}; + use std::sync::mpsc::{self, Receiver}; #[fixture] fn ip() -> IpAddr { @@ -199,7 +192,7 @@ mod test { #[fixture] fn statistics_channel() -> (Sender, Receiver) { - mpsc::channel(10000) + mpsc::channel() } #[rstest] @@ -215,8 +208,8 @@ mod test { #[case("HELP", std::str::from_utf8(crate::parser::HELP_TEXT).unwrap())] #[case("HELP\n", std::str::from_utf8(crate::parser::HELP_TEXT).unwrap())] #[case("bla bla bla\nSIZE\nblub\nbla", "SIZE 1920 1080\n")] - #[tokio::test] - async fn test_correct_responses_to_general_commands( + #[test] + fn test_correct_responses_to_general_commands( #[case] input: &str, #[case] expected: &str, ip: IpAddr, @@ -224,7 +217,7 @@ mod test { statistics_channel: (Sender, Receiver), ) { let mut stream = MockTcpStream::from_input(input); - handle_connection(&mut stream, ip, fb, statistics_channel.0).await; + handle_connection(&mut stream, ip, fb, statistics_channel.0); assert_eq!(expected, stream.get_output()); } @@ -269,8 +262,8 @@ mod test { "PX 0 0 ffffff\nPX 42 42 000000\n" )] // The get pixel result is also offseted #[case("OFFSET 0 0\nPX 0 42 abcdef\nPX 0 42\n", "PX 0 42 abcdef\n")] - #[tokio::test] - async fn test_setting_pixel( + #[test] + fn test_setting_pixel( #[case] input: &str, #[case] expected: &str, ip: IpAddr, @@ -278,7 +271,7 @@ mod test { statistics_channel: (Sender, Receiver), ) { let mut stream = MockTcpStream::from_input(input); - handle_connection(&mut stream, ip, fb, statistics_channel.0).await; + handle_connection(&mut stream, ip, fb, statistics_channel.0); assert_eq!(expected, stream.get_output()); } @@ -286,15 +279,15 @@ mod test { #[rstest] #[case("PX 0 0 aaaaaa\n")] #[case("PX 0 0 aa\n")] - #[tokio::test] - async fn test_safe( + #[test] + fn test_safe( #[case] input: &str, ip: IpAddr, fb: Arc, statistics_channel: (Sender, Receiver), ) { let mut stream = MockTcpStream::from_input(input); - handle_connection(&mut stream, ip, fb.clone(), statistics_channel.0).await; + handle_connection(&mut stream, ip, fb.clone(), statistics_channel.0); // Test if it panics assert_eq!(fb.get(0, 0).unwrap() & 0x00ff_ffff, 0xaaaaaa); } @@ -315,8 +308,8 @@ mod test { #[case(500, 500, 300, 400)] #[case(fb().get_width(), fb().get_height(), 0, 0)] #[case(fb().get_width() - 1, fb().get_height() - 1, 1, 1)] - #[tokio::test] - async fn test_drawing_rect( + #[test] + fn test_drawing_rect( #[case] width: usize, #[case] height: usize, #[case] offset_x: usize, @@ -361,8 +354,7 @@ mod test { ip, Arc::clone(&fb), statistics_channel.0.clone(), - ) - .await; + ); assert_eq!("", stream.get_output()); // Read the pixels again @@ -372,8 +364,7 @@ mod test { ip, Arc::clone(&fb), statistics_channel.0.clone(), - ) - .await; + ); assert_eq!(fill_commands, stream.get_output()); // We can also do coloring and reading in a single connection @@ -383,8 +374,7 @@ mod test { ip, Arc::clone(&fb), statistics_channel.0.clone(), - ) - .await; + ); assert_eq!(combined_commands_expected, stream.get_output()); // Check that nothing else was colored @@ -394,8 +384,7 @@ mod test { ip, Arc::clone(&fb), statistics_channel.0.clone(), - ) - .await; + ); assert_eq!(read_other_pixels_commands_expected, stream.get_output()); } } diff --git a/src/parser.rs b/src/parser.rs index f7f4044..a390a80 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -3,7 +3,7 @@ use const_format::formatcp; use log::{info, warn}; use std::simd::{u32x8, Simd, SimdUint}; use std::sync::Arc; -use tokio::io::AsyncWriteExt; +use std::io::Write; pub const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest possible command pub const HELP_TEXT: &[u8] = formatcp!("\ @@ -53,10 +53,10 @@ const fn string_to_number(input: &[u8]) -> u64 { /// TODO: Implement support for 16K (15360 × 8640). /// Currently the parser only can read up to 4 digits of x or y coordinates. /// If you buy me a big enough screen I will kindly implement this feature. -pub async fn parse_pixelflut_commands( +pub fn parse_pixelflut_commands( buffer: &[u8], fb: &Arc, - mut stream: impl AsyncWriteExt + Unpin, + mut stream: impl Write + Unpin, // We don't pass this as mutual reference but instead hand it around - I guess on the stack? // I don't know what I'm doing, hoping for best performance anyway ;) parser_state: ParserState, @@ -223,7 +223,6 @@ pub async fn parse_pixelflut_commands( ) .as_bytes(), ) - .await { Ok(_) => (), Err(_) => continue, @@ -302,7 +301,6 @@ pub async fn parse_pixelflut_commands( stream .write_all(format!("SIZE {} {}\n", fb.get_width(), fb.get_height()).as_bytes()) - .await .expect("Failed to write bytes to tcp socket"); continue; } else if current_command & 0xffff_ffff == string_to_number(b"HELP\0\0\0\0") { @@ -311,7 +309,6 @@ pub async fn parse_pixelflut_commands( stream .write_all(HELP_TEXT) - .await .expect("Failed to write bytes to tcp socket"); continue; } diff --git a/src/sinks/vnc.rs b/src/sinks/vnc.rs index cc6a351..59af2cc 100644 --- a/src/sinks/vnc.rs +++ b/src/sinks/vnc.rs @@ -6,7 +6,7 @@ use rusttype::{point, Font, Scale}; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; -use tokio::sync::mpsc::Sender; +use std::sync::mpsc::Sender; use vncserver::{ rfb_framebuffer_malloc, rfb_get_screen, rfb_init_server, rfb_mark_rect_as_modified, rfb_run_event_loop, RfbScreenInfoPtr, @@ -105,7 +105,7 @@ impl<'a> VncServer<'a> { height_up_to_stats_text as i32, ); self.statistics_tx - .blocking_send(StatisticsEvent::FrameRendered) + .send(StatisticsEvent::FrameRendered) .unwrap(); if !self.statistics_information_rx.is_empty() { diff --git a/src/statistics.rs b/src/statistics.rs index 1984736..b7338aa 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -4,10 +4,11 @@ use std::{ cmp::max, collections::{hash_map::Entry, HashMap}, fs::File, + sync::mpsc::Receiver, net::IpAddr, time::{Duration, Instant}, }; -use tokio::sync::{broadcast, mpsc::Receiver}; +use tokio::sync::{broadcast}; pub const STATS_REPORT_INTERVAL: Duration = Duration::from_millis(1000); pub const STATS_SLIDING_WINDOW_SIZE: usize = 5; @@ -106,7 +107,7 @@ impl Statistics { let mut last_save_file_written = Instant::now(); let mut statistics_information_event = StatisticsInformationEvent::default(); - while let Some(statistics_update) = self.statistics_rx.recv().await { + while let Ok(statistics_update) = self.statistics_rx.recv() { self.statistic_events += 1; match statistics_update { StatisticsEvent::ConnectionCreated { ip } => {