Skip to content

Commit

Permalink
Implement basic NACK functionality to queue (#557)
Browse files Browse the repository at this point in the history
Adds simple NACK functionality where messages are reinserted to the back of the
queue with no delay on being marked as not acknowledged.
  • Loading branch information
svix-daniel authored Jul 12, 2022
1 parent 5e4719a commit 37630ab
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 19 deletions.
4 changes: 0 additions & 4 deletions server/svix-server/src/queue/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ impl TaskQueueSend for MemoryQueueProducer {
Ok(())
}

async fn nack(&self, _delivery: TaskQueueDelivery) -> Result<()> {
Ok(())
}

fn clone_box(&self) -> Box<dyn TaskQueueSend> {
Box::new(self.clone())
}
Expand Down
14 changes: 7 additions & 7 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ trait TaskQueueSend: Sync + Send {
fn clone_box(&self) -> Box<dyn TaskQueueSend>;

async fn ack(&self, delivery: TaskQueueDelivery) -> Result<()>;
async fn nack(&self, delivery: TaskQueueDelivery) -> Result<()>;

/// By default NACKing a [`TaskQueueDelivery`] simply reinserts it in the back of the queue without
/// any delay.
async fn nack(&self, delivery: TaskQueueDelivery) -> Result<()> {
tracing::debug!("nack {}", delivery.id);
self.send(delivery.task, None).await
}
}

impl Clone for Box<dyn TaskQueueSend> {
Expand Down Expand Up @@ -248,11 +254,7 @@ mod tests {
assert_recv(&mut rx_mem, msg_1).await;
}

// TODO: Eliminate code duplication in ack and nack tests

#[tokio::test]
#[ignore]
// ack only works with the Redis queue as of present, so this test is ignored for now
async fn test_ack() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;
assert!(tx_mem
Expand Down Expand Up @@ -283,8 +285,6 @@ mod tests {
}

#[tokio::test]
#[ignore]
// nack only works with the Redis queue as of present, so this test is ignored for now
async fn test_nack() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;
assert!(tx_mem
Expand Down
50 changes: 42 additions & 8 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,6 @@ impl TaskQueueSend for RedisQueueProducer {
Ok(())
}

async fn nack(&self, delivery: TaskQueueDelivery) -> Result<()> {
tracing::error!(
"NACKing is not yet supported | Delivery ID: {}",
delivery.id
);
Ok(())
}

fn clone_box(&self) -> Box<dyn TaskQueueSend> {
Box::new(self.clone())
}
Expand Down Expand Up @@ -788,6 +780,48 @@ pub mod tests {
}
}

#[tokio::test]
async fn test_nack() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(cfg).await;

let (p, mut c) = new_pair_inner(
pool,
Duration::from_millis(500),
"",
"{test}_nack",
"{test}_nack_delayed",
)
.await;

tokio::time::sleep(Duration::from_millis(550)).await;

flush_stale_queue_items(p.clone(), &mut c).await;

let mt = QueueTask::MessageV1(MessageTask {
msg_id: MessageId("test".to_owned()),
app_id: ApplicationId("test".to_owned()),
endpoint_id: EndpointId("test".to_owned()),
trigger_type: MessageAttemptTriggerType::Manual,
attempt_count: 0,
});
p.send(mt.clone(), None).await.unwrap();

let recv = c.receive_all().await.unwrap().pop().unwrap();
assert_eq!(recv.task, mt);
p.nack(recv).await.unwrap();

tokio::select! {
recv = c.receive_all() => {
assert_eq!(recv.unwrap().pop().unwrap().task, mt);
}

_ = tokio::time::sleep(Duration::from_secs(1)) => {
panic!("Expected QueueTask");
}
}
}

#[tokio::test]
async fn test_delay() {
let cfg = crate::cfg::load().unwrap();
Expand Down

0 comments on commit 37630ab

Please sign in to comment.