Skip to content

Commit

Permalink
[ISSUE #42] fix the problem HealthExcutor don't doCheck asynchronously (
Browse files Browse the repository at this point in the history
#43)

* fix: let HealthExecutor support async doCheck

* chore: move test files out of unit folder

* chore: add HealthConstant

* chore

* style: rewrite description assign in CheckResultCache in if-else
  • Loading branch information
Lambert-Rao authored Mar 1, 2024
1 parent 15b1f40 commit d816839
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.console.constant;

public class HealthConstant {
public static final String NEW_LINE_ENDING = "\n";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.dashboard.console.health;

import org.apache.eventmesh.dashboard.console.constant.HealthConstant;
import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckStatus;
import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;

Expand All @@ -41,8 +42,12 @@ public void update(String type, Long typeId, HealthCheckStatus status, String re
cacheMap.put(type, subMap);
}
CheckResult oldResult = subMap.get(typeId);
String oldDesc = Objects.isNull(oldResult.getResultDesc()) ? "" : oldResult.getResultDesc() + "\n";
CheckResult result = new CheckResult(status, oldDesc + resultDesc, LocalDateTime.now(),
String description = resultDesc;
if (oldResult.getResultDesc() != null && !oldResult.getResultDesc().isEmpty()) {
description = oldResult.getResultDesc() + HealthConstant.NEW_LINE_ENDING + resultDesc;
}
description += " Latency: " + latency.toString() + "ms";
CheckResult result = new CheckResult(status, description, LocalDateTime.now(),
latency, oldResult.getConfig());
subMap.put(typeId, result);
}
Expand All @@ -53,7 +58,6 @@ public void update(String type, Long typeId, HealthCheckStatus status, String re
subMap = new HashMap<>();
cacheMap.put(type, subMap);
}
CheckResult resultToUpdate = subMap.get(typeId);
subMap.put(typeId, new CheckResult(status, resultDesc, LocalDateTime.now(), latency, config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.Getter;
import lombok.Setter;
Expand All @@ -44,6 +46,8 @@ public class HealthExecutor {
@Setter
private CheckResultCache memoryCache;

private final ExecutorService executorService = Executors.newCachedThreadPool();

/**
* execute function is where health check services work.
*
Expand All @@ -58,15 +62,15 @@ public void execute(AbstractHealthCheckService service) {
memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(), HealthCheckStatus.CHECKING, "",
null, service.getConfig());
//The callback interface is used to pass the processing methods for checking success and failure.
service.doCheck(new HealthCheckCallback() {
executorService.submit(() -> service.doCheck(new HealthCheckCallback() {
@Override
public void onSuccess() {
//when the health check is successful, the result is updated to the memory cache.
Long latency = System.currentTimeMillis() - startTime;
HealthCheckStatus status =
latency > service.getConfig().getRequestTimeoutMillis() ? HealthCheckStatus.TIMEOUT : HealthCheckStatus.PASSED;
memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(),
status, "health check success", latency
status, "Health check succeed.", latency
);
}

Expand All @@ -78,7 +82,8 @@ public void onFail(Exception e) {
HealthCheckStatus.FAILED, e.getMessage(),
System.currentTimeMillis() - startTime);
}
});
}));

} catch (Exception e) {
log.error("Health check failed for reason: {}. Service config is {}", e, service.getConfig());
memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(), HealthCheckStatus.FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private static void setClassCache(Class<?> clazz) {
*
* @see AbstractHealthCheckService
*/
private Map<String, Map<Long, AbstractHealthCheckService>> checkServiceMap = new ConcurrentHashMap<>();
private final Map<String, Map<Long, AbstractHealthCheckService>> checkServiceMap = new ConcurrentHashMap<>();

private ScheduledThreadPoolExecutor scheduledExecutor;

Expand All @@ -87,10 +87,13 @@ public void insertCheckService(HealthCheckObjectConfig config) {
if (Objects.nonNull(config.getSimpleClassName())) {
Class<?> clazz = HEALTH_CHECK_CLASS_CACHE.get(config.getSimpleClassName());
healthCheckService = createCheckService(clazz, config);
//if simpleClassName is null, use type(storage) and subType(redis) to create healthCheckService
// you can pass an object to create a HealthCheckService(not commonly used)
} else if (Objects.nonNull(config.getCheckClass())) {
healthCheckService = createCheckService(config.getCheckClass(), config);
//if simpleClassName and CheckClass are both null, use type(storage) and subType(redis) to create healthCheckService
//This is the default create method.
//healthCheckService is annotated with @HealthCheckType(type = "storage", subType = "redis")
} else if (Objects.isNull(config.getSimpleClassName())
&& Objects.nonNull(config.getHealthCheckResourceType()) && Objects.nonNull(
} else if (Objects.nonNull(config.getHealthCheckResourceType()) && Objects.nonNull(
config.getHealthCheckResourceSubType())) {
for (Entry<String, Class<?>> entry : HEALTH_CHECK_CLASS_CACHE.entrySet()) {
Class<?> clazz = entry.getValue();
Expand All @@ -101,9 +104,6 @@ public void insertCheckService(HealthCheckObjectConfig config) {
break;
}
}
// you can pass an object to create a HealthCheckService(not commonly used)
} else if (Objects.nonNull(config.getCheckClass())) {
healthCheckService = createCheckService(config.getCheckClass(), config);
}
} catch (Exception e) {
log.error("create healthCheckService failed, healthCheckObjectConfig:{}", config, e);
Expand All @@ -113,6 +113,13 @@ public void insertCheckService(HealthCheckObjectConfig config) {
if (Objects.isNull(healthCheckService)) {
throw new RuntimeException("No construct method of Health Check Service is found, config is {}" + config);
}
insertCheckService(healthCheckService);
}

public void insertCheckService(AbstractHealthCheckService checkService) {
Map<Long, AbstractHealthCheckService> subMap = checkServiceMap.computeIfAbsent(checkService.getConfig().getHealthCheckResourceType(),
k -> new ConcurrentHashMap<>());
subMap.put(checkService.getConfig().getInstanceId(), checkService);
}

public void deleteCheckService(String resourceType, Long resourceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.dashboard.console.health.check.config;

import java.util.Properties;

import lombok.Data;

@Data
Expand All @@ -32,6 +34,8 @@ public class HealthCheckObjectConfig {

private Class<?> checkClass;

private Properties eventmeshProperties;

private Long clusterId;

private String connectUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ public void init() {
}
redisUrl = builder.build().toString();
}
RedisClient redisClient = RedisClient.create(redisUrl);
redisClient = RedisClient.create(redisUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ public void initMock() {
}

@Test
public void testExecute() {
public void testExecute() throws InterruptedException {
healthExecutor.execute(successHealthCheckService);
healthExecutor.execute(failHealthCheckService);
Thread.sleep(1000);
assertEquals(2, memoryCache.getCacheMap().get("storage").size());
assertNotEquals(memoryCache.getCacheMap().get("storage").get(1L).getStatus(), memoryCache.getCacheMap().get("storage").get(2L).getStatus());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.eventmesh.dashboard.console.health;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
Expand Down Expand Up @@ -60,6 +58,8 @@ void testInsertCheckServiceWithSimpleClassName() {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.setInstanceId(1L);
config.setSimpleClassName("StorageRedisCheck");
config.setHealthCheckResourceType("storage");
config.setHealthCheckResourceSubType("redis");
config.setClusterId(1L);
config.setConnectUrl("redis://localhost:6379");
healthService.insertCheckService(config);
Expand All @@ -69,6 +69,8 @@ void testInsertCheckServiceWithSimpleClassName() {
void testInsertCheckServiceWithClass() {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.setInstanceId(1L);
config.setHealthCheckResourceType("storage");
config.setHealthCheckResourceSubType("redis");
config.setClusterId(1L);
config.setCheckClass(TestHealthCheckService.class);
healthService.insertCheckService(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.console.integration.health;

import java.util.List;
import org.apache.eventmesh.dashboard.console.entity.health.HealthCheckResultEntity;
import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckType;
import org.apache.eventmesh.dashboard.console.health.CheckResultCache;
import org.apache.eventmesh.dashboard.console.health.HealthService;
import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.jdbc.Sql;

@SpringBootTest
@ActiveProfiles("test")
@Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = {"classpath:use-test-schema.sql", "classpath:eventmesh-dashboard.sql"})
public class HealthServiceIntegrateTest {
HealthService healthService = new HealthService();

@Autowired
private HealthDataService healthDataService;

private final CheckResultCache checkResultCache = new CheckResultCache();
@BeforeEach
void init() {
healthService.createExecutor(healthDataService, checkResultCache);
}

@Test
void testStorageRedis() throws InterruptedException {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.setClusterId(1L);
config.setInstanceId(1L);
config.setHealthCheckResourceType("storage");
config.setHealthCheckResourceSubType("redis");
config.setConnectUrl("redis://localhost:6379");
healthService.insertCheckService(config);
healthService.executeAll();
Thread.sleep(1000);
healthService.executeAll();

HealthCheckResultEntity queryEntity = new HealthCheckResultEntity();
queryEntity.setClusterId(1L);
queryEntity.setType(HealthCheckType.STORAGE.getNumber());
queryEntity.setTypeId(1L);
List<HealthCheckResultEntity> results = healthDataService.queryHealthCheckResultByClusterIdAndTypeAndTypeId(queryEntity);
Assertions.assertEquals(2,results.size());
}
}
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<excludes>
<exclude>**/org/apache/eventmesh/dashboard/console/integration/**/*.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

0 comments on commit d816839

Please sign in to comment.