Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: AbstractCacheManager destroy Framework's executorService when shutdown #14091

Open
wants to merge 14 commits into
base: 3.2
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.Disposable;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.LRUCache;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
Expand All @@ -40,10 +45,9 @@
public abstract class AbstractCacheManager<V> implements Disposable {
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

private ScheduledExecutorService executorService;

protected FileCacheStore cacheStore;
protected LRUCache<String, V> cache;
private List<Disposable> disposableResources = new ArrayList<>();

protected void init(
boolean enableFileCache,
Expand All @@ -54,9 +58,12 @@ protected void init(
int interval,
ScheduledExecutorService executorService) {
this.cache = new LRUCache<>(entrySize);
registerDisposable(() -> this.cache.clear());

try {
cacheStore = FileCacheStoreFactory.getInstance(filePath, fileName, enableFileCache);
registerDisposable(() -> cacheStore.destroy());

Map<String, String> properties = cacheStore.loadCache(entrySize);
if (logger.isDebugEnabled()) {
logger.debug("Successfully loaded " + getName() + " cache from file " + fileName + ", entries "
Expand All @@ -69,22 +76,45 @@ protected void init(
}
// executorService can be empty if FileCacheStore fails
if (executorService == null) {
this.executorService = Executors.newSingleThreadScheduledExecutor(
executorService = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("Dubbo-cache-refreshing-scheduler", true));
} else {
this.executorService = executorService;
registerDisposable(newExecutorDisposer(executorService));
}

this.executorService.scheduleWithFixedDelay(
final ScheduledFuture<?> newFuture = executorService.scheduleWithFixedDelay(
new CacheRefreshTask<>(this.cacheStore, this.cache, this, fileSize),
10,
interval,
TimeUnit.MINUTES);

registerDisposable(() -> newFuture.cancel(true));

} catch (Exception e) {
logger.error(COMMON_FAILED_LOAD_MAPPING_CACHE, "", "", "Load mapping from local cache file error ", e);
}
}

protected void registerDisposable(Disposable resource) {
this.disposableResources.add(resource);
}

private Disposable newExecutorDisposer(final ExecutorService executor) {
Assert.notNull(executor, "ExecutorService can not be null");
return () -> {
// Try to destroy self-created executorService instance.
executor.shutdownNow();
try {
if (!executor.awaitTermination(
ConfigurationUtils.reCalShutdownTime(DEFAULT_SERVER_SHUTDOWN_TIMEOUT), TimeUnit.MILLISECONDS)) {
logger.warn(
COMMON_UNEXPECTED_EXCEPTION, "", "", "Wait global executor service terminated timeout.");
}
} catch (InterruptedException e) {
logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", "destroy resources failed: " + e.getMessage(), e);
}
};
}

protected abstract V toValueType(String value);

protected abstract String getName();
Expand Down Expand Up @@ -124,25 +154,15 @@ public void update(Map<String, V> newCache) {
}
}

@Override
public void destroy() {
if (executorService != null) {
executorService.shutdownNow();
try {
if (!executorService.awaitTermination(
ConfigurationUtils.reCalShutdownTime(DEFAULT_SERVER_SHUTDOWN_TIMEOUT), TimeUnit.MILLISECONDS)) {
logger.warn(
COMMON_UNEXPECTED_EXCEPTION, "", "", "Wait global executor service terminated timeout.");
}
} catch (InterruptedException e) {
logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", "destroy resources failed: " + e.getMessage(), e);
}
}
if (cacheStore != null) {
cacheStore.destroy();
}
if (cache != null) {
cache.clear();
// destroy in FILO order.
Disposable[] elements = this.disposableResources.toArray(new Disposable[0]);

for (int i = elements.length - 1; i >= 0; i--) {
elements[i].destroy();
}
this.disposableResources.clear();
}

public static class CacheRefreshTask<V> implements Runnable {
Expand Down
Loading