Rust client utilities for streaming NEAR blockchain blocks via an Aurora Borealis Blocks API (gRPC) endpoint. It builds strongly-typed clients from the upstream borealis-prototypes
protobuf definitions and converts streamed block messages into near-indexer-primitives::StreamerMessage
structures ready for ingestion / indexing pipelines.
- gRPC streaming client (Tonic) for Borealis Blocks API
ReceiveBlocks
- Automatic protobuf generation from the
borealis-prototypes
submodule - Batch processing with ordered delivery guarantees
- Parallel decoding + deterministic (height-ordered) reassembly
- LZ4 + Borealis envelope decoding of NEAR block payloads (Near Block V2)
- Simple builder-based configuration (
BlocksApiConfigBuilder
) - Backpressure via Tokio
mpsc
channel with configurable capacity - Optional bearer token authentication
Experimental / early stage. Expect breaking API changes prior to 1.0.0
.
git clone https://github.com/defuse-protocol/blocksapi-rs.git
cd blocksapi-rs
git submodule update --init --recursive
The crate is not (yet) published. Add it via a git dependency:
[dependencies]
blocksapi-rs = { git = "https://github.com/defuse-protocol/blocksapi-rs.git", tag = "v0.1.0" }
Or using a local path:
[dependencies]
blocksapi-rs = { path = "../blocksapi-rs" }
Pinned via rust-toolchain
to 1.87.0
.
The build.rs
searches recursively under proto/borealis-prototypes
for all .proto
files and generates modules into $OUT_DIR
using tonic-prost-build
.
If you see:
Error: borealis-prototypes submodule not found!
Run:
git submodule update --init --recursive
Regenerate after upstream proto updates:
git -C proto/borealis-prototypes pull origin main
cargo clean && cargo build
All configuration uses BlocksApiConfig
(build with BlocksApiConfigBuilder
). Key fields:
Field | Type | Default | Description |
---|---|---|---|
server_addr |
String |
(required) | gRPC endpoint base URL (e.g. http://localhost:4300 ). |
start_on |
Option<u64> |
None |
Specific block height to target (closest). If None , starts from latest available. |
stream_name |
String |
v2_mainnet_near_blocks |
Server stream identifier. |
blocksapi_token |
Option<String> |
None |
Bearer auth token if server requires authentication. |
batch_size |
usize |
20 |
Messages accumulated before parallel decode & ordered dispatch. |
concurrency |
usize |
100 |
Size of internal mpsc channel (and spawn concurrency). |
tcp_keepalive |
u64 |
30 |
TCP keepalive (s). |
http2_keepalive_interval |
u64 |
30 |
HTTP/2 PING interval (s). |
keepalive_timeout |
u64 |
5 |
Keepalive timeout (s). |
connect_timeout |
u64 |
10 |
Connection timeout (s). |
http2_adaptive_window |
bool |
true |
Enable adaptive flow control in h2. |
connection_window_size |
u32 |
134217728 |
Initial connection window (bytes). |
stream_window_size |
u32 |
134217728 |
Initial per-stream window (bytes). |
buffer_size |
usize |
1073741824 |
Tonic buffer size (bytes). |
concurrency_limit |
usize |
1024 |
Max concurrent HTTP/2 streams/requests. |
Environment variables (convention used in example.main.rs
):
Variable | Meaning |
---|---|
BLOCKS_API_SERVER_ADDR |
Overrides server_addr |
BLOCKS_API_START_BLOCK |
Parsed as u64 for start_on |
BLOCKSAPI_TOKEN |
Bearer token (without Bearer prefix) |
- Build a config
- Call
streamer(config)
→ returns(JoinHandle, Receiver<StreamerMessage>)
- Consume messages until channel closes / end of stream
- Await the join handle to surface terminal errors
Example:
let config = BlocksApiConfigBuilder::default()
.server_addr("https://blocks.your-network.example")
.start_on(123_456_789u64) // or omit
.blocksapi_token(Some("my-secret-token".into()))
.batch_size(50)
.build()?;
let (task, mut rx) = blocksapi::streamer(config);
while let Some(msg) = rx.recv().await {
// Process StreamerMessage
do_something(&msg)?;
}
task.await??; // Propagate errors
For each ReceiveBlocksResponse
batch:
- Raw gRPC response → extract
BlockMessage
withRawPayload
- Validate format ==
PayloadNearBlockV2
- CBOR decode Borealis envelope (versioned wrapper) via
ciborium
- Extract inner LZ4-compressed block JSON blob → decompress
- Deserialize into
near_indexer_primitives::StreamerMessage
viaserde_json
- Sort batch by
block.header.height
to ensure monotonic order - Forward downstream via
mpsc::Sender
This ensures deterministic ordering even with parallel per-message decoding.
- Increase
batch_size
to improve parallel decode efficiency (higher latency tolerance) - Adjust
concurrency
(channel size) to accommodate downstream backpressure - Tune HTTP/2 window sizes for high-throughput / high-latency networks
- Ensure LZ4 decompression keeps pace (consider profiling if CPU-bound)
- Per-message decode errors are logged (
tracing::error!
) and skipped - Stream errors bubble up →
start()
returnsErr
and join handle resolves with error - Unexpected block height regressions emit a
tracing::warn!
Integrate with your preferred subscriber (e.g. tracing_subscriber::fmt()
).
Rebuild protos after updating submodule:
git submodule update --remote --recursive
cargo clean && cargo build