diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java index 015590af2c..149407fa70 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java @@ -580,7 +580,7 @@ public ValueHolder getAndFault(K key) throws StoreAccessException { } @Override - public ValueHolder computeIfAbsentAndFault(K key, Function mappingFunction) throws StoreAccessException { + public ValueHolder computeIfAbsentAndFault(K key, Function mappingFunction) { return computeIfAbsent(key, mappingFunction); } diff --git a/core-spi-test/src/main/java/org/ehcache/internal/store/StoreComputeIfAbsentTest.java b/core-spi-test/src/main/java/org/ehcache/internal/store/StoreComputeIfAbsentTest.java index 3017f59bcd..c04ebd8109 100644 --- a/core-spi-test/src/main/java/org/ehcache/internal/store/StoreComputeIfAbsentTest.java +++ b/core-spi-test/src/main/java/org/ehcache/internal/store/StoreComputeIfAbsentTest.java @@ -21,6 +21,7 @@ import org.ehcache.core.spi.store.Store; import org.ehcache.expiry.ExpiryPolicy; import org.ehcache.internal.TestTimeSource; +import org.ehcache.spi.resilience.StoreAccessRuntimeException; import org.ehcache.spi.test.After; import org.ehcache.spi.test.LegalSPITesterException; import org.ehcache.spi.test.SPITest; @@ -193,14 +194,14 @@ public void testStorePassThroughException() throws Exception { K key = factory.createKey(1L); - RuntimeException exception = new RuntimeException("error"); - StorePassThroughException re = new StorePassThroughException(exception); + StoreAccessException exception = new StoreAccessException("error"); + StoreAccessRuntimeException re = new StoreAccessRuntimeException(exception); try { kvStore.computeIfAbsent(key, keyParam -> { throw re; }); - } catch (RuntimeException e) { + } catch (Exception e) { assertThat(e, is(exception)); } } diff --git a/core-spi-test/src/main/java/org/ehcache/internal/store/StoreSPITest.java b/core-spi-test/src/main/java/org/ehcache/internal/store/StoreSPITest.java index ed60355ef7..84dc8005a4 100644 --- a/core-spi-test/src/main/java/org/ehcache/internal/store/StoreSPITest.java +++ b/core-spi-test/src/main/java/org/ehcache/internal/store/StoreSPITest.java @@ -16,6 +16,7 @@ package org.ehcache.internal.store; +import org.junit.Ignore; import org.junit.Test; /** diff --git a/ehcache-api/src/main/java/org/ehcache/spi/resilience/StoreAccessRuntimeException.java b/ehcache-api/src/main/java/org/ehcache/spi/resilience/StoreAccessRuntimeException.java new file mode 100644 index 0000000000..684e276a14 --- /dev/null +++ b/ehcache-api/src/main/java/org/ehcache/spi/resilience/StoreAccessRuntimeException.java @@ -0,0 +1,62 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.spi.resilience; + +import java.util.concurrent.CompletionException; + +/** + * A wrapper Runtime exception used when don't want to handle the checkedException an internal operation fails on a {@link org.ehcache.Cache}. + * + * @author nnares + */ +public class StoreAccessRuntimeException extends RuntimeException { + + private static final long serialVersionUID = 6249505400891654776L; + + /** + * Creates a new exception wrapping the {@link Throwable cause} passed in. + * + * @param cause the cause of this exception + */ + public StoreAccessRuntimeException(StoreAccessException cause) { + super(cause); + } + + @Override + public StoreAccessException getCause() { + return (StoreAccessException) super.getCause(); + } + + /** + * Wrapped the received {@link java.lang.RuntimeException} to {@link org.ehcache.spi.resilience.StoreAccessException}, + * so that received {@link java.lang.RuntimeException} can reach {@link org.ehcache.spi.resilience.ResilienceStrategy} + * + * @param re a {@link java.lang.RuntimeException} that is being handled + * @return {@link org.ehcache.spi.resilience.StoreAccessException} a type in which wrapping the received {@link java.lang.RuntimeException} + */ + public static StoreAccessException handleRuntimeException(RuntimeException re) { + + if (re instanceof StoreAccessRuntimeException || re instanceof CompletionException) { + Throwable cause = re.getCause(); + if (cause instanceof RuntimeException) { + return new StoreAccessException(cause); + } + } + return new StoreAccessException(re); + } + +} diff --git a/ehcache-core/src/main/java/org/ehcache/core/exceptions/StorePassThroughException.java b/ehcache-core/src/main/java/org/ehcache/core/exceptions/StorePassThroughException.java index 62593508b0..89367e23bc 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/exceptions/StorePassThroughException.java +++ b/ehcache-core/src/main/java/org/ehcache/core/exceptions/StorePassThroughException.java @@ -18,6 +18,7 @@ import org.ehcache.javadoc.PublicApi; import org.ehcache.spi.resilience.StoreAccessException; +import org.ehcache.spi.resilience.StoreAccessRuntimeException; /** * A generic wrapper runtime exception that will not be caught and @@ -69,13 +70,15 @@ public synchronized Throwable fillInStackTrace() { * @throws RuntimeException if {@code re} is a {@code StorePassThroughException} containing a {@code RuntimeException} */ public static StoreAccessException handleException(Exception re) { - if(re instanceof StorePassThroughException) { + if (re instanceof StorePassThroughException) { Throwable cause = re.getCause(); - if(cause instanceof RuntimeException) { + if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { return new StoreAccessException(cause); } + } else if (re instanceof StoreAccessRuntimeException) { + return (StoreAccessException) re.getCause(); } else { return new StoreAccessException(re); } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/heap/OnHeapStore.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/heap/OnHeapStore.java index 1bfee26159..6f48dc0f8c 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/heap/OnHeapStore.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/heap/OnHeapStore.java @@ -52,7 +52,7 @@ import org.ehcache.core.spi.time.TimeSourceService; import org.ehcache.impl.store.HashUtils; import org.ehcache.impl.serialization.TransientStateRepository; -import org.ehcache.sizeof.annotations.IgnoreSizeOf; +import org.ehcache.spi.resilience.StoreAccessRuntimeException; import org.ehcache.spi.serialization.Serializer; import org.ehcache.spi.serialization.StatefulSerializer; import org.ehcache.core.spi.store.Store; @@ -89,6 +89,7 @@ import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -99,6 +100,7 @@ import static org.ehcache.config.Eviction.noAdvice; import static org.ehcache.core.config.ExpiryUtils.isExpiryDurationInfinite; import static org.ehcache.core.exceptions.StorePassThroughException.handleException; +import static org.ehcache.spi.resilience.StoreAccessRuntimeException.handleRuntimeException; /** * {@link Store} and {@link HigherCachingTier} implementation for on heap. @@ -701,11 +703,13 @@ public ValueHolder getOrComputeIfAbsent(K key, Function> so long now = timeSource.getTimeMillis(); if (cachedValue == null) { - Fault fault = new Fault<>(() -> source.apply(key)); + Fault fault = new Fault<>(); cachedValue = backEnd.putIfAbsent(key, fault); if (cachedValue == null) { - return resolveFault(key, backEnd, now, fault); + ValueHolder valueHolder = resolveFault(key, backEnd, now, fault, source); + fault.complete(valueHolder); + return valueHolder; } } @@ -716,11 +720,13 @@ public ValueHolder getOrComputeIfAbsent(K key, Function> so expireMappingUnderLock(key, cachedValue); // On expiration, we might still be able to get a value from the fault. For instance, when a load-writer is used - Fault fault = new Fault<>(() -> source.apply(key)); + Fault fault = new Fault<>(); cachedValue = backEnd.putIfAbsent(key, fault); if (cachedValue == null) { - return resolveFault(key, backEnd, now, fault); + ValueHolder valueHolder = resolveFault(key, backEnd, now, fault, source); + fault.complete(valueHolder); + return valueHolder; } } else { @@ -733,7 +739,7 @@ public ValueHolder getOrComputeIfAbsent(K key, Function> so // Return the value that we found in the cache (by getting the fault or just returning the plain value depending on what we found) return getValue(cachedValue); } catch (RuntimeException re) { - throw handleException(re); + throw handleRuntimeException(re); } } @@ -764,9 +770,9 @@ public ValueHolder getOrDefault(K key, Function> source) th } } - private ValueHolder resolveFault(K key, Backend backEnd, long now, Fault fault) throws StoreAccessException { + private ValueHolder resolveFault(K key, Backend backEnd, long now, Fault fault, Function> source) throws StoreAccessException { try { - ValueHolder value = fault.getValueHolder(); + ValueHolder value = source.apply(key); OnHeapValueHolder newValue; if(value != null) { newValue = importValueFromLowerTier(key, value, now, backEnd, fault); @@ -810,8 +816,12 @@ private ValueHolder resolveFault(K key, Backend backEnd, long now, Faul getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED); return newValue; - + } catch (StoreAccessRuntimeException e) { + fault.fail(e); + backEnd.remove(key, fault); + throw e.getCause(); } catch (Throwable e) { + fault.fail(e); backEnd.remove(key, fault); throw new StoreAccessException(e); } @@ -992,37 +1002,31 @@ private static class Fault extends OnHeapValueHolder { private static final int FAULT_ID = -1; - @IgnoreSizeOf - private final Supplier> source; - private ValueHolder value; - private Throwable throwable; - private boolean complete; + /** + * valueFuture of type {@link java.util.concurrent.CompletableFuture} to ensure consistent state in concurrent usage + */ + private CompletableFuture> valueFuture; - public Fault(Supplier> source) { + public Fault() { super(FAULT_ID, 0, true); - this.source = source; + this.valueFuture = new CompletableFuture<>(); } + /** + * mark the process for fault object as completed in concurrent usage + */ private void complete(ValueHolder value) { - synchronized (this) { - this.value = value; - this.complete = true; - notifyAll(); - } + valueFuture.complete(value); } + /** + * method to get the {@link org.ehcache.core.spi.store.Store.ValueHolder} + * Block the thread coming to get the value in concurrent usage + * + * @return {@link org.ehcache.core.spi.store.Store.ValueHolder} + */ private ValueHolder getValueHolder() { - synchronized (this) { - if (!complete) { - try { - complete(source.get()); - } catch (Throwable e) { - fail(e); - } - } - } - - return throwOrReturn(); + return valueFuture.join(); } @Override @@ -1030,23 +1034,11 @@ public long getId() { throw new UnsupportedOperationException("You should NOT call that?!"); } - private ValueHolder throwOrReturn() { - if (throwable != null) { - if (throwable instanceof RuntimeException) { - throw (RuntimeException) throwable; - } - throw new RuntimeException("Faulting from repository failed", throwable); - } - return value; - } - + /** + * method to mark the Fault object process is failed + */ private void fail(Throwable t) { - synchronized (this) { - this.throwable = t; - this.complete = true; - notifyAll(); - } - throwOrReturn(); + valueFuture.completeExceptionally(t); } @Override @@ -1101,7 +1093,19 @@ public long size() { @Override public String toString() { - return "[Fault : " + (complete ? (throwable == null ? String.valueOf(value) : throwable.getMessage()) : "???") + "]"; + String valueOrException; + if (valueFuture.isDone()) { + if (valueFuture.isCancelled()) { + valueOrException = "Cancel"; + } else if (valueFuture.isCompletedExceptionally()) { + valueOrException = "Exception"; + } else { + valueOrException = "Complete"; + } + } else { + valueOrException = "InComplete"; + } + return "[Fault : " + valueOrException + "]"; } @Override diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/TieredStore.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/TieredStore.java index c2b3c0bbe2..2b4aa786fc 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/TieredStore.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/TieredStore.java @@ -27,6 +27,7 @@ import org.ehcache.core.spi.store.events.StoreEventSource; import org.ehcache.core.spi.store.tiering.AuthoritativeTier; import org.ehcache.core.spi.store.tiering.CachingTier; +import org.ehcache.spi.resilience.StoreAccessRuntimeException; import org.ehcache.spi.service.OptionalServiceDependencies; import org.ehcache.spi.service.Service; import org.ehcache.spi.service.ServiceConfiguration; @@ -49,6 +50,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.ehcache.spi.resilience.StoreAccessRuntimeException.handleRuntimeException; + /** * A {@link Store} implementation supporting a tiered caching model. */ @@ -84,17 +87,13 @@ public void invalidateAllWithHash(long hash) throws StoreAccessException { @Override public ValueHolder get(final K key) throws StoreAccessException { - try { - return cachingTier().getOrComputeIfAbsent(key, keyParam -> { - try { - return authoritativeTier.getAndFault(keyParam); - } catch (StoreAccessException cae) { - throw new StorePassThroughException(cae); - } - }); - } catch (StoreAccessException ce) { - return handleStoreAccessException(ce); - } + return cachingTier().getOrComputeIfAbsent(key, keyParam -> { + try { + return authoritativeTier.getAndFault(keyParam); + } catch (StoreAccessException cae) { + throw new StorePassThroughException(cae); + } + }); } @Override @@ -320,17 +319,13 @@ public ValueHolder computeAndGet(final K key, final BiFunction computeIfAbsent(final K key, final Function mappingFunction) throws StoreAccessException { - try { - return cachingTier().getOrComputeIfAbsent(key, keyParam -> { - try { - return authoritativeTier.computeIfAbsentAndFault(keyParam, mappingFunction); - } catch (StoreAccessException cae) { - throw new StorePassThroughException(cae); - } - }); - } catch (StoreAccessException ce) { - return handleStoreAccessException(ce); - } + return cachingTier().getOrComputeIfAbsent(key, keyParam -> { + try { + return authoritativeTier.computeIfAbsentAndFault(keyParam, mappingFunction); + } catch (StoreAccessException cae) { + throw new StoreAccessRuntimeException(cae); + } + }); } @Override @@ -379,18 +374,20 @@ private CachingTier cachingTier() { return cachingTierRef.get(); } + /** + * Handling the received {@link org.ehcache.spi.resilience.StoreAccessException} + * + * @param ce a {@link org.ehcache.spi.resilience.StoreAccessException} that is being handled + * @return {@link org.ehcache.core.spi.store.Store.ValueHolder} + */ private ValueHolder handleStoreAccessException(StoreAccessException ce) throws StoreAccessException { Throwable cause = ce.getCause(); if (cause instanceof StorePassThroughException) { throw (StoreAccessException) cause.getCause(); - } - if (cause instanceof Error) { + }if (cause instanceof Error) { throw (Error) cause; } - if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } - throw new RuntimeException("Unexpected checked exception wrapped in StoreAccessException", cause); + throw ce; } @ServiceDependencies({CachingTier.Provider.class, AuthoritativeTier.Provider.class}) diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/store/tiering/TieredStoreTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/store/tiering/TieredStoreTest.java index 151d5d72d4..5fcfebe8ef 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/store/tiering/TieredStoreTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/store/tiering/TieredStoreTest.java @@ -32,6 +32,8 @@ import org.ehcache.core.spi.store.tiering.CachingTier; import org.ehcache.impl.internal.store.heap.OnHeapStore; import org.ehcache.impl.internal.store.offheap.OffHeapStore; +import org.ehcache.spi.resilience.StoreAccessRuntimeException; +import org.ehcache.spi.serialization.SerializerException; import org.ehcache.spi.service.Service; import org.ehcache.spi.service.ServiceProvider; import org.hamcrest.Matchers; @@ -151,8 +153,26 @@ public void testGetThrowsRuntimeException() throws Exception { try { tieredStore.get(1); fail("We should get an Error"); - } catch (RuntimeException e) { - assertSame(error, e); + } catch (Exception e) { + assertSame(error, e.getCause()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testGetThrowsSerializerException() throws Exception { + SerializerException serializerException = new SerializerException(); + StoreAccessException error = new StoreAccessException("SerializerException wrapped in StoreAccessException", serializerException); + when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(error); + + TieredStore tieredStore = new TieredStore<>(numberCachingTier, numberAuthoritativeTier); + + try { + tieredStore.get(1); + fail("We should get an Error"); + } catch (StoreAccessException e) { + assertSame(serializerException, e.getCause()); + assertEquals("SerializerException wrapped in StoreAccessException", e.getMessage()); } } @@ -167,41 +187,40 @@ public void testGetThrowsError() throws Exception { try { tieredStore.get(1); fail("We should get an Error"); - } catch (Error e) { - assertSame(error, e); + } catch (Exception e) { + assertSame(error, e.getCause()); } } @Test @SuppressWarnings("unchecked") public void testGetThrowsException() throws Exception { - Exception error = new Exception(); - when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(new StoreAccessException(error)); + StoreAccessException error = new StoreAccessException("Error"); + when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(new StoreAccessRuntimeException(error)); TieredStore tieredStore = new TieredStore<>(numberCachingTier, numberAuthoritativeTier); try { tieredStore.get(1); fail("We should get an Error"); - } catch (RuntimeException e) { + } catch (Exception e) { assertSame(error, e.getCause()); - assertEquals("Unexpected checked exception wrapped in StoreAccessException", e.getMessage()); } } @Test @SuppressWarnings("unchecked") - public void testGetThrowsPassthrough() throws Exception { + public void testGetThrowsStoreAccessRuntime() throws Exception { StoreAccessException error = new StoreAccessException("inner"); - when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(new StoreAccessException(new StorePassThroughException(error))); + when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(new StoreAccessRuntimeException(error)); TieredStore tieredStore = new TieredStore<>(numberCachingTier, numberAuthoritativeTier); try { tieredStore.get(1); fail("We should get an Error"); - } catch (StoreAccessException e) { - assertSame(error, e); + } catch (StoreAccessRuntimeException e) { + assertSame(error, e.getCause()); } } @@ -412,7 +431,24 @@ public void testComputeIfAbsentThrowsError() throws Exception { try { tieredStore.computeIfAbsent(1, n -> null); fail("We should get an Error"); - } catch (Error e) { + } catch (Exception e) { + assertSame(error, e.getCause()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testComputeIfAbsentThrowsRuntimeException() throws Exception { + + RuntimeException error = new RuntimeException(); + when(numberCachingTier.getOrComputeIfAbsent(any(Number.class), any(Function.class))).thenThrow(error); + + TieredStore tieredStore = new TieredStore<>(numberCachingTier, numberAuthoritativeTier); + + try { + tieredStore.computeIfAbsent(1, n -> null); + fail("We should get an Error"); + } catch (RuntimeException e) { assertSame(error, e); } }