Skip to content

Commit 9aff209

Browse files
committed
Audit tests WIP
1 parent 7a13ec8 commit 9aff209

File tree

7 files changed

+124
-575
lines changed

7 files changed

+124
-575
lines changed

CLAUDE.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
## Notes
44
- DO NOT ADD COMMENTS UNLESS INSTRUCTED
55
- Put imports at the top of the file, never in functions
6-
- Always run `just ci` before claiming a task is complete and fix any issues
6+
- For rust tasks always run `just rust-ci` before claiming a task is complete and fix any issues
7+
- For typescript tasks always run `just ui-ci` before claiming a task is complete and fix any issues
78
- Use `just fix` to fix formatting and warnings
89
- Always add dependencies to the cargo.toml in the root and reference them in the crate cargo files
910
- Always use the latest dependency versions. Use https://crates.io/ to find dependency versions when adding new deps

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,12 @@ serde_json = "1.0.143"
4848
dotenvy = "0.15.7"
4949
testcontainers = { version = "0.23.1", features = ["blocking"] }
5050
testcontainers-modules = { version = "0.11.2", features = ["postgres", "kafka", "minio"] }
51+
futures-util = "0.3.32"
5152

5253
# Kafka and S3 dependencies
5354
rdkafka = { version = "0.37.0", features = ["libz-static"] }
54-
aws-sdk-s3 = "1.62.0"
55-
aws-config = "1.6.0"
55+
aws-config = "1.1.7"
56+
aws-sdk-s3 = "1.106.0"
5657
bytes = { version = "1.8.0", features = ["serde"] }
5758
md5 = "0.7.0"
5859
base64 = "0.22.1"

crates/audit/Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@ clap = { workspace = true }
2222

2323
# Additional dependencies for Kafka and S3
2424
rdkafka = { workspace = true }
25-
aws-sdk-s3 = { workspace = true }
2625
aws-config = { workspace = true }
26+
aws-sdk-s3 = { workspace = true }
2727
bytes = { workspace = true }
2828
md5 = { workspace = true }
29-
base64 = { workspace = true }
29+
futures = "0.3.31"
3030

3131
[dev-dependencies]
3232
tokio-test = "0.4.4"
3333
testcontainers = { workspace = true }
3434
testcontainers-modules = { workspace = true }
35-
eyre = { workspace = true }
36-
aws-credential-types = "1.2"
35+
eyre = { workspace = true }

crates/audit/src/archiver.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
use std::time::Duration;
2-
use crate::reader::{KafkaMempoolReader, MempoolEventReader, TimestampedEvent};
1+
use crate::reader::{KafkaMempoolReader, MempoolEventReader};
32
use crate::storage::{MempoolEventWriter, S3MempoolEventWriter};
43
use anyhow::Result;
4+
use aws_sdk_s3::Client as S3Client;
5+
use std::time::Duration;
56
use tokio::time::sleep;
67
use tracing::{error, info};
78

8-
99
pub struct KafkaMempoolArchiver {
1010
reader: KafkaMempoolReader,
1111
writer: S3MempoolEventWriter,
1212
}
1313

14-
1514
impl KafkaMempoolArchiver {
1615
pub async fn new(
1716
kafka_brokers: &str,
@@ -20,7 +19,10 @@ impl KafkaMempoolArchiver {
2019
bucket: String,
2120
) -> Result<Self> {
2221
let reader = KafkaMempoolReader::new(kafka_brokers, topic, group_id).await?;
23-
let writer = S3MempoolEventWriter::new(bucket).await?;
22+
23+
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
24+
let s3_client = S3Client::new(&config);
25+
let writer = S3MempoolEventWriter::new(s3_client, bucket);
2426

2527
Ok(Self { reader, writer })
2628
}

crates/audit/src/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,4 @@ impl MempoolEventReader for KafkaMempoolReader {
122122
}
123123
Ok(())
124124
}
125-
}
125+
}

crates/audit/src/storage.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::types::{BundleId, MempoolEvent, TransactionId};
22
use anyhow::Result;
33
use async_trait::async_trait;
4-
use aws_config::BehaviorVersion;
5-
use aws_sdk_s3::{primitives::ByteStream, Client as S3Client};
6-
use base64;
4+
use aws_sdk_s3::primitives::ByteStream;
5+
use aws_sdk_s3::Client as S3Client;
76
use serde::{Deserialize, Serialize};
87
use std::collections::HashSet;
98
use std::fmt;
@@ -51,14 +50,7 @@ pub struct S3MempoolEventWriter {
5150
}
5251

5352
impl S3MempoolEventWriter {
54-
pub async fn new(bucket: String) -> Result<Self> {
55-
let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
56-
let s3_client = S3Client::new(&config);
57-
58-
Ok(Self { s3_client, bucket })
59-
}
60-
61-
pub fn with_client(s3_client: S3Client, bucket: String) -> Self {
53+
pub fn new(s3_client: S3Client, bucket: String) -> Self {
6254
Self { s3_client, bucket }
6355
}
6456

@@ -197,7 +189,6 @@ impl S3MempoolEventWriter {
197189
async fn put_object_idempotent(&self, key: &str, data: Vec<u8>) -> Result<()> {
198190
let md5_digest = md5::compute(&data);
199191
let content_hash_hex = format!("{:x}", md5_digest);
200-
let content_hash_base64 = base64::encode(&md5_digest[..]);
201192

202193
if let Ok(existing) = self.get_object_etag(key).await {
203194
if existing.trim_matches('"') == content_hash_hex {
@@ -211,14 +202,13 @@ impl S3MempoolEventWriter {
211202
}
212203

213204
let data_size = data.len();
214-
let stream = ByteStream::from(data);
205+
let body = ByteStream::from(data);
215206

216207
self.s3_client
217208
.put_object()
218209
.bucket(&self.bucket)
219210
.key(key)
220-
.body(stream)
221-
.content_md5(&content_hash_base64)
211+
.body(body)
222212
.send()
223213
.await?;
224214

@@ -240,7 +230,7 @@ impl S3MempoolEventWriter {
240230
.send()
241231
.await?;
242232

243-
Ok(response.e_tag().unwrap_or_default().to_string())
233+
Ok(response.e_tag().unwrap_or("").to_string())
244234
}
245235

246236
async fn get_bundle_transaction_hashes(&self, bundle_id: BundleId) -> Result<Vec<String>> {
@@ -292,7 +282,7 @@ impl S3MempoolEventWriter {
292282
.await?;
293283

294284
let body = response.body.collect().await?;
295-
Ok(String::from_utf8(body.to_vec())?)
285+
Ok(String::from_utf8(body.into_bytes().to_vec())?)
296286
}
297287
}
298288

0 commit comments

Comments
 (0)