Skip to content

Commit

Permalink
chore: handle eventLoop.inEventLoop() case of scheduleSendJobIfNeeded()
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent e989353 commit 460f596
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,15 +601,17 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread");

// Schedule directly
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) {
loopSend(chan);
}
// Otherwise:
// someone will do the job for us
loopSend(chan, false);
}

private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
final EventLoop eventLoop = chan.eventLoop();
if (eventLoop.inEventLoop()) {
// Possible in reactive() mode.
loopSend(chan, false);
return;
}

if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) {
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
Expand All @@ -618,7 +620,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// 2. uses eventLoop.execute() directly
// Avg latency: 3.2677197021496998s
// Avg QPS: 476925.0751855796/s
eventLoop.execute(() -> loopSend(chan));
eventLoop.execute(() -> loopSend(chan, true));
}

// Otherwise:
Expand All @@ -629,19 +631,19 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// second loopSend0(), which will call poll()
}

private void loopSend(final ContextualChannel chan) {
private void loopSend(final ContextualChannel chan, boolean entered) {
final ConnectionContext connectionContext = chan.context;
final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext;
if (connectionContext.isChannelInactiveEventFired() || batchFlushEndPointContext.hasRetryableFailedToSendTasks()) {
return;
}

LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null");
loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false);
loopSend0(batchFlushEndPointContext, chan, writeSpinCount, entered);
}

private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan,
int remainingSpinnCount, final boolean exited) {
int remainingSpinnCount, final boolean entered) {
do {
final int count = pollBatch(batchFlushEndPointContext, chan);
if (count < 0) {
Expand All @@ -655,16 +657,16 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext

if (remainingSpinnCount <= 0) {
// Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
chan.eventLoop().execute(() -> loopSend(chan));
chan.eventLoop().execute(() -> loopSend(chan, entered));
return;
}

if (!exited) {
if (entered) {
// The send loop will be triggered later when a new task is added,
// // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
batchFlushEndPointContext.hasOngoingSendLoop.exit();
// // Guarantee thread-safety: no dangling tasks in the queue.
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true);
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false);
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
// TimeUnit.NANOSECONDS);
}
Expand Down

0 comments on commit 460f596

Please sign in to comment.