Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2887 : Auto clear invaild cache entry #3041

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public ValueHolder<V> getAndFault(K key) throws StoreAccessException {
}

@Override
public ValueHolder<V> computeIfAbsentAndFault(K key, Function<? super K, ? extends V> mappingFunction) throws StoreAccessException {
public ValueHolder<V> computeIfAbsentAndFault(K key, Function<? super K, ? extends V> mappingFunction) {
return computeIfAbsent(key, mappingFunction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.ehcache.core.spi.store.Store;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.internal.TestTimeSource;
import org.ehcache.spi.resilience.StoreAccessRuntimeException;
import org.ehcache.spi.test.After;
import org.ehcache.spi.test.LegalSPITesterException;
import org.ehcache.spi.test.SPITest;
Expand Down Expand Up @@ -193,14 +194,14 @@ public void testStorePassThroughException() throws Exception {

K key = factory.createKey(1L);

RuntimeException exception = new RuntimeException("error");
StorePassThroughException re = new StorePassThroughException(exception);
StoreAccessException exception = new StoreAccessException("error");
StoreAccessRuntimeException re = new StoreAccessRuntimeException(exception);

try {
kvStore.computeIfAbsent(key, keyParam -> {
throw re;
});
} catch (RuntimeException e) {
} catch (Exception e) {
assertThat(e, is(exception));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.ehcache.internal.store;

import org.junit.Ignore;
import org.junit.Test;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.spi.resilience;

import java.util.concurrent.CompletionException;

/**
* A wrapper Runtime exception used when don't want to handle the checkedException an internal operation fails on a {@link org.ehcache.Cache}.
*
* @author nnares
*/
public class StoreAccessRuntimeException extends RuntimeException {

private static final long serialVersionUID = 6249505400891654776L;

/**
* Creates a new exception wrapping the {@link Throwable cause} passed in.
*
* @param cause the cause of this exception
*/
public StoreAccessRuntimeException(StoreAccessException cause) {
super(cause);
}

@Override
public StoreAccessException getCause() {
return (StoreAccessException) super.getCause();
}

/**
* Wrapped the received {@link java.lang.RuntimeException} to {@link org.ehcache.spi.resilience.StoreAccessException},
* so that received {@link java.lang.RuntimeException} can reach {@link org.ehcache.spi.resilience.ResilienceStrategy}
*
* @param re a {@link java.lang.RuntimeException} that is being handled
* @return {@link org.ehcache.spi.resilience.StoreAccessException} a type in which wrapping the received {@link java.lang.RuntimeException}
*/
public static StoreAccessException handleRuntimeException(RuntimeException re) {

if (re instanceof StoreAccessRuntimeException || re instanceof CompletionException) {
Throwable cause = re.getCause();
if (cause instanceof RuntimeException) {
return new StoreAccessException(cause);
}
}
return new StoreAccessException(re);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.ehcache.javadoc.PublicApi;
import org.ehcache.spi.resilience.StoreAccessException;
import org.ehcache.spi.resilience.StoreAccessRuntimeException;

/**
* A generic wrapper runtime exception that will not be caught and
Expand Down Expand Up @@ -69,13 +70,15 @@ public synchronized Throwable fillInStackTrace() {
* @throws RuntimeException if {@code re} is a {@code StorePassThroughException} containing a {@code RuntimeException}
*/
public static StoreAccessException handleException(Exception re) {
if(re instanceof StorePassThroughException) {
if (re instanceof StorePassThroughException) {
Throwable cause = re.getCause();
if(cause instanceof RuntimeException) {
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
return new StoreAccessException(cause);
}
} else if (re instanceof StoreAccessRuntimeException) {
return (StoreAccessException) re.getCause();
} else {
return new StoreAccessException(re);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.ehcache.core.spi.time.TimeSourceService;
import org.ehcache.impl.store.HashUtils;
import org.ehcache.impl.serialization.TransientStateRepository;
import org.ehcache.sizeof.annotations.IgnoreSizeOf;
import org.ehcache.spi.resilience.StoreAccessRuntimeException;
import org.ehcache.spi.serialization.Serializer;
import org.ehcache.spi.serialization.StatefulSerializer;
import org.ehcache.core.spi.store.Store;
Expand Down Expand Up @@ -89,6 +89,7 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand All @@ -99,6 +100,7 @@
import static org.ehcache.config.Eviction.noAdvice;
import static org.ehcache.core.config.ExpiryUtils.isExpiryDurationInfinite;
import static org.ehcache.core.exceptions.StorePassThroughException.handleException;
import static org.ehcache.spi.resilience.StoreAccessRuntimeException.handleRuntimeException;

/**
* {@link Store} and {@link HigherCachingTier} implementation for on heap.
Expand Down Expand Up @@ -701,11 +703,13 @@ public ValueHolder<V> getOrComputeIfAbsent(K key, Function<K, ValueHolder<V>> so

long now = timeSource.getTimeMillis();
if (cachedValue == null) {
Fault<V> fault = new Fault<>(() -> source.apply(key));
Fault<V> fault = new Fault<>();
cachedValue = backEnd.putIfAbsent(key, fault);

if (cachedValue == null) {
return resolveFault(key, backEnd, now, fault);
ValueHolder<V> valueHolder = resolveFault(key, backEnd, now, fault, source);
fault.complete(valueHolder);
return valueHolder;
}
}

Expand All @@ -716,11 +720,13 @@ public ValueHolder<V> getOrComputeIfAbsent(K key, Function<K, ValueHolder<V>> so
expireMappingUnderLock(key, cachedValue);

// On expiration, we might still be able to get a value from the fault. For instance, when a load-writer is used
Fault<V> fault = new Fault<>(() -> source.apply(key));
Fault<V> fault = new Fault<>();
cachedValue = backEnd.putIfAbsent(key, fault);

if (cachedValue == null) {
return resolveFault(key, backEnd, now, fault);
ValueHolder<V> valueHolder = resolveFault(key, backEnd, now, fault, source);
fault.complete(valueHolder);
return valueHolder;
}
}
else {
Expand All @@ -733,7 +739,7 @@ public ValueHolder<V> getOrComputeIfAbsent(K key, Function<K, ValueHolder<V>> so
// Return the value that we found in the cache (by getting the fault or just returning the plain value depending on what we found)
return getValue(cachedValue);
} catch (RuntimeException re) {
throw handleException(re);
throw handleRuntimeException(re);
}
}

Expand Down Expand Up @@ -764,9 +770,9 @@ public ValueHolder<V> getOrDefault(K key, Function<K, ValueHolder<V>> source) th
}
}

private ValueHolder<V> resolveFault(K key, Backend<K, V> backEnd, long now, Fault<V> fault) throws StoreAccessException {
private ValueHolder<V> resolveFault(K key, Backend<K, V> backEnd, long now, Fault<V> fault, Function<K, ValueHolder<V>> source) throws StoreAccessException {
try {
ValueHolder<V> value = fault.getValueHolder();
ValueHolder<V> value = source.apply(key);
OnHeapValueHolder<V> newValue;
if(value != null) {
newValue = importValueFromLowerTier(key, value, now, backEnd, fault);
Expand Down Expand Up @@ -810,8 +816,12 @@ private ValueHolder<V> resolveFault(K key, Backend<K, V> backEnd, long now, Faul

getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
return newValue;

} catch (StoreAccessRuntimeException e) {
fault.fail(e);
backEnd.remove(key, fault);
throw e.getCause();
} catch (Throwable e) {
fault.fail(e);
backEnd.remove(key, fault);
throw new StoreAccessException(e);
}
Expand Down Expand Up @@ -992,61 +1002,43 @@ private static class Fault<V> extends OnHeapValueHolder<V> {

private static final int FAULT_ID = -1;

@IgnoreSizeOf
private final Supplier<ValueHolder<V>> source;
private ValueHolder<V> value;
private Throwable throwable;
private boolean complete;
/**
* valueFuture of type {@link java.util.concurrent.CompletableFuture} to ensure consistent state in concurrent usage
*/
private CompletableFuture<ValueHolder<V>> valueFuture;

public Fault(Supplier<ValueHolder<V>> source) {
public Fault() {
super(FAULT_ID, 0, true);
this.source = source;
this.valueFuture = new CompletableFuture<>();
}

/**
* mark the process for fault object as completed in concurrent usage
*/
private void complete(ValueHolder<V> value) {
synchronized (this) {
this.value = value;
this.complete = true;
notifyAll();
}
valueFuture.complete(value);
}

/**
* method to get the {@link org.ehcache.core.spi.store.Store.ValueHolder}
* Block the thread coming to get the value in concurrent usage
*
* @return {@link org.ehcache.core.spi.store.Store.ValueHolder}
*/
private ValueHolder<V> getValueHolder() {
synchronized (this) {
if (!complete) {
try {
complete(source.get());
} catch (Throwable e) {
fail(e);
}
}
}

return throwOrReturn();
return valueFuture.join();
}

@Override
public long getId() {
throw new UnsupportedOperationException("You should NOT call that?!");
}

private ValueHolder<V> throwOrReturn() {
if (throwable != null) {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
throw new RuntimeException("Faulting from repository failed", throwable);
}
return value;
}

/**
* method to mark the Fault object process is failed
*/
private void fail(Throwable t) {
synchronized (this) {
this.throwable = t;
this.complete = true;
notifyAll();
}
throwOrReturn();
valueFuture.completeExceptionally(t);
}

@Override
Expand Down Expand Up @@ -1101,7 +1093,19 @@ public long size() {

@Override
public String toString() {
return "[Fault : " + (complete ? (throwable == null ? String.valueOf(value) : throwable.getMessage()) : "???") + "]";
String valueOrException;
if (valueFuture.isDone()) {
if (valueFuture.isCancelled()) {
valueOrException = "Cancel";
} else if (valueFuture.isCompletedExceptionally()) {
valueOrException = "Exception";
} else {
valueOrException = "Complete";
}
} else {
valueOrException = "InComplete";
}
return "[Fault : " + valueOrException + "]";
}

@Override
Expand Down
Loading