Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "f2e1fa29c983244d607d5b61e789e810b291f810" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "a49fc45ff8231fba5b6f9ffa98f9c6c9c21777d8" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand Down
11 changes: 9 additions & 2 deletions iceberg_rust_ffi/src/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ pub extern "C" fn iceberg_new_incremental_scan(
// Use macros from scan_common for shared functionality
impl_select_columns!(iceberg_incremental_select_columns, IcebergIncrementalScan);

impl_scan_builder_method!(
iceberg_incremental_scan_with_manifest_file_concurrency_limit,
IcebergIncrementalScan,
with_manifest_file_concurrency_limit,
n: usize
);

impl_scan_builder_method!(
iceberg_incremental_scan_with_data_file_concurrency_limit,
IcebergIncrementalScan,
with_concurrency_limit_data_files,
with_data_file_concurrency_limit,
n: usize
);

impl_scan_builder_method!(
iceberg_incremental_scan_with_manifest_entry_concurrency_limit,
IcebergIncrementalScan,
with_concurrency_limit_manifest_entries,
with_manifest_entry_concurrency_limit,
n: usize
);

Expand Down
5 changes: 4 additions & 1 deletion src/RustyIceberg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ export FILE_COLUMN, POS_COLUMN
const rust_lib = iceberg_rust_ffi_jll.libiceberg_rust_ffi

"""
struct StaticConfig

Runtime configuration for the Iceberg library.
Value of 0 means use all CPU cores, regardless of the number of threads in the Julia runtime.
"""
struct StaticConfig
n_threads::Culonglong
Expand Down Expand Up @@ -89,7 +92,7 @@ This starts a `tokio` runtime for handling Iceberg requests.
It must be called before sending a request.
"""
function init_runtime(
config::StaticConfig=StaticConfig(0);
config::StaticConfig=StaticConfig(Threads.nthreads());
on_rust_panic::Function=default_panic_hook
)
global _PANIC_HOOK
Expand Down
17 changes: 17 additions & 0 deletions src/incremental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ function select_columns!(scan::IncrementalScan, column_names::Vector{String})
return nothing
end

"""
with_manifest_file_concurrency_limit(scan::IncrementalScan, n::UInt)

Sets the manifest file concurrency level for the incremental scan.
"""
function with_manifest_file_concurrency_limit(scan::IncrementalScan, n::UInt)
result = @ccall rust_lib.iceberg_incremental_scan_with_manifest_file_concurrency_limit(
convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}},
n::Csize_t
)::Cint

if result != 0
error("Failed to set data file concurrency limit for incremental scan")
end
return nothing
end

"""
with_data_file_concurrency_limit!(scan::IncrementalScan, n::UInt)

Expand Down
10 changes: 8 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ using Arrow
# Test runtime initialization - this should work
@test_nowarn init_runtime()

# Test that we can initialize multiple times safely
@test_nowarn init_runtime()
# Test that we can initialize multiple times safely. But a new static config
# wouldn't take effect, would silently ignore the config.
@test_nowarn init_runtime(StaticConfig(1))

println("✅ Runtime initialization successful")
end
Expand Down Expand Up @@ -397,6 +398,11 @@ end
@test scan3.ptr != C_NULL
println("✅ Incremental scan created with nothing for both snapshot IDs")

RustyIceberg.with_manifest_file_concurrency_limit(scan3, UInt(2))
RustyIceberg.with_manifest_entry_concurrency_limit!(scan3, UInt(256))
RustyIceberg.with_data_file_concurrency_limit!(scan3, UInt(1024))
RustyIceberg.with_batch_size!(scan3, UInt(50))

inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3)
@test inserts_stream3 != C_NULL
@test deletes_stream3 != C_NULL
Expand Down
Loading