Skip to content

Commit

Permalink
fix: prevent long stall during reconnection/disconnection by providin…
Browse files Browse the repository at this point in the history
…g LinkedHashSetQueue instead of ArrayDeque for CommandHandler#stack
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent cb73980 commit d3662c0
Show file tree
Hide file tree
Showing 4 changed files with 479 additions and 9 deletions.
39 changes: 34 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = false;

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final boolean useHashIndexedQueue;

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
Expand All @@ -112,6 +116,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.useHashIndexedQueue = builder.useHashIndexedQueue;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -129,6 +134,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.useHashIndexedQueue = original.isUseHashIndexedQueue();
}

/**
Expand Down Expand Up @@ -192,6 +198,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
}

Expand Down Expand Up @@ -247,8 +255,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
*
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
* @since 6.0
* @see DecodeBufferPolicies
* @since 6.0
*/
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {

Expand Down Expand Up @@ -295,8 +303,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
*
* @param protocolVersion version to use.
* @return {@code this}
* @since 6.0
* @see ProtocolVersion#newestSupported()
* @since 6.0
*/
public Builder protocolVersion(ProtocolVersion protocolVersion) {

Expand All @@ -315,9 +323,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.2
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
* @since 5.2
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
Expand Down Expand Up @@ -422,6 +430,20 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Use hash indexed queue which provides O(1) remove(Object) thus won't cause blocking issues.
*
* @param useHashIndexedQueue true/false
* @return {@code this}
* @see io.lettuce.core.protocol.CommandHandler.AddToStack
* @since 5.1
*/
@SuppressWarnings("JavadocReference")
public Builder useHashIndexQueue(boolean useHashIndexedQueue) {
this.useHashIndexedQueue = useHashIndexedQueue;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -439,7 +461,6 @@ public ClientOptions build() {
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @since 5.1
*/
public ClientOptions.Builder mutate() {
Expand Down Expand Up @@ -498,7 +519,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
*
* @return zero.
* @since 5.2
*
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
*/
@Deprecated
Expand Down Expand Up @@ -637,6 +657,15 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
* @return if hash indexed queue should be used
*/
public boolean isUseHashIndexedQueue() {
return useHashIndexedQueue;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package io.lettuce.core.datastructure.queue;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;

import io.lettuce.core.internal.LettuceAssert;

/**
* @author chenxiaofan
*/
@SuppressWarnings("unchecked")
public class HashIndexedQueue<E> extends AbstractQueue<E> {

private final Map<E, Object> map; // Object can be Node<E> or List<Node<E>>

private Node<E> head;

private Node<E> tail;

private int size;

private static class Node<E> {

E value;

Node<E> next;

Node<E> prev;

Node(E value) {
this.value = value;
}

}

public HashIndexedQueue() {
map = new HashMap<>();
size = 0;
}

@Override
public boolean add(E e) {
return offer(e);
}

@Override
public boolean offer(E e) {
final Node<E> newNode = new Node<>(e);
if (tail == null) {
head = tail = newNode;
} else {
tail.next = newNode;
newNode.prev = tail;
tail = newNode;
}

if (!map.containsKey(e)) {
map.put(e, newNode);
} else {
Object current = map.get(e);
if (current instanceof Node) {
List<Node<E>> nodes = new ArrayList<>();
nodes.add((Node<E>) current);
nodes.add(newNode);
map.put(e, nodes);
} else {
((List<Node<E>>) current).add(newNode);
}
}
size++;
return true;
}

@Override
public E poll() {
if (head == null) {
return null;
}
E value = head.value;
removeNodeFromMap(head);
head = head.next;
if (head == null) {
tail = null;
} else {
head.prev = null;
}
size--;
return value;
}

@Override
public E peek() {
if (head == null) {
return null;
}
return head.value;
}

@Override
public boolean remove(Object o) {
return removeFirstOccurrence(o);
}

@Override
public int size() {
return size;
}

@Override
public boolean contains(Object o) {
return map.containsKey(o);
}

public class Iterator implements java.util.Iterator<E> {

private Node<E> current;

private Node<E> prev;

private Iterator() {
current = HashIndexedQueue.this.head;
prev = null;
}

@Override
public boolean hasNext() {
return current != null;
}

@Override
public E next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
E value = current.value;
prev = current;
current = current.next;
return value;
}

@Override
public void remove() {
if (prev != null) {
removeNodeFromMap(prev);
removeNode(prev);
size--;
// remove once
prev = null;
}
}

}

@Override
public Iterator iterator() {
return new Iterator();
}

@Override
public boolean removeAll(Collection<?> c) {
boolean modified = false;
for (Object e : c) {
if (removeAllOccurrences(e)) {
modified = true;
}
}
return modified;
}

@Override
public void clear() {
head = null;
tail = null;
map.clear();
size = 0;
}

private boolean removeFirstOccurrence(Object o) {
Object current = map.get(o);
if (current == null) {
return false;
}
if (current instanceof Node) {
Node<E> node = (Node<E>) current;
removeNode(node);
map.remove(o);
} else {
List<Node<E>> nodes = (List<Node<E>>) current;
Node<E> node = nodes.remove(0);
if (nodes.isEmpty()) {
map.remove(o);
}
removeNode(node);
}
size--;
return true;
}

private boolean removeAllOccurrences(Object o) {
Object current = map.get(o);
if (current == null) {
return false;
}
if (current instanceof Node) {
final Node<E> node = (Node<E>) current;
removeNode(node);
size--;
} else {
final List<Node<E>> nodes = (List<Node<E>>) current;
for (Node<E> node : nodes) {
removeNode(node);
size--;
}
}
map.remove(o);
return true;
}

private void removeNode(Node<E> node) {
if (node.prev != null) {
node.prev.next = node.next;
} else {
head = node.next;
}
if (node.next != null) {
node.next.prev = node.prev;
} else {
tail = node.prev;
}
}

private void removeNodeFromMap(Node<E> node) {
E value = node.value;
Object current = map.get(value);
if (current instanceof Node) {
LettuceAssert.assertState(current == node, "current != node");
map.remove(value);
} else {
List<Node<E>> nodes = (List<Node<E>>) current;
final boolean removed = nodes.remove(node);
LettuceAssert.assertState(removed, "!nodes.remove(node)");
if (nodes.isEmpty()) {
map.remove(value);
}
}
}

}
Loading

0 comments on commit d3662c0

Please sign in to comment.