Skip to content

Commit

Permalink
add new constructors, add read_input_stream_async method, change read…
Browse files Browse the repository at this point in the history
…_input_stream api
  • Loading branch information
yggverse committed Oct 24, 2024
1 parent e838189 commit 62c0f5e
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 19 deletions.
108 changes: 90 additions & 18 deletions src/client/connection/input_stream/byte_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod error;
pub use error::Error;

use gio::{prelude::InputStreamExt, Cancellable, InputStream};
use glib::{object::IsA, Bytes};
use glib::{object::IsA, Bytes, Priority};

pub const DEFAULT_CAPACITY: usize = 0x400;
pub const DEFAULT_CHUNK_SIZE: usize = 0x100;
Expand All @@ -14,25 +14,42 @@ pub struct ByteBuffer {
}

impl ByteBuffer {
/// Create dynamically allocated bytes buffer from `gio::InputStream`
// Constructors

/// Create new dynamically allocated bytes buffer with default capacity
pub fn new() -> Self {
Self::with_capacity(Some(DEFAULT_CAPACITY))
}

/// Create new dynamically allocated bytes buffer with initial capacity
///
/// Options:
/// * `capacity` bytes request to reduce extra memory overwrites (1024 by default)
/// * initial bytes request to reduce extra memory overwrites (1024 by default)
pub fn with_capacity(value: Option<usize>) -> Self {
Self {
bytes: Vec::with_capacity(match value {
Some(capacity) => capacity,
None => DEFAULT_CAPACITY,
}),
}
}

// Readers

/// Populate bytes buffer synchronously from `gio::InputStream`
///
/// Options:
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
/// * `chunk_size` bytes limit to read per iter (256 by default)
/// * `max_size` bytes limit to prevent memory overflow (1M by default)
pub fn from_input_stream(
input_stream: &InputStream, // @TODO
pub fn read_input_stream(
mut self,
input_stream: InputStream,
cancellable: Option<&impl IsA<Cancellable>>,
capacity: Option<usize>,
chunk_size: Option<usize>,
max_size: Option<usize>,
) -> Result<Self, Error> {
// Create buffer with initial capacity
let mut buffer: Vec<Bytes> = Vec::with_capacity(match capacity {
Some(value) => value,
None => DEFAULT_CAPACITY,
});

// Disallow unlimited buffer, use defaults on None
let limit = match max_size {
Some(value) => value,
Expand All @@ -41,7 +58,7 @@ impl ByteBuffer {

loop {
// Check buffer size to prevent memory overflow
if buffer.len() > limit {
if self.bytes.len() > limit {
return Err(Error::Overflow);
}

Expand All @@ -56,18 +73,73 @@ impl ByteBuffer {
Ok(bytes) => {
// No bytes were read, end of stream
if bytes.len() == 0 {
break;
return Ok(self);
}

// Save chunk to buffer
buffer.push(bytes);
self.bytes.push(bytes);
}
Err(_) => return Err(Error::Stream),
Err(_) => return Err(Error::StreamChunkRead),
};
}
}

/// Populate bytes buffer asynchronously from `gio::InputStream`,
/// apply callback function to `Ok(Self)` on success
///
/// Options:
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
/// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html
/// * `chunk_size` optional bytes limit to read per iter (256 by default)
/// * `max_size` optional bytes limit to prevent memory overflow (1M by default)
/// * `callback` user function to apply on complete
pub fn read_input_stream_async(
mut self,
input_stream: InputStream,
cancellable: Cancellable,
priority: Priority,
chunk_size: Option<usize>,
max_size: Option<usize>,
callback: impl FnOnce(Result<Self, Error>) + 'static,
) {
// Clone reference counted chunk dependencies
let _input_stream = input_stream.clone();
let _cancellable = cancellable.clone();

// Continue bytes reading
input_stream.read_bytes_async(
match max_size {
Some(value) => value,
None => DEFAULT_MAX_SIZE,
},
priority,
Some(&cancellable),
move |result| -> () {
match result {
Ok(bytes) => {
// No bytes were read, end of stream
if bytes.len() == 0 {
return callback(Ok(self));
}

// Save chunk to buffer
self.bytes.push(bytes);

// Done
Ok(Self { bytes: buffer })
// Continue bytes reading...
self.read_input_stream_async(
_input_stream,
_cancellable,
priority,
chunk_size,
max_size,
callback,
);
}
Err(_) => callback(Err(Error::StreamChunkReadAsync)),
}
},
);
}

/// Get link to bytes collected
Expand Down
3 changes: 2 additions & 1 deletion src/client/connection/input_stream/byte_buffer/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub enum Error {
Overflow,
Stream,
StreamChunkRead,
StreamChunkReadAsync,
}

0 comments on commit 62c0f5e

Please sign in to comment.