Skip to content

Commit

Permalink
Use Gstreamer Buffer map in StreamCapture (#216)
Browse files Browse the repository at this point in the history
* store frame buffer in StreamCapture

* add missed commented code

* add comment
  • Loading branch information
edgarriba authored Jan 9, 2025
1 parent 992a7c5 commit bbc9ada
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 22 deletions.
4 changes: 2 additions & 2 deletions crates/kornia-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ log = { workspace = true }
thiserror = { workspace = true }

# optional dependencies
gst = { version = "0.23.0", package = "gstreamer", optional = true }
gst-app = { version = "0.23.0", package = "gstreamer-app", optional = true }
gst = { version = "0.23.4", package = "gstreamer", optional = true }
gst-app = { version = "0.23.4", package = "gstreamer-app", optional = true }
memmap2 = "0.9.4"
turbojpeg = { version = "1.0.0", optional = true }

Expand Down
85 changes: 70 additions & 15 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use std::sync::{Arc, Mutex};

use crate::stream::error::StreamCaptureError;
use gst::prelude::*;
use kornia_image::Image;

// utility struct to store the frame buffer
struct FrameBuffer {
buffer: gst::MappedBuffer<gst::buffer::Readable>,
width: usize,
height: usize,
}

/// Represents a stream capture pipeline using GStreamer.
pub struct StreamCapture {
pipeline: gst::Pipeline,
appsink: gst_app::AppSink,
last_frame: Arc<Mutex<Option<FrameBuffer>>>,
}

impl StreamCapture {
Expand All @@ -31,7 +40,33 @@ impl StreamCapture {
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

Ok(Self { pipeline, appsink })
let last_frame = Arc::new(Mutex::new(None));

appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let last_frame = last_frame.clone();
move |sink| {
last_frame
.lock()
.map_err(|_| gst::FlowError::Error)
.and_then(|mut guard| {
Self::extract_frame_buffer(sink)
.map(|frame_buffer| {
guard.replace(frame_buffer);
gst::FlowSuccess::Ok
})
.map_err(|_| gst::FlowError::Error)
})
}
})
.build(),
);

Ok(Self {
pipeline,
last_frame,
})
}

/// Starts the stream capture pipeline and processes messages on the bus.
Expand All @@ -54,11 +89,23 @@ impl StreamCapture {
/// # Returns
///
/// An Option containing the last captured Image or None if no image has been captured yet.
pub fn grab(&self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
self.appsink
.try_pull_sample(gst::ClockTime::ZERO)
.map(Self::extract_image_frame)
.transpose()
pub fn grab(&mut self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
let mut last_frame = self
.last_frame
.lock()
.map_err(|_| StreamCaptureError::LockError)?;

last_frame.take().map_or(Ok(None), |frame_buffer| {
// TODO: solve the zero copy issue
// https://discourse.gstreamer.org/t/zero-copy-video-frames/3856/2
let img = Image::<u8, 3>::new(
[frame_buffer.width, frame_buffer.height].into(),
frame_buffer.buffer.to_owned(),
)
.map_err(|_| StreamCaptureError::CreateImageFrameError)?;

Ok(Some(img))
})
}

/// Closes the stream capture pipeline.
Expand All @@ -73,16 +120,18 @@ impl StreamCapture {
Ok(())
}

/// Extracts an image frame from the AppSink.
/// Extracts a frame buffer from the AppSink.
///
/// # Arguments
///
/// * `sample` - The sample to extract the frame from.
/// * `appsink` - The AppSink to extract the frame buffer from.
///
/// # Returns
///
/// A Result containing the extracted Image or a StreamCaptureError.
fn extract_image_frame(sample: gst::Sample) -> Result<Image<u8, 3>, StreamCaptureError> {
/// A Result containing the extracted FrameBuffer or a StreamCaptureError.
fn extract_frame_buffer(appsink: &gst_app::AppSink) -> Result<FrameBuffer, StreamCaptureError> {
let sample = appsink.pull_sample()?;

let caps = sample
.caps()
.ok_or_else(|| StreamCaptureError::GetCapsError)?;
Expand All @@ -100,12 +149,18 @@ impl StreamCapture {
.map_err(|_| StreamCaptureError::GetWidthError)? as usize;

let buffer = sample
.buffer()
.buffer_owned()
.ok_or_else(|| StreamCaptureError::GetBufferError)?
.map_readable()?;
.into_mapped_buffer_readable()
.map_err(|_| StreamCaptureError::GetBufferError)?;

let frame_buffer = FrameBuffer {
buffer,
width,
height,
};

Image::<u8, 3>::new([width, height].into(), buffer.to_owned())
.map_err(|_| StreamCaptureError::CreateImageFrameError)
Ok(frame_buffer)
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/kornia-io/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub enum StreamCaptureError {
/// An error occurred when the pipeline is not running.
#[error("Pipeline is not running")]
PipelineNotRunning,

/// An error occurred when the allocator is not found.
#[error("Cannot lock the last frame")]
LockError,
}

// ensure that can be sent over threads
Expand Down
2 changes: 1 addition & 1 deletion examples/features/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new().with_size(size).build()?;
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/filters/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new().with_size(size).build()?;
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/rtspcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?;

//// create a stream capture object
let capture = RTSPCameraConfig::new()
let mut capture = RTSPCameraConfig::new()
.with_settings(
&args.username,
&args.password,
Expand Down
2 changes: 1 addition & 1 deletion examples/video_write/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new()
let mut webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps as u32)
.with_size(frame_size)
Expand Down
2 changes: 1 addition & 1 deletion examples/webcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new()
let mut webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps)
.with_size(ImageSize {
Expand Down

0 comments on commit bbc9ada

Please sign in to comment.