Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding reentrant locks, for raceconditon caused by connectivity issues #14643

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.pinot.server.realtime;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
Expand All @@ -38,13 +39,14 @@

// Singleton class.
public class ControllerLeaderLocator {
private static ControllerLeaderLocator _instance = null;
private static volatile ControllerLeaderLocator _instance;
public static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeaderLocator.class);

// Minimum millis which must elapse between consecutive invalidation of cache
private static final long MIN_INVALIDATE_INTERVAL_MS = 30_000L;

private final HelixManager _helixManager;
private final ReentrantLock _lock = new ReentrantLock();

// Co-ordinates of the last known controller leader for each of the lead-controller every partitions,
// with partition number being the key and controller hostname and port pair being the value. If the lead
Expand All @@ -59,21 +61,23 @@ public class ControllerLeaderLocator {

ControllerLeaderLocator(HelixManager helixManager) {
_helixManager = helixManager;
_cachedControllerLeaderMap = new HashMap<>();
_cachedControllerLeaderMap = new ConcurrentHashMap<>();
}

/**
* To be called once when the server starts
* @param helixManager should already be started
*/
public static void create(HelixManager helixManager) {
if (_instance != null) {
// We create multiple server instances in the hybrid cluster integration tests, so allow the call to create an
// instance even if there is already one.
LOGGER.warn("Already created");
return;
if (_instance == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called only once during server starts, which is always single threaded. I don't think we need to synchronize this

synchronized (ControllerLeaderLocator.class) {
if (_instance == null) {
_instance = new ControllerLeaderLocator(helixManager);
}
}
} else {
LOGGER.warn("ControllerLeaderLocator instance already created.");
}
_instance = new ControllerLeaderLocator(helixManager);
}

public static ControllerLeaderLocator getInstance() {
Expand All @@ -90,7 +94,7 @@ public static ControllerLeaderLocator getInstance() {
* @param rawTableName table name without type.
* @return The host-port pair of the current controller leader.
*/
public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
public Pair<String, Integer> getControllerLeader(String rawTableName) {
int partitionId = LeadControllerUtils.getPartitionIdForTable(rawTableName);
if (_cachedControllerLeaderValid) {
return _cachedControllerLeaderMap.get(partitionId);
Expand All @@ -109,6 +113,7 @@ public synchronized Pair<String, Integer> getControllerLeader(String rawTableNam
* Thus, simply exiting the method should be enough. Retry will be done in the next request.
*/
private void refreshControllerLeaderMap() {
_lock.lock();
try {
// Checks whether lead controller resource has been enabled or not.
if (LeadControllerUtils.isLeadControllerResourceEnabled(_helixManager)) {
Expand All @@ -118,6 +123,8 @@ private void refreshControllerLeaderMap() {
}
} catch (Exception e) {
LOGGER.error("Exception when checking whether lead controller resource is enable or not.", e);
} finally {
_lock.unlock();
}
}

Expand Down Expand Up @@ -230,16 +237,21 @@ private Pair<String, Integer> convertToHostAndPortPair(String instanceId) {
* Thus the frequency limiting is done to guard against frequent cache refreshes, in cases where we might be
* getting too many NOT_SENT responses due to some other errors.
*/
public synchronized void invalidateCachedControllerLeader() {
public void invalidateCachedControllerLeader() {
long now = getCurrentTimeMs();
long millisSinceLastInvalidate = now - _lastCacheInvalidationTimeMs;
if (millisSinceLastInvalidate < MIN_INVALIDATE_INTERVAL_MS) {
LOGGER.info("Millis since last controller cache value invalidate {} is less than allowed frequency {}. Skipping "
+ "invalidate.", millisSinceLastInvalidate, MIN_INVALIDATE_INTERVAL_MS);
} else {
LOGGER.info("Invalidating cached controller leader value");
_cachedControllerLeaderValid = false;
_lastCacheInvalidationTimeMs = now;
_lock.lock();
try {
LOGGER.info("Invalidating cached controller leader value");
_cachedControllerLeaderValid = false;
_lastCacheInvalidationTimeMs = now;
} finally {
_lock.unlock();
}
}
}

Expand Down
Loading