From 688741dc2a86dc6a191c8a91c85280b22c2566d2 Mon Sep 17 00:00:00 2001 From: Kornel Date: Wed, 24 Jan 2024 00:16:52 +0000 Subject: [PATCH] Move out ordqueue impl --- Cargo.lock | 10 ++++++ Cargo.toml | 1 + src/error.rs | 8 ++--- src/lib.rs | 21 +++++++------ src/ordqueue.rs | 84 ------------------------------------------------- 5 files changed, 26 insertions(+), 98 deletions(-) delete mode 100644 src/ordqueue.rs diff --git a/Cargo.lock b/Cargo.lock index 3876864..e36cfc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,7 @@ dependencies = [ "loop9", "natord", "num-traits", + "ordered-channel", "pbr", "quick-error", "resize", @@ -483,6 +484,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ordered-channel" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dd66e3218b7ee8b7c17275f8327121b9c43b29b3e9d2f812d0aa48105fd7e8d" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "pbr" version = "1.1.1" diff --git a/Cargo.toml b/Cargo.toml index 4744b60..c06acff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ loop9 = "0.1.5" # noisy-float 0.2 bug num-traits = { version = "0.2.17", features = ["i128", "std"] } crossbeam-utils = "0.8.19" +ordered-channel = { version = "1.0.0", features = ["crossbeam-channel"] } [dependencies.ffmpeg] package = "ffmpeg-next" diff --git a/src/error.rs b/src/error.rs index 70bc7b0..c33611c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -59,16 +59,16 @@ impl From for Error { } } -impl From> for Error { +impl From> for Error { #[cold] - fn from(_: crossbeam_channel::SendError) -> Self { + fn from(_: ordered_channel::SendError) -> Self { Self::ThreadSend } } -impl From for Error { +impl From for Error { #[cold] - fn from(_: crossbeam_channel::RecvError) -> Self { + fn from(_: ordered_channel::RecvError) -> Self { Self::Aborted } } diff --git a/src/lib.rs b/src/lib.rs index 8aaf8b5..611e97e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,8 +36,9 @@ use rgb::*; mod error; pub use crate::error::*; -mod ordqueue; -use crate::ordqueue::{OrdQueue, OrdQueueIter}; +use ordered_channel::Sender as OrdQueue; +use ordered_channel::Receiver as OrdQueueIter; +use ordered_channel::bounded as ordqueue_new; pub mod progress; use crate::progress::*; pub mod c_api; @@ -480,7 +481,7 @@ impl Writer { #[inline(never)] fn write_frames(&self, write_queue: Receiver, writer: &mut dyn Write, reporter: &mut dyn ProgressReporter) -> CatResult<()> { - let (send, recv) = ordqueue::new(2); + let (lzw_queue, lzw_recv) = ordqueue_new(2); minipool::new_scope((if self.settings.s.fast || self.settings.gifsicle_loss() > 0 { 3 } else { 1 }).try_into().unwrap(), "lzw", move || { let mut pts_in_delay_units = 0_u64; @@ -488,7 +489,7 @@ impl Writer { let mut enc = RustEncoder::new(writer, written.clone()); let mut n_done = 0; - for tmp in recv { + for tmp in lzw_recv { let (end_pts, ordinal_frame_number, frame, screen_width, screen_height): (f64, _, _, _, _) = tmp; // delay=1 doesn't work, and it's too late to drop frames now let delay = ((end_pts * 100_f64).round() as u64) @@ -520,7 +521,7 @@ impl Writer { } let frame = RustEncoder::<&mut dyn std::io::Write>::compress_frame(frame, &self.settings)?; - send.push(frame_index, (end_pts, ordinal_frame_number, frame, screen_width, screen_height))?; + lzw_queue.send(frame_index, (end_pts, ordinal_frame_number, frame, screen_width, screen_height))?; } Ok(()) }) @@ -540,7 +541,7 @@ impl Writer { #[inline(never)] fn write_inner(&self, decode_queue_recv: Receiver, writer: &mut dyn Write, reporter: &mut dyn ProgressReporter) -> CatResult<()> { thread::scope(|s| { - let (diff_queue, diff_queue_recv) = ordqueue::new(0); + let (diff_queue, diff_queue_recv) = ordqueue_new(0); let resize_thread = thread::Builder::new().name("resize".into()).spawn_scoped(s, move || { self.make_resize(decode_queue_recv, diff_queue) })?; @@ -548,7 +549,7 @@ impl Writer { let diff_thread = thread::Builder::new().name("diff".into()).spawn_scoped(s, move || { self.make_diffs(diff_queue_recv, quant_queue) })?; - let (remap_queue, remap_queue_recv) = ordqueue::new(0); + let (remap_queue, remap_queue_recv) = ordqueue_new(0); let quant_thread = thread::Builder::new().name("quant".into()).spawn_scoped(s, move || { self.quantize_frames(quant_queue_recv, remap_queue) })?; @@ -591,7 +592,7 @@ impl Writer { }; let resized = resized_binary_alpha(image, self.settings.s.width, self.settings.s.height, self.settings.matte)?; let frame_blurred = if self.settings.extra_effort { smart_blur(resized.as_ref()) } else { less_smart_blur(resized.as_ref()) }; - diff_queue.push(frame.frame_index, InputFrameResized { + diff_queue.send(frame.frame_index, InputFrameResized { frame: resized, frame_blurred, presentation_timestamp: frame.presentation_timestamp, @@ -745,7 +746,7 @@ impl Writer { let needs_transparency = frame_index > 0 || (frame_index == 0 && first_frame_has_transparency); let (liq, remap, liq_image, out_buf) = self.quantize(image, &importance_map, frame_index == 0, needs_transparency, prev_frame_keeps)?; - remap_queue.push(frame_index as usize, RemapMessage { + Ok(remap_queue.send(frame_index as usize, RemapMessage { ordinal_frame_number, end_pts, dispose, @@ -753,7 +754,7 @@ impl Writer { liq_image, out_buf, has_next_frame, - }) + })?) }) } diff --git a/src/ordqueue.rs b/src/ordqueue.rs deleted file mode 100644 index d21e48d..0000000 --- a/src/ordqueue.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::error::CatResult; -use crossbeam_channel::{Receiver, Sender}; -use std::cmp::Ordering; -use std::collections::BinaryHeap; -use std::iter::FusedIterator; - -pub struct OrdQueue { - sender: Sender>, -} - -impl Clone for OrdQueue { - #[inline] - fn clone(&self) -> Self { - Self { sender: self.sender.clone() } - } -} - -pub struct OrdQueueIter { - receiver: Receiver>, - next_index: usize, - receive_buffer: BinaryHeap>, -} - -pub fn new(depth: usize) -> (OrdQueue, OrdQueueIter) { - let (sender, receiver) = crossbeam_channel::bounded(depth); - (OrdQueue { - sender, - }, OrdQueueIter { - receiver, - next_index: 0, - receive_buffer: BinaryHeap::new() - }) -} - -impl OrdQueue { - #[inline] - pub fn push(&self, index: usize, item: T) -> CatResult<()> { - self.sender.send(ReverseTuple(index, item))?; - Ok(()) - } -} - -impl FusedIterator for OrdQueueIter {} - -impl Iterator for OrdQueueIter { - type Item = T; - - #[inline(never)] - fn next(&mut self) -> Option { - while self.receive_buffer.peek().map(|i| i.0) != Some(self.next_index) { - match self.receiver.recv() { - Ok(item) => { - self.receive_buffer.push(item); - }, - Err(_) => { - // Sender dropped (but continue to dump receive_buffer buffer) - break; - }, - } - } - - if let Some(item) = self.receive_buffer.pop() { - self.next_index += 1; - Some(item.1) - } else { - None - } - } -} - -struct ReverseTuple(usize, T); -impl PartialEq for ReverseTuple { - #[inline] - fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) } -} -impl Eq for ReverseTuple {} -impl PartialOrd for ReverseTuple { - #[inline] - fn partial_cmp(&self, o: &Self) -> Option { o.0.partial_cmp(&self.0) } -} -impl Ord for ReverseTuple { - #[inline] - fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) } -}