Skip to content

Commit

Permalink
optimise auto refresh.
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jan 11, 2018
1 parent 4dce304 commit efb3d2c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 44 deletions.
69 changes: 34 additions & 35 deletions jetcache-core/src/main/java/com/alicp/jetcache/RefreshCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RefreshCache<K, V> extends LoadingCache<K, V> {
try {
tryLockAndRunMethod = Cache.class.getMethod("tryLockAndRun",
Object.class, long.class, TimeUnit.class, Runnable.class);
getMethod = Cache.class.getMethod("get", Object.class);
getMethod = Cache.class.getMethod("GET", Object.class);
putMethod = Cache.class.getMethod("put", Object.class, Object.class);
} catch (NoSuchMethodException e) {
throw new CacheException(e);
Expand Down Expand Up @@ -143,18 +143,37 @@ private void cancel() {
taskMap.remove(taskId);
}

private void load() {
try {
if (logger.isDebugEnabled()) {
logger.debug("refresh {}", key);
private void load() throws Throwable {
logger.debug("refresh {}", key);
CacheLoader<K, V> loader = config.getLoader();
loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
V v = loader.load(key);
cache.PUT(key, v);
}

private void externalLoad(final Cache concreteCache, final long currentTime)
throws Throwable {
byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
byte[] lockKey = combine(newKey, "_#RL#".getBytes());
long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
Runnable r = () -> {
try {
long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
byte[] timestampKey = combine(newKey, "_#TS#".getBytes());
// AbstractExternalCache buildKey method will not convert byte[]
CacheGetResult refreshTimeResult = (CacheGetResult) getMethod.invoke(concreteCache, timestampKey);
if (refreshTimeResult.isSuccess() && (currentTime < Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis)) {
return;
}
load();
// AbstractExternalCache buildKey method will not convert byte[]
putMethod.invoke(concreteCache, timestampKey, String.valueOf(System.currentTimeMillis()));
} catch (Throwable e){
throw new CacheException("refresh error", e);
}
CacheLoader<K, V> loader = config.getLoader();
loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
V v = loader.load(key);
cache.PUT(key, v);
} catch (Throwable e) {
throw new CacheInvokeException(e);
}
};
// AbstractExternalCache buildKey method will not convert byte[]
tryLockAndRunMethod.invoke(concreteCache, lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
}

@Override
Expand All @@ -168,39 +187,19 @@ public void run() {
long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
if (stopRefreshAfterLastAccessMillis > 0) {
if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
if (logger.isDebugEnabled()) {
logger.debug("cancel refresh: {}", key);
}
logger.debug("cancel refresh: {}", key);
cancel();
return;
}
}
Cache concreteCache = concreteCache();
if (concreteCache instanceof AbstractExternalCache) {
byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
byte[] timestampKey = combine(newKey, "_#TS#".getBytes());
// AbstractExternalCache buildKey method will not convert byte[]
String refreshTime = (String) getMethod.invoke(concreteCache, timestampKey);
if (refreshTime != null && (now < Long.parseLong(refreshTime) + refreshMillis)) {
return;
}

byte[] lockKey = combine(newKey, "_#RL#".getBytes());
long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
Runnable r = this::load;
// AbstractExternalCache buildKey method will not convert byte[]
tryLockAndRunMethod.invoke(concreteCache, lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);

// AbstractExternalCache buildKey method will not convert byte[]
putMethod.invoke(concreteCache, timestampKey, String.valueOf(System.currentTimeMillis()));
externalLoad(concreteCache, now);
} else {
load();
}
} catch (InvocationTargetException e) {
logger.error("load key error: key=" + key, e.getTargetException());
} catch (Throwable e) {
logger.error("load key error: key=" + key, e);
logger.error("refresh error: key=" + key, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -153,14 +156,12 @@ private static void refreshCacheTest2(Cache cache) throws Exception {
long refreshMillis = cache.config().getRefreshPolicy().getRefreshMillis();
long stopRefresh = cache.config().getRefreshPolicy().getStopRefreshAfterLastAccessMillis();

Assert.assertEquals("refreshCacheTest2_K1_V0", cache.get("refreshCacheTest2_K1"));
Set s = new HashSet();
s.add("refreshCacheTest2_K1");
s.add("refreshCacheTest2_K2");
Map values = cache.getAll(s);
long key1StartRefreshTime = System.currentTimeMillis();
Assert.assertEquals(1, monitor.getCacheStat().getGetCount());
Assert.assertEquals(0, monitor.getCacheStat().getGetHitCount());
Assert.assertEquals(1, monitor.getCacheStat().getGetMissCount());
Assert.assertEquals(1, monitor.getCacheStat().getLoadCount());
Assert.assertEquals(1, monitor.getCacheStat().getPutCount());
Assert.assertEquals("refreshCacheTest2_K2_V1", cache.get("refreshCacheTest2_K2"));

Assert.assertEquals(2, monitor.getCacheStat().getGetCount());
Assert.assertEquals(0, monitor.getCacheStat().getGetHitCount());
Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount());
Expand All @@ -180,10 +181,10 @@ private static void refreshCacheTest2(Cache cache) throws Exception {
cache.config().setRefreshPolicy(null);//stop refresh

Assert.assertEquals(3, monitor.getCacheStat().getLoadCount());
Assert.assertNotEquals("refreshCacheTest2_K1_V0", cache.get("refreshCacheTest2_K1"));
Assert.assertNotEquals(values.get("refreshCacheTest2_K1"), cache.get("refreshCacheTest2_K1"));
Assert.assertEquals(3, monitor.getCacheStat().getLoadCount());
// refresh task stopped, but K/V is not expires
Assert.assertEquals("refreshCacheTest2_K2_V1", cache.get("refreshCacheTest2_K2"));
Assert.assertEquals(values.get("refreshCacheTest2_K2"), cache.get("refreshCacheTest2_K2"));
Assert.assertEquals(3, monitor.getCacheStat().getLoadCount());

cache.config().getMonitors().remove(monitor);
Expand Down

0 comments on commit efb3d2c

Please sign in to comment.