Skip to content

Commit

Permalink
refactor: remove tryEnterUnsafe/exitUnsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent b4a90ab commit fb65b0a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,14 @@ public HasOngoingSendLoop() {
*
* @return true if entered the loop, false if already have a running loop.
*/
public boolean tryEnterSafeGetVolatile() {
public boolean tryEnter() {
return safe.get() == 0 && /* rare case if QPS is high */ safe.compareAndSet(0, 1);
}

public void exitSafe() {
public void exit() {
safe.set(0);
}

/**
* This method is not thread safe, can only be used from single thread.
*
* @return true if the value was updated
*/
public boolean tryEnterUnsafe() {
if (unsafe) {
return false;
}
unsafe = true;
return true;
}

public void exitUnsafe() {
unsafe = false;
}

}

BatchFlushEndPointContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,24 +601,24 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread");

// Schedule directly
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
scheduleSendJobInEventLoopIfNeeded(chan);
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) {
loopSend(chan);
}
// Otherwise:
// someone will do the job for us
}

private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
final EventLoop eventLoop = chan.eventLoop();
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) {
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
// Avg latency: 3.2956217278663s
// Avg QPS: 495238.50056392356/s
// 2. uses eventLoop.execute() directly
// Avg latency: 3.2677197021496998s
// Avg QPS: 476925.0751855796/s
eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan));
eventLoop.execute(() -> loopSend(chan));
}

// Otherwise:
Expand All @@ -629,16 +629,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// second loopSend0(), which will call poll()
}

private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) {
// Guarantee only 1 send loop.
BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop;
if (hasOngoingSendLoop.tryEnterUnsafe()) {
loopSend(chan);
} else {
hasOngoingSendLoop.exitSafe();
}
}

private void loopSend(final ContextualChannel chan) {
final ConnectionContext connectionContext = chan.context;
final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext;
Expand All @@ -651,7 +641,7 @@ private void loopSend(final ContextualChannel chan) {
}

private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan,
int remainingSpinnCount, final boolean exitedSafe) {
int remainingSpinnCount, final boolean exited) {
do {
final int count = pollBatch(batchFlushEndPointContext, chan);
if (count < 0) {
Expand All @@ -669,12 +659,10 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
return;
}

if (exitedSafe) {
if (!exited) {
// The send loop will be triggered later when a new task is added,
batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe();
} else {
// // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
batchFlushEndPointContext.hasOngoingSendLoop.exitSafe();
batchFlushEndPointContext.hasOngoingSendLoop.exit();
// // Guarantee thread-safety: no dangling tasks in the queue.
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true);
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
Expand Down

0 comments on commit fb65b0a

Please sign in to comment.