Skip to content

Commit

Permalink
move handler logic into block subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Feb 26, 2025
1 parent c966f82 commit 570f689
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
4 changes: 2 additions & 2 deletions kafka-producer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ async fn main() -> anyhow::Result<()> {
let storage = node.storage();
let ps_subscriber = PsSubscriber::new(storage.clone());

ShardStateApplier::new(storage.clone(), (rpc_state.1, writer, ps_subscriber))
ShardStateApplier::new(storage.clone(), (rpc_state.1, ps_subscriber))
};

node.run(
archive_block_provider.chain((blockchain_block_provider, storage_block_provider)),
(state_applier, rpc_state.0),
(writer, state_applier, rpc_state.0),
)
.await?;

Expand Down
43 changes: 29 additions & 14 deletions kafka-producer/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures_util::{FutureExt, StreamExt};
use rayon::prelude::*;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use tycho_block_util::block::BlockStuff;
use tycho_core::block_strider::{StateSubscriber, StateSubscriberContext};
use tycho_core::block_strider::{BlockSubscriber, BlockSubscriberContext};

use crate::config::KafkaConsumerConfig;

Expand Down Expand Up @@ -177,16 +177,26 @@ struct TransactionToKafka {
timestamp: u32,
}

impl StateSubscriber for KafkaProducer {
type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
impl BlockSubscriber for KafkaProducer {
type Prepared = ();
type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;

fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
async move {
self.handle_block(&cx.block).await?;
Ok(())
}
.boxed()
}

fn handle_block<'a>(
&'a self,
_: &'a BlockSubscriberContext,
_: Self::Prepared,
) -> Self::HandleBlockFut<'a> {
futures_util::future::ready(Ok(()))
}
}

#[expect(clippy::large_enum_variant, reason = "doesn't matter")]
Expand All @@ -195,20 +205,25 @@ pub enum OptionalStateSubscriber {
Blackhole,
}

impl StateSubscriber for OptionalStateSubscriber {
type HandleStateFut<'a> = futures_util::future::Either<
<KafkaProducer as StateSubscriber>::HandleStateFut<'a>,
futures_util::future::Ready<Result<()>>,
>;
impl BlockSubscriber for OptionalStateSubscriber {
type Prepared = ();
type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;

fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
match self {
OptionalStateSubscriber::KafkaProducer(producer) => {
futures_util::future::Either::Left(producer.handle_block(&cx.block).boxed())
}
OptionalStateSubscriber::Blackhole => {
futures_util::future::Either::Right(futures_util::future::ok(()))
producer.handle_block(&cx.block).boxed()
}
OptionalStateSubscriber::Blackhole => futures_util::future::ok(()).boxed(),
}
}

fn handle_block<'a>(
&'a self,
_: &'a BlockSubscriberContext,
_: Self::Prepared,
) -> Self::HandleBlockFut<'a> {
futures_util::future::ready(Ok(()))
}
}

0 comments on commit 570f689

Please sign in to comment.