Skip to content

Commit

Permalink
Optimize code style
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghuaiyuan committed Sep 26, 2024
1 parent 568f412 commit f674f1b
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 39 deletions.
4 changes: 2 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.lock.AdaptiveBackOffLock;
import org.apache.rocketmq.store.lock.AdaptiveBackOffLockImpl;
import org.apache.rocketmq.store.lock.CollisionRetreatLock;
import org.apache.rocketmq.store.lock.BackOffSpinLock;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.util.LibC;
Expand Down Expand Up @@ -134,7 +134,7 @@ protected PutMessageThreadLocal initialValue() {
}
};
this.putMessageLock = messageStore.getMessageStoreConfig().getUseABSLock() ? new AdaptiveBackOffLockImpl() :
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new CollisionRetreatLock();
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new BackOffSpinLock();

this.flushDiskWatcher = new FlushDiskWatcher();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class AdaptiveBackOffLockImpl implements AdaptiveBackOffLock {
//state
private AtomicBoolean state = new AtomicBoolean(true);

private final static float SWAP_MUTEX_LOCK_RATIO = 0.8f;

private final static float SPIN_LOCK_ADAPTIVE_RATIO = 1.5f;

private Map<String, AdaptiveBackOffLock> locks;

private final List<AtomicInteger> tpsTable;
Expand All @@ -46,18 +50,18 @@ public class AdaptiveBackOffLockImpl implements AdaptiveBackOffLock {
public AdaptiveBackOffLockImpl() {
this.locks = new HashMap<>();
this.locks.put("Reentrant", new PutMessageReentrantLock());
this.locks.put("Collision", new CollisionRetreatLock());
this.locks.put("BackOff", new BackOffSpinLock());

this.tpsTable = new ArrayList<>(2);
this.tpsTable.add(new AtomicInteger(0));
this.tpsTable.add(new AtomicInteger(0));

adaptiveLock = this.locks.get("Collision");
adaptiveLock = this.locks.get("BackOff");
}

@Override
public void lock() {
tpsTable.get(LocalTime.now().getSecond() % 2).getAndIncrement();
this.tpsTable.get(LocalTime.now().getSecond() % 2).getAndIncrement();
boolean state;
do {
state = this.state.get();
Expand Down Expand Up @@ -94,8 +98,8 @@ public void swap() {
return;
}

if (this.adaptiveLock instanceof CollisionRetreatLock) {
CollisionRetreatLock lock = (CollisionRetreatLock) this.adaptiveLock;
if (this.adaptiveLock instanceof BackOffSpinLock) {
BackOffSpinLock lock = (BackOffSpinLock) this.adaptiveLock;
int base = Math.min(200 + tps / 200, 500);
if (lock.getNumberOfRetreat(slot) * base >= tps) {
if (lock.isAdapt()) {
Expand All @@ -104,12 +108,12 @@ public void swap() {
this.tpsSwapCriticalPoint = tps;
needSwap = true;
}
} else if (lock.getNumberOfRetreat(slot) * base * 3 / 2 <= tps) {
} else if (lock.getNumberOfRetreat(slot) * base * SPIN_LOCK_ADAPTIVE_RATIO <= tps) {
lock.adapt(false);
}
lock.setNumberOfRetreat(slot, 0);
} else {
if (tps <= this.tpsSwapCriticalPoint * 4 / 5) {
if (tps <= this.tpsSwapCriticalPoint * SWAP_MUTEX_LOCK_RATIO) {
needSwap = true;
}
}
Expand All @@ -122,11 +126,11 @@ public void swap() {
} while (currentThreadNum != 0);

try {
if (this.adaptiveLock instanceof CollisionRetreatLock) {
if (this.adaptiveLock instanceof BackOffSpinLock) {
this.adaptiveLock = this.locks.get("Reentrant");
} else {
this.adaptiveLock = this.locks.get("Collision");
((CollisionRetreatLock) this.adaptiveLock).adapt(false);
this.adaptiveLock = this.locks.get("BackOff");
((BackOffSpinLock) this.adaptiveLock).adapt(false);
}
} catch (Exception e) {
//ignore
Expand All @@ -152,9 +156,9 @@ public void isOpen(boolean open, boolean isUseReentrantLock) {
} while (currentThreadNum != 0);

if (open) {
adaptiveLock = this.locks.get("Collision");
adaptiveLock = this.locks.get("BackOff");
} else {
adaptiveLock = !isUseReentrantLock ? this.locks.get("Collision") : this.locks.get("Reentrant");
adaptiveLock = !isUseReentrantLock ? this.locks.get("BackOff") : this.locks.get("Reentrant");
}
state.set(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,24 @@
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CollisionRetreatLock implements AdaptiveBackOffLock {
public class BackOffSpinLock implements AdaptiveBackOffLock {

private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

private int optimalDegree;

private int initOptimalDegree;
private final static int INITIAL_DEGREE = 1000;

private int maxOptimalDegree;
private final static int MAX_OPTIMAL_DEGREE = 10000;

private final List<AtomicInteger> numberOfRetreat;

public CollisionRetreatLock() {
this.initOptimalDegree = 1000;
this.maxOptimalDegree = 10000;
this.optimalDegree = initOptimalDegree;
public BackOffSpinLock() {
this.optimalDegree = INITIAL_DEGREE;

numberOfRetreat = new ArrayList<>(2);
numberOfRetreat.add(new AtomicInteger(0));
Expand Down Expand Up @@ -78,33 +77,25 @@ public void setOptimalDegree(int optimalDegree) {
this.optimalDegree = optimalDegree;
}

public int getMaxOptimalDegree() {
return maxOptimalDegree;
}

public void setMaxOptimalDegree(int maxOptimalDegree) {
this.maxOptimalDegree = maxOptimalDegree;
}

public boolean isAdapt() {
return optimalDegree < maxOptimalDegree;
return optimalDegree < MAX_OPTIMAL_DEGREE;
}

public synchronized void adapt(boolean isRise) {
if (isRise) {
if (optimalDegree * 2 <= maxOptimalDegree) {
if (optimalDegree * 2 <= MAX_OPTIMAL_DEGREE) {
optimalDegree *= 2;
} else {
if (optimalDegree + initOptimalDegree <= maxOptimalDegree) {
optimalDegree += initOptimalDegree;
if (optimalDegree + INITIAL_DEGREE <= MAX_OPTIMAL_DEGREE) {
optimalDegree += INITIAL_DEGREE;
}
}
} else {
if (optimalDegree / 2 >= initOptimalDegree) {
if (optimalDegree / 2 >= INITIAL_DEGREE) {
optimalDegree /= 2;
} else {
if (optimalDegree > 2 * initOptimalDegree) {
optimalDegree -= initOptimalDegree;
if (optimalDegree > 2 * INITIAL_DEGREE) {
optimalDegree -= MAX_OPTIMAL_DEGREE;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void init() {

@Test
public void testAdaptiveLock() throws InterruptedException {
assertTrue(adaptiveLock.getAdaptiveLock() instanceof CollisionRetreatLock);
assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);

adaptiveLock.lock();
new Thread(new Runnable() {
Expand All @@ -46,7 +46,7 @@ public void run() {
}).start();
Thread.sleep(1000);
adaptiveLock.unlock();
assertEquals(2000, ((CollisionRetreatLock) adaptiveLock.getAdaptiveLock()).getOptimalDegree());
assertEquals(2000, ((BackOffSpinLock) adaptiveLock.getAdaptiveLock()).getOptimalDegree());



Expand All @@ -68,6 +68,6 @@ public void run() {
adaptiveLock.lock();
Thread.sleep(1000);
adaptiveLock.unlock();
assertTrue(adaptiveLock.getAdaptiveLock() instanceof CollisionRetreatLock);
assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);
}
}

0 comments on commit f674f1b

Please sign in to comment.