
Conversation

Contributor

@prathyushpv prathyushpv commented Oct 9, 2025

What changed?

Add a lock in DLQWriter that will serialize writes to a queue.

Why?

The DLQ uses CAS operations in the persistence layer. When multiple tasks start failing with terminal errors, all shards can write tasks to the DLQ concurrently, which can cause a large number of writes to fail with CAS errors. We can prevent this by serializing writes to the queue.

CAS errors are still possible, since multiple history hosts can write to the same queue, but fewer operations will fail with this change.

How did you test it?

  • built
  • run locally and tested manually in a test cluster
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

@prathyushpv prathyushpv requested review from a team as code owners October 9, 2025 17:39
Contributor

@carlydf carlydf left a comment


approved with comment

// Acquire a process-level lock for this specific DLQ to prevent concurrent writes
// from multiple shards causing CAS conflicts in the persistence layer.
mu := q.getQueueMutex(queueKey)
mu.Lock()
defer mu.Unlock()

Looking at all the stuff that happens in this function, it seems to me that it would be slightly more efficient to release the lock after line 98, or even right after line 95 (basically as soon as q.dlqWriter.EnqueueTask completes).
The rest of the function does metrics and logging, which don't need the lock to be correct. If you still want to keep the benefit of being able to defer mu.Unlock(), you could move the write part out into its own function, so that you can release the lock and then update metrics. Maybe the metrics stuff is so fast that it doesn't even affect lock contention, but idk, everything helps 🤷‍♀️


like

func doLockedWrite(...) error {
	// Acquire a process-level lock for this specific DLQ to prevent concurrent writes
	// from multiple shards causing CAS conflicts in the persistence layer.
	mu := q.getQueueMutex(queueKey)
	mu.Lock()
	defer mu.Unlock()

	_, err := q.dlqWriter.CreateQueue(ctx, &persistence.CreateQueueRequest{
		QueueKey: queueKey,
	})
	if err != nil && !errors.Is(err, persistence.ErrQueueAlreadyExists) {
		return fmt.Errorf("%w: %v", ErrCreateDLQ, err)
	}
	_, err = q.dlqWriter.EnqueueTask(ctx, &persistence.EnqueueTaskRequest{
		QueueType:     queueKey.QueueType,
		SourceCluster: queueKey.SourceCluster,
		TargetCluster: queueKey.TargetCluster,
		Task:          task,
		SourceShardID: sourceShardID,
	})
	if err != nil {
		return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err)
	}
	return nil
}

Contributor Author


That makes sense! Thanks! Let me make the change

@prathyushpv prathyushpv enabled auto-merge (squash) October 15, 2025 16:45
@prathyushpv prathyushpv merged commit b5382c3 into main Oct 15, 2025
57 checks passed
@prathyushpv prathyushpv deleted the ppv/dlqCAS branch October 15, 2025 17:08
