Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: allow concurrent decompress away from network loop #162

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 128 additions & 43 deletions evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import com.netflix.archaius.api.PropertyRepository;
import com.netflix.evcache.util.Pair;
import net.spy.memcached.transcoders.TranscodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,16 +83,27 @@ public class EVCacheMemcachedClient extends MemcachedClient {
private final Property<Integer> maxReadDuration, maxWriteDuration;
private final Property<Boolean> enableDebugLogsOnWrongKey;

private volatile boolean alwaysDecodeSync;

public EVCacheMemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
Property<Integer> readTimeout, EVCacheClient client) throws IOException {
super(cf, addrs);
this.connectionFactory = cf;
this.readTimeout = readTimeout;
this.client = client;
this.appName = client.getAppName();
this.maxWriteDuration = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".max.write.duration.metric", Integer.class).orElseGet("evcache.max.write.duration.metric").orElse(50);
this.maxReadDuration = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".max.read.duration.metric", Integer.class).orElseGet("evcache.max.read.duration.metric").orElse(20);
this.enableDebugLogsOnWrongKey = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".enable.debug.logs.on.wrongkey", Boolean.class).orElse(false);
final PropertyRepository props = EVCacheConfig.getInstance().getPropertyRepository();
this.maxWriteDuration = props.get(appName + ".max.write.duration.metric", Integer.class).orElseGet("evcache.max.write.duration.metric").orElse(50);
this.maxReadDuration = props.get(appName + ".max.read.duration.metric", Integer.class).orElseGet("evcache.max.read.duration.metric").orElse(20);
this.enableDebugLogsOnWrongKey = props.get(appName + ".enable.debug.logs.on.wrongkey", Boolean.class).orElse(false);

// TODO in future remove this flag so that decode does not block the IO loop
// the default/legacy behavior (true) is effectively to decode on the IO loop, set to false to use the transcode threads
this.alwaysDecodeSync = true;
props.get(appName + ".get.alwaysDecodeSync", Boolean.class)
.orElseGet("evcache.get.alwaysDecodeSync")
.orElse(true)
.subscribe(v -> alwaysDecodeSync = v);
}

public NodeLocator getNodeLocator() {
Expand Down Expand Up @@ -124,62 +140,100 @@ private boolean isWrongKeyReturned(String original_key, String returned_key) {
}

public <T> EVCacheOperationFuture<T> asyncGet(final String key, final Transcoder<T> tc, EVCacheGetOperationListener<T> listener) {
final CountDownLatch latch = new CountDownLatch(1);
final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<T>(key, latch, new AtomicReference<T>(null), readTimeout.get().intValue(), executorService, client);
// we should only complete the latch when decode AND complete have completed
final CountDownLatch latch = new CountDownLatch(2);

final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<>(
key, latch, new AtomicReference<T>(null), readTimeout.get(), executorService, client);

final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
EVCacheMetricsFactory.GET_OPERATION,
EVCacheMetricsFactory.READ,
EVCacheMetricsFactory.IPC_SIZE_INBOUND);

@SuppressWarnings("unchecked")
final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) getTranscoder() : tc;
final boolean shouldLog = log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName);

final Operation op = opFact.get(key, new GetOperation.Callback() {
private Future<T> val = null;
// not volatile since only ever used from memcached loop callbacks
private boolean asyncDecodeIssued = false;

public void receivedStatus(OperationStatus status) {
if (log.isDebugEnabled()) log.debug("Getting Key : " + key + "; Status : " + status.getStatusCode().name() + (log.isTraceEnabled() ? " Node : " + getEVCacheNode(key) : "")
+ "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
try {
if (val != null) {
if (log.isTraceEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.trace("Key : " + key + "; val : " + val.get());
rv.set(val.get(), status);
// both volatile to ensure sync across transcode threads and memcached loop
private volatile T value;
private volatile OperationStatus status = null;

public void gotData(String k, int flags, byte[] data) {
if (isWrongKeyReturned(key, k)) {
return;
}

if (shouldLog) {
log.debug("Read data : key {}; flags : {}; data : {}", key, flags, data);
if (data != null) {
log.debug("Key : {}; val size : {}", key, data.length);
} else {
if (log.isTraceEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.trace("Key : " + key + "; val is null");
rv.set(null, status);
log.debug("Key : {}; val is null", key);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
rv.set(null, status);
}
}

@SuppressWarnings("unchecked")
public void gotData(String k, int flags, byte[] data) {
if (data != null) {
dataSizeDS.record(data.length);

if (isWrongKeyReturned(key, k)) return;
if (tcService == null) {
log.error("tcService is null, will not be able to decode");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do latch.countDown() here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 2b60f17, I restructured it too. Hopefully reduce chance of bugs like this.

throw new RuntimeException("TranscoderSevice is null. Not able to decode");
}

if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Read data : key " + key + "; flags : " + flags + "; data : " + data);
if (data != null) {
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Key : " + key + "; val size : " + data.length);
getDataSizeDistributionSummary(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, EVCacheMetricsFactory.IPC_SIZE_INBOUND).record(data.length);
if (tc == null) {
if (tcService == null) {
log.error("tcService is null, will not be able to decode");
throw new RuntimeException("TranscoderSevice is null. Not able to decode");
} else {
final Transcoder<T> t = (Transcoder<T>) getTranscoder();
val = tcService.decode(t, new CachedData(flags, data, t.getMaxSize()));
}
CachedData chunk = new CachedData(flags, data, transcoder.getMaxSize());
boolean doSync = alwaysDecodeSync || (!transcoder.asyncDecode(chunk));

if (doSync) {
value = transcoder.decode(chunk);
rv.set(value, status);
} else {
if (tcService == null) {
log.error("tcService is null, will not be able to decode");
throw new RuntimeException("TranscoderSevice is null. Not able to decode");
} else {
val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
}
asyncDecodeIssued = true;
final Transcoder<T> wrappedTranscoder = decodeAndThen(transcoder, (decoded) -> {
value = decoded;
rv.set(decoded, status);
latch.countDown();
});
tcService.decode(wrappedTranscoder, chunk);
}
} else {
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("Key : " + key + "; val is null" );
}
}

public void receivedStatus(OperationStatus status) {
this.status = status;

// On rare occasion, it might be possible that transcoder finishes and starts to call rv.set(),
// at the exact time that receivedStatus here does a set(). This means that through unlucky timing
// here we might drop the decoded value that was set by the transcoder.
//
// We add a simple if check to see if the transcode thread has changed the value after we did rv.set(),
// and if it has, we will do it again. Since value is only set once (after decode), we need only a single
// check here. It is important that it is a separate volatile read from value.

T before = value;
rv.set(before, status);
T after = value;

if (after != before) {
rv.set(after, status);
}
}

public void complete() {
// if an async decode was never issued, issue an extra countdown, since 2 latch values were set
if (!asyncDecodeIssued) {
latch.countDown();
}

latch.countDown();

final String metricHit = (asyncDecodeIssued || value != null) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO;
final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (val != null ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), metricHit, host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
rv.signalComplete();
}
});
Expand All @@ -189,6 +243,37 @@ public void complete() {
return rv;
}

// A Transcode wrapper to allow an action to be performed after decode has completed.
static <T> Transcoder<T> decodeAndThen(Transcoder<T> transcoder, Consumer<T> completed) {
return new Transcoder<T>() {
@Override
public boolean asyncDecode(CachedData d) {
return transcoder.asyncDecode(d);
}

@Override
public CachedData encode(T o) {
throw new UnsupportedOperationException("encode");
}

@Override
public T decode(CachedData d) {
T decoded = null;
try {
decoded = transcoder.decode(d);
return decoded;
} finally {
completed.accept(decoded);
}
}

@Override
public int getMaxSize() {
return transcoder.getMaxSize();
}
};
}

public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
final Transcoder<T> tc,
EVCacheGetOperationListener<T> listener) {
Expand Down