Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jun 25, 2024
0 parents commit 22b26f7
Show file tree
Hide file tree
Showing 8 changed files with 1,405 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/

# It's a library, so ignore the lockfile
Cargo.lock
41 changes: 41 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[package]
name = "example-exex-remote"
version = "0.1.0"
edition = "2021"
rust-version = "1.79"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
reth = { git = "https://github.com/paradigmxyz/reth" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] }
reth-node-api = { git = "https://github.com/paradigmxyz/reth" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth" }

eyre = "0.6"

tonic = "0.11"
prost = "0.12"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"

bincode = "1.3"

[build-dependencies]
tonic-build = "0.11"

[dev-dependencies]
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }

tokio = "1"

[features]
default = []
optimism = ["reth/optimism"]

[[example]]
name = "exex"

[[example]]
name = "consumer"
4 changes: 4 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() {
tonic_build::compile_protos("proto/exex.proto")
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}
32 changes: 32 additions & 0 deletions examples/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use example_exex_remote::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest};
use reth_exex::ExExNotification;
use reth_tracing::{tracing::info, RethTracer, Tracer};

#[tokio::main]
async fn main() -> eyre::Result<()> {
let _ = RethTracer::new().init()?;

let mut client = RemoteExExClient::connect("http://[::1]:10000")
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);

let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner();
while let Some(notification) = stream.message().await? {
let notification = ExExNotification::try_from(&notification)?;

match notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
}

Ok(())
}
77 changes: 77 additions & 0 deletions examples/exex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use example_exex_remote::proto::{
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
ExExNotification as ProtoExExNotification, SubscribeRequest as ProtoSubscribeRequest,
};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

#[derive(Debug)]
struct ExExService {
notifications: broadcast::Sender<ExExNotification>,
}

#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<ProtoExExNotification, Status>>;

async fn subscribe(
&self,
_request: Request<ProtoSubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);

let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
tx.send(Ok((&notification).try_into().expect("failed to encode")))
.await
.expect("failed to send notification to client");
}
});

Ok(Response::new(ReceiverStream::new(rx)))
}
}

async fn exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: broadcast::Sender<ExExNotification>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}

let _ = notifications.send(notification);
}

Ok(())
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = broadcast::channel(1).0;

let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());

let handle = builder
.node(EthereumNode::default())
.install_exex("Remote", |ctx| async move { Ok(exex(ctx, notifications)) })
.launch()
.await?;

handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("gRPC server crashed")
});

handle.wait_for_node_exit().await
})
}
Loading

0 comments on commit 22b26f7

Please sign in to comment.