From c3cacc80e096132c12f79719fceff0e01a3a8e91 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Sat, 2 Mar 2024 23:32:43 +0800 Subject: [PATCH 1/2] implement methods from storage-plugin.admin(rocketmq) --- eventmesh-dashboard-core/pom.xml | 10 ++ .../core/controller/TopicController.java | 14 +-- .../dashboard/core/service/TopicService.java | 9 +- .../service/store/RocketmqTopicService.java | 105 ++++++++++++++++-- .../src/main/resources/application-dev.yml | 6 +- 5 files changed, 120 insertions(+), 24 deletions(-) diff --git a/eventmesh-dashboard-core/pom.xml b/eventmesh-dashboard-core/pom.xml index 651cad83..ea4a432d 100644 --- a/eventmesh-dashboard-core/pom.xml +++ b/eventmesh-dashboard-core/pom.xml @@ -101,6 +101,16 @@ rocketmq-client 4.9.7 + + org.apache.rocketmq + rocketmq-tools + 4.9.5 + + + org.apache.rocketmq + rocketmq-remoting + 4.9.5 + diff --git a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/controller/TopicController.java b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/controller/TopicController.java index f95ffbbe..1d320916 100644 --- a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/controller/TopicController.java +++ b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/controller/TopicController.java @@ -58,23 +58,23 @@ public ResponseEntity preflight() { } @CrossOrigin - @GetMapping - public Result> getList() { - List topicList = topicService.getTopic(); + @GetMapping("/getList") + public Result> getList() throws Exception { + List topicList = topicService.getTopics(); return Result.success(topicList); } @CrossOrigin - @PostMapping - public Result create(@RequestBody CreateTopicRequest createTopicRequest) { + @PostMapping("/create") + public Result create(@RequestBody CreateTopicRequest createTopicRequest) throws Exception { String topicName = createTopicRequest.getName(); topicService.createTopic(topicName); return Result.success(); } @CrossOrigin - @DeleteMapping - public Result delete(@RequestBody DeleteTopicRequest deleteTopicRequest) { + @DeleteMapping("/delete") + public Result delete(@RequestBody DeleteTopicRequest deleteTopicRequest) throws Exception { String topicName = deleteTopicRequest.getName(); topicService.deleteTopic(topicName); return Result.success(); diff --git a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/TopicService.java b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/TopicService.java index 980aed35..4db1555f 100644 --- a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/TopicService.java +++ b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/TopicService.java @@ -27,12 +27,9 @@ public interface TopicService { - /** - * TODO rename to getTopics after being implemented - */ - List getTopic(); + List getTopics() throws Exception; - void createTopic(String topicName); + void createTopic(String topicName) throws Exception; - void deleteTopic(String topicName); + void deleteTopic(String topicName) throws Exception; } diff --git a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/store/RocketmqTopicService.java b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/store/RocketmqTopicService.java index bc0b7bf3..8bbd3efa 100644 --- a/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/store/RocketmqTopicService.java +++ b/eventmesh-dashboard-core/src/main/java/org/apache/eventmesh/dashboard/core/service/store/RocketmqTopicService.java @@ -21,15 +21,28 @@ import org.apache.eventmesh.dashboard.core.model.TopicProperties; import org.apache.eventmesh.dashboard.core.service.TopicService; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Set; +import java.util.UUID; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; -/** - * TODO implement methods from storage-plugin.admin - */ @Slf4j @Service @@ -37,22 +50,98 @@ public class RocketmqTopicService implements TopicService { AdminProperties adminProperties; + private final RPCHook rpcHook; + + protected String nameServerAddr; + + protected String clusterName; + + private int numOfQueue = 4; + private int queuePermission = 6; + + private String accessKey; + + private String secretKey; + + + public RocketmqTopicService(AdminProperties adminProperties) { this.adminProperties = adminProperties; + this.nameServerAddr = adminProperties.getStore().getRocketmq().getNamesrvAddr(); + this.clusterName = adminProperties.getStore().getRocketmq().getClusterName(); + this.accessKey = adminProperties.getStore().getRocketmq().getAccessKey(); + this.secretKey = adminProperties.getStore().getRocketmq().getSecretKey(); + this.rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); } - @Override - public List getTopic() { - return null; + private DefaultMQAdminExt createMQAdminExt() { + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); + String groupId = UUID.randomUUID().toString(); + adminExt.setAdminExtGroup("admin_ext_group-" + groupId); + adminExt.setNamesrvAddr(nameServerAddr); + return adminExt; } @Override - public void createTopic(String topicName) { + public List getTopics() throws Exception { + DefaultMQAdminExt adminExt = createMQAdminExt(); + try { + List result = new ArrayList<>(); + adminExt.start(); + Set topicList = adminExt.fetchAllTopicList().getTopicList(); + for (String topic : topicList) { + long messageCount = 0; + TopicStatsTable topicStats = adminExt.examineTopicStats(topic); + HashMap offsetTable = topicStats.getOffsetTable(); + for (TopicOffset topicOffset : offsetTable.values()) { + messageCount += topicOffset.getMaxOffset() - topicOffset.getMinOffset(); + } + result.add(new TopicProperties( + topic, messageCount)); + } + + result.sort(Comparator.comparing(t -> t.name)); + return result; + } finally { + adminExt.shutdown(); + } } @Override - public void deleteTopic(String topicName) { + public void createTopic(String topicName) throws Exception { + if (StringUtils.isBlank(topicName)) { + throw new Exception("Topic name can not be blank"); + } + DefaultMQAdminExt adminExt = createMQAdminExt(); + try { + adminExt.start(); + Set brokerAddress = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); + for (String masterAddress : brokerAddress) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setReadQueueNums(numOfQueue); + topicConfig.setWriteQueueNums(numOfQueue); + topicConfig.setPerm(queuePermission); + adminExt.createAndUpdateTopicConfig(masterAddress, topicConfig); + } + } finally { + adminExt.shutdown(); + } + } + @Override + public void deleteTopic(String topicName) throws Exception { + if (StringUtils.isBlank(topicName)) { + throw new Exception("Topic name can not be blank."); + } + DefaultMQAdminExt adminExt = createMQAdminExt(); + try { + adminExt.start(); + Set brokerAddress = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); + adminExt.deleteTopicInBroker(brokerAddress, topicName); + } finally { + adminExt.shutdown(); + } } } diff --git a/eventmesh-dashboard-core/src/main/resources/application-dev.yml b/eventmesh-dashboard-core/src/main/resources/application-dev.yml index f61a3c39..c7dd2d93 100644 --- a/eventmesh-dashboard-core/src/main/resources/application-dev.yml +++ b/eventmesh-dashboard-core/src/main/resources/application-dev.yml @@ -19,9 +19,9 @@ spring: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://localhost:3306/eventmesh-dashboard?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 + url: jdbc:mysql://175.27.155.139:3306/eventmesh-dashboard?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 username: root - password: root + password: mysql123 initial-size: 1 max-active: 50 @@ -65,7 +65,7 @@ eventmesh: standalone: # TODO rocketmq: - namesrvAddr: 127.0.0.1:9876;127.0.0.1:9876 + namesrvAddr: 175.27.155.139:9876;175.27.155.139:9876 clusterName: DefaultCluster accessKey: '********' secretKey: '********' From a9db650cc5f3ec8be9b5e50b6e7b9a81a8bfee89 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Sat, 2 Mar 2024 23:35:34 +0800 Subject: [PATCH 2/2] revert application-dev.yml --- .../src/main/resources/application-dev.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eventmesh-dashboard-core/src/main/resources/application-dev.yml b/eventmesh-dashboard-core/src/main/resources/application-dev.yml index c7dd2d93..f61a3c39 100644 --- a/eventmesh-dashboard-core/src/main/resources/application-dev.yml +++ b/eventmesh-dashboard-core/src/main/resources/application-dev.yml @@ -19,9 +19,9 @@ spring: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://175.27.155.139:3306/eventmesh-dashboard?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 + url: jdbc:mysql://localhost:3306/eventmesh-dashboard?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 username: root - password: mysql123 + password: root initial-size: 1 max-active: 50 @@ -65,7 +65,7 @@ eventmesh: standalone: # TODO rocketmq: - namesrvAddr: 175.27.155.139:9876;175.27.155.139:9876 + namesrvAddr: 127.0.0.1:9876;127.0.0.1:9876 clusterName: DefaultCluster accessKey: '********' secretKey: '********'