diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 1fbac959f5..5fecd5c41e 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -38,7 +38,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.core.annotation.Order; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -46,6 +45,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -164,48 +164,63 @@ private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties befor * @param executorProperties */ private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) { - boolean checkNotifyConfig = false; - boolean checkNotifyAlarm = false; Map> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs(); - if (CollectionUtil.isNotEmpty(notifyConfigs)) { - for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { - if (checkNotifyConfig) { - break; - } - List notifyConfigDTOS = notifyConfigs.get(each.getKey()); - for (NotifyConfigDTO notifyConfig : each.getValue()) { - if (!notifyConfigDTOS.contains(notifyConfig)) { - checkNotifyConfig = true; - break; - } - } - } + + boolean checkNotifyConfig = checkAndReplaceNotifyConfig(newDynamicThreadPoolNotifyMap, notifyConfigs); + boolean checkNotifyAlarm = checkAndReplaceNotifyAlarm(executorProperties); + + if (checkNotifyConfig || checkNotifyAlarm) { + log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); } - if (checkNotifyConfig) { - configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); - threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); + } + + private boolean checkAndReplaceNotifyConfig(Map> newConfigs, + Map> currentConfigs) { + if (CollectionUtil.isEmpty(currentConfigs)) { + return false; } - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); - if (threadPoolNotifyAlarm != null) { - Boolean isAlarm = executorProperties.getAlarm(); - Integer activeAlarm = executorProperties.getActiveAlarm(); - Integer capacityAlarm = executorProperties.getCapacityAlarm(); - if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) - || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) - || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { - checkNotifyAlarm = true; - threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm())); - threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm())); - threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm())); + + for (Map.Entry> entry : newConfigs.entrySet()) { + String key = entry.getKey(); + List newNotifyConfigList = entry.getValue(); + List currentNotifyConfigList = currentConfigs.get(key); + + if (currentNotifyConfigList == null || !currentNotifyConfigList.containsAll(newNotifyConfigList)) { + configModeNotifyConfigBuilder.initCacheAndLock(newConfigs); + threadPoolBaseSendMessageService.putPlatform(newConfigs); + return true; } } - if (checkNotifyConfig || checkNotifyAlarm) { - log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); + + return false; + } + + private boolean checkAndReplaceNotifyAlarm(ExecutorProperties executorProperties) { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); + if (threadPoolNotifyAlarm == null) { + return false; + } + + boolean checkNotifyAlarm = false; + Boolean isAlarm = executorProperties.getAlarm(); + Integer activeAlarm = executorProperties.getActiveAlarm(); + Integer capacityAlarm = executorProperties.getCapacityAlarm(); + + if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) + || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) + || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { + checkNotifyAlarm = true; + threadPoolNotifyAlarm.setAlarm(isAlarm != null ? isAlarm : threadPoolNotifyAlarm.getAlarm()); + threadPoolNotifyAlarm.setActiveAlarm(activeAlarm != null ? activeAlarm : threadPoolNotifyAlarm.getActiveAlarm()); + threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm != null ? capacityAlarm : threadPoolNotifyAlarm.getCapacityAlarm()); } + + return checkNotifyAlarm; } + /** * Check consistency. * @@ -216,19 +231,26 @@ private boolean checkConsistency(String threadPoolId, ExecutorProperties propert ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(threadPoolId); ExecutorProperties beforeProperties = executorHolder.getExecutorProperties(); ThreadPoolExecutor executor = executorHolder.getExecutor(); + if (executor == null) { return false; } - boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())) - || (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())) - || (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) - || (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) - || (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) - || (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) - || - ((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) - && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName()))); - return result; + + return Stream.of( + hasPropertyChanged(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), + hasPropertyChanged(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), + hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()), + hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), + hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), + hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), + isQueueCapacityChanged(beforeProperties, properties, executor) + ).anyMatch(Boolean::booleanValue); + } + + private boolean isQueueCapacityChanged(ExecutorProperties beforeProperties, ExecutorProperties properties, ThreadPoolExecutor executor) { + return properties.getQueueCapacity() != null && + !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) && + Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName()); } /** @@ -238,41 +260,62 @@ private boolean checkConsistency(String threadPoolId, ExecutorProperties propert * @param properties */ private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) { - ExecutorProperties beforeProperties = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutorProperties(); - ThreadPoolExecutor executor = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutor(); - if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) { - ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize()); + ThreadPoolExecutorHolder holder = ThreadPoolExecutorRegistry.getHolder(threadPoolId); + ExecutorProperties beforeProperties = holder.getExecutorProperties(); + ThreadPoolExecutor executor = holder.getExecutor(); + + if (executor == null) { + log.warn("Executor is null for threadPoolId: {}", threadPoolId); + return; + } + + setPoolSizes(executor, properties); + updateExecutorProperties(executor, beforeProperties, properties); + updateQueueCapacity(executor, beforeProperties, properties); + } + + private void setPoolSizes(ThreadPoolExecutor executor, ExecutorProperties properties) { + Integer corePoolSize = properties.getCorePoolSize(); + Integer maximumPoolSize = properties.getMaximumPoolSize(); + + if (corePoolSize != null && maximumPoolSize != null) { + ThreadPoolExecutorUtil.safeSetPoolSize(executor, corePoolSize, maximumPoolSize); } else { - if (properties.getMaximumPoolSize() != null) { - executor.setMaximumPoolSize(properties.getMaximumPoolSize()); + if (maximumPoolSize != null) { + executor.setMaximumPoolSize(maximumPoolSize); } - if (properties.getCorePoolSize() != null) { - executor.setCorePoolSize(properties.getCorePoolSize()); + if (corePoolSize != null) { + executor.setCorePoolSize(corePoolSize); } } - if (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { + } + + private void updateExecutorProperties(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) { + if (hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); } - if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) { - if (executor instanceof DynamicThreadPoolExecutor) { - ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); - } + if (hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) && executor instanceof DynamicThreadPoolExecutor) { + ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); } - if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { + if (hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler()); executor.setRejectedExecutionHandler(rejectedExecutionHandler); } - if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { + if (hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); } - if (properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) - && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) { - if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { - ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); - queue.setCapacity(properties.getQueueCapacity()); - } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); - } + } + + private void updateQueueCapacity(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) { + if (hasPropertyChanged(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { + ((ResizableCapacityLinkedBlockingQueue) executor.getQueue()).setCapacity(properties.getQueueCapacity()); + } else { + log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); } } + + private boolean hasPropertyChanged(T before, T after) { + return after != null && !Objects.equals(before, after); + } }