Skip to content

Commit 4079b16

Browse files
authored
fix: add back bundleWithMetadata (#52)
* add back bundleWithMetadata * fix integration test
1 parent 86b275c commit 4079b16

File tree

5 files changed

+19
-26
lines changed

5 files changed

+19
-26
lines changed

crates/bundle-pool/src/source.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use async_trait::async_trait;
33
use rdkafka::consumer::{Consumer, StreamConsumer};
44
use rdkafka::{ClientConfig, Message};
55
use std::fmt::Debug;
6-
use tips_core::{Bundle, BundleWithMetadata};
6+
use tips_core::BundleWithMetadata;
77
use tokio::sync::mpsc;
88
use tracing::{error, trace};
99

@@ -52,29 +52,22 @@ impl BundleSource for KafkaBundleSource {
5252
}
5353
};
5454

55-
let bundle: Bundle = match serde_json::from_slice(payload) {
56-
Ok(b) => b,
57-
Err(e) => {
58-
error!(error = %e, "Failed to deserialize bundle");
59-
continue;
60-
}
61-
};
55+
let bundle_with_metadata: BundleWithMetadata =
56+
match serde_json::from_slice(payload) {
57+
Ok(b) => b,
58+
Err(e) => {
59+
error!(error = %e, "Failed to deserialize bundle");
60+
continue;
61+
}
62+
};
6263

6364
trace!(
64-
bundle = ?bundle,
65+
bundle = ?bundle_with_metadata,
6566
offset = message.offset(),
6667
partition = message.partition(),
6768
"Received bundle from Kafka"
6869
);
6970

70-
let bundle_with_metadata = match BundleWithMetadata::load(bundle) {
71-
Ok(b) => b,
72-
Err(e) => {
73-
error!(error = %e, "Failed to load bundle");
74-
continue;
75-
}
76-
};
77-
7871
if let Err(e) = self.publisher.send(bundle_with_metadata) {
7972
error!(error = ?e, "Failed to publish bundle to queue");
8073
}

crates/bundle-pool/tests/integration_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn st
6565
let test_bundle = create_test_bundle(vec![tx1], Some(100), None, None);
6666
let test_bundle_uuid = *test_bundle.uuid();
6767

68-
let bundle_payload = serde_json::to_string(test_bundle.bundle())?;
68+
let bundle_payload = serde_json::to_string(&test_bundle)?;
6969

7070
kafka_producer
7171
.send(

crates/core/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub struct CancelBundle {
6565
pub replacement_uuid: String,
6666
}
6767

68-
#[derive(Debug, Clone)]
68+
#[derive(Debug, Clone, Serialize, Deserialize)]
6969
pub struct BundleWithMetadata {
7070
bundle: Bundle,
7171
uuid: Uuid,

crates/ingress-rpc/src/queue.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use anyhow::Result;
33
use async_trait::async_trait;
44
use backon::{ExponentialBuilder, Retryable};
55
use rdkafka::producer::{FutureProducer, FutureRecord};
6-
use tips_core::Bundle;
6+
use tips_core::BundleWithMetadata;
77
use tokio::time::Duration;
88
use tracing::{error, info};
99

1010
/// A queue to buffer transactions
1111
#[async_trait]
1212
pub trait QueuePublisher: Send + Sync {
13-
async fn publish(&self, bundle: &Bundle, bundle_hash: &B256) -> Result<()>;
13+
async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()>;
1414
}
1515

1616
/// A queue to buffer transactions
@@ -27,7 +27,7 @@ impl KafkaQueuePublisher {
2727

2828
#[async_trait]
2929
impl QueuePublisher for KafkaQueuePublisher {
30-
async fn publish(&self, bundle: &Bundle, bundle_hash: &B256) -> Result<()> {
30+
async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()> {
3131
let key = bundle_hash.to_string();
3232
let payload = serde_json::to_vec(&bundle)?;
3333

@@ -75,7 +75,7 @@ impl QueuePublisher for KafkaQueuePublisher {
7575
mod tests {
7676
use super::*;
7777
use rdkafka::config::ClientConfig;
78-
use tips_core::BundleWithMetadata;
78+
use tips_core::{Bundle, BundleWithMetadata};
7979
use tokio::time::{Duration, Instant};
8080

8181
fn create_test_bundle() -> Bundle {
@@ -97,7 +97,7 @@ mod tests {
9797
let bundle_hash = bundle_with_metadata.bundle_hash();
9898

9999
let start = Instant::now();
100-
let result = publisher.publish(&bundle, &bundle_hash).await;
100+
let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await;
101101
let elapsed = start.elapsed();
102102

103103
// the backoff tries at minimum 100ms, so verify we tried at least once

crates/ingress-rpc/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ where
7070
let bundle_hash = bundle_with_metadata.bundle_hash();
7171
if let Err(e) = self
7272
.bundle_queue
73-
.publish(bundle_with_metadata.bundle(), &bundle_hash)
73+
.publish(&bundle_with_metadata, &bundle_hash)
7474
.await
7575
{
7676
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
@@ -124,7 +124,7 @@ where
124124

125125
if let Err(e) = self
126126
.bundle_queue
127-
.publish(bundle_with_metadata.bundle(), &bundle_hash)
127+
.publish(&bundle_with_metadata, &bundle_hash)
128128
.await
129129
{
130130
warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);

0 commit comments

Comments
 (0)