Skip to content

Commit 64f8522

Browse files
committed
add back bundlewmetadata
1 parent d264712 commit 64f8522

File tree

3 files changed

+8
-8
lines changed

3 files changed

+8
-8
lines changed

crates/core/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub struct CancelBundle {
6868
pub replacement_uuid: String,
6969
}
7070

71-
#[derive(Debug, Clone)]
71+
#[derive(Debug, Clone, Serialize, Deserialize)]
7272
pub struct BundleWithMetadata {
7373
bundle: Bundle,
7474
uuid: Uuid,

crates/ingress-rpc/src/queue.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use anyhow::Result;
33
use async_trait::async_trait;
44
use backon::{ExponentialBuilder, Retryable};
55
use rdkafka::producer::{FutureProducer, FutureRecord};
6-
use tips_core::Bundle;
6+
use tips_core::BundleWithMetadata;
77
use tokio::time::Duration;
88
use tracing::{error, info};
99

1010
/// A queue to buffer transactions
1111
#[async_trait]
1212
pub trait QueuePublisher: Send + Sync {
13-
async fn publish(&self, bundle: &Bundle, bundle_hash: &B256) -> Result<()>;
13+
async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()>;
1414
}
1515

1616
/// A queue to buffer transactions
@@ -27,7 +27,7 @@ impl KafkaQueuePublisher {
2727

2828
#[async_trait]
2929
impl QueuePublisher for KafkaQueuePublisher {
30-
async fn publish(&self, bundle: &Bundle, bundle_hash: &B256) -> Result<()> {
30+
async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()> {
3131
let key = bundle_hash.to_string();
3232
let payload = serde_json::to_vec(&bundle)?;
3333

@@ -75,7 +75,7 @@ impl QueuePublisher for KafkaQueuePublisher {
7575
mod tests {
7676
use super::*;
7777
use rdkafka::config::ClientConfig;
78-
use tips_core::BundleWithMetadata;
78+
use tips_core::{Bundle, BundleWithMetadata};
7979
use tokio::time::{Duration, Instant};
8080

8181
fn create_test_bundle() -> Bundle {
@@ -97,7 +97,7 @@ mod tests {
9797
let bundle_hash = bundle_with_metadata.bundle_hash();
9898

9999
let start = Instant::now();
100-
let result = publisher.publish(&bundle, &bundle_hash).await;
100+
let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await;
101101
let elapsed = start.elapsed();
102102

103103
// the backoff tries at minimum 100ms, so verify we tried at least once

crates/ingress-rpc/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ where
7373
let bundle_hash = bundle_with_metadata.bundle_hash();
7474
if let Err(e) = self
7575
.bundle_queue
76-
.publish(bundle_with_metadata.bundle(), &bundle_hash)
76+
.publish(&bundle_with_metadata, &bundle_hash)
7777
.await
7878
{
7979
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
@@ -129,7 +129,7 @@ where
129129

130130
if let Err(e) = self
131131
.bundle_queue
132-
.publish(bundle_with_metadata.bundle(), &bundle_hash)
132+
.publish(&bundle_with_metadata, &bundle_hash)
133133
.await
134134
{
135135
warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);

0 commit comments

Comments
 (0)