Skip to content

Conversation

@DeStefaniAndrei
Copy link
Contributor

  • Add KafkaUserOpAuditArchiver struct:
    Generic struct over UserOpEventReader and UserOpEventWriter traits
    Runs an infinite loop that:
    1. Reads UserOp events from Kafka
    2. Spawns async tasks to write events to S3
    3. Commits Kafka offsets after successful writes
    4. Records metrics for monitoring

End-to-End Integration Tests (integration_tests.rs)

Added 7 end-to-end tests using Docker (Kafka + MinIO) covering single events, multiple events, deduplication, all event types, and full lifecycle.

- Add KafkaUserOpAuditArchiver struct
- Implements read → write → commit loop for UserOp events
- Add integration test (ignored due to MinIO etag limitation)
.unwrap_or_default()
.as_millis() as i64;
let event_age_ms = now_ms.saturating_sub(event.timestamp);
self.metrics.event_age.record(event_age_ms as f64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT of having the audit metrics separated by tags?

Similar to how we record metrics for RPC calls here: https://github.com/base/tips/blob/master/crates/ingress-rpc/src/metrics.rs#L5-L8 , and instead we can call it type (or a better name 😅)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants