Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #45] Implement methods from storage-plugin.admin(rocketmq) #55

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51be376
add rocketmqUtils
scwlkq Mar 9, 2024
b891623
add RocketmqUtils.java
scwlkq Mar 9, 2024
5ee7786
add RocketmqUtils.java
scwlkq Mar 9, 2024
17892e3
change rocketmqUtils impl from MQAdmin to RemotingClient
scwlkq Mar 9, 2024
cfcc41f
fix style
scwlkq Mar 9, 2024
5f76e29
fix style
scwlkq Mar 9, 2024
3a3415c
fix style
scwlkq Mar 9, 2024
dc2634c
fix style
scwlkq Mar 9, 2024
6d4a283
fix style
scwlkq Mar 9, 2024
6561d69
fix test style
scwlkq Mar 9, 2024
4b8fb1d
[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)
SLSJL Mar 9, 2024
449b352
modify rocket dependency version
scwlkq Mar 10, 2024
7ebde4b
move classparse to RocketmqUtils.getTopics
scwlkq Mar 10, 2024
d523e26
fix style
scwlkq Mar 10, 2024
eaf4a3a
Revert "[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)"
scwlkq Mar 13, 2024
e2f4c09
refactor TopicProperties
scwlkq Mar 13, 2024
16e7a53
synchronous
scwlkq Mar 14, 2024
50d40b5
try to sync rocketmq related code
scwlkq Mar 14, 2024
31c0117
move classparser to rocketmqUtil
scwlkq Mar 14, 2024
e2b6c48
move RocketmqProperties to properties
scwlkq Mar 14, 2024
d898abc
[ISSUE #57] Modify the field, synchronize the modification, and add t…
zzxxiansheng Mar 12, 2024
717ff8b
[ISSUE #60] add SDK manager (#62)
Lambert-Rao Mar 15, 2024
fa1c716
[ISSUE #64] Support automated deployment and Fix runtime packaging er…
Pil0tXia Mar 15, 2024
b33d5b0
move clientManager related to core module && add RocketmqService
scwlkq Mar 17, 2024
6e4ce01
revert application-dev.yml
scwlkq Mar 17, 2024
07f71d5
remove @Service
scwlkq Mar 17, 2024
8ff1297
sync
scwlkq Mar 17, 2024
418c6b7
move clientManager to core module
scwlkq Mar 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions eventmesh-dashboard-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@
<artifactId>fastjson2</artifactId>
<version>2.0.40</version>
</dependency>
<!-- Event Store -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>4.9.5</version>
</dependency>
scwlkq marked this conversation as resolved.
Show resolved Hide resolved
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;

import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
* One record displayed in 'Topic' page.
*/

@Data
public class TopicProperties {

public String name;
public long messageCount;
private TopicConfig rocketmqTopicConfig;

@JsonCreator
public TopicProperties(
@JsonProperty("name") String name,
@JsonProperty("messageCount") long messageCount) {
super();
this.name = name;
this.messageCount = messageCount;
}
}
scwlkq marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.apache.eventmesh.dashboard.common.properties;

import lombok.Data;

@Data
public class RocketmqProperties {
private String namesrvAddr;

private String clusterName;

private String brokerUrl;

private String endPoint;

private int writeQueueNums;

private int readQueueNums;

private String accessKey;

private String secretKey;

private Long requestTimeoutMillis = 10000L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.apache.eventmesh.dashboard.common.util;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.*;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@UtilityClass
public class RocketmqUtils {
private final RemotingClient remotingClient;

static{
NettyClientConfig config = new NettyClientConfig();
config.setUseTLS(false);
remotingClient = new NettyRemotingClient(config);
remotingClient.start();
}


public void createTopic(String topicName, String topicFilterTypeName, int perm, String nameServerAddr,
int readQueueNums, int writeQueueNums, long requestTimeoutMillis) {
try {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicName);
requestHeader.setTopicFilterType(topicFilterTypeName);
requestHeader.setReadQueueNums(readQueueNums);
requestHeader.setWriteQueueNums(writeQueueNums);
requestHeader.setPerm(perm);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
Object result = remotingClient.invokeSync(nameServerAddr, request, requestTimeoutMillis);
log.info("rocketmq create topic result:" + result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}
}

public List<TopicConfig> getTopics(String nameServerAddr, long requestTimeoutMillis) {
List<TopicConfig> topicConfigList = new ArrayList<>();
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, (CommandCustomHeader) null);
RemotingCommand response = remotingClient.invokeSync(nameServerAddr, request, 3000L);
TopicConfigSerializeWrapper allTopicConfig = TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
ConcurrentMap<String, TopicConfig> topicConfigTable = allTopicConfig.getTopicConfigTable();
topicConfigList = new ArrayList<>(topicConfigTable.values());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}
return topicConfigList;
}


public void deleteTopic(String topicName, String nameServerAddr, long requestTimeoutMillis) {
try {
DeleteTopicRequestHeader deleteTopicRequestHeader = new DeleteTopicRequestHeader();
deleteTopicRequestHeader.setTopic(topicName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, null);
Object result = remotingClient.invokeSync(nameServerAddr, request, requestTimeoutMillis);

log.info("rocketmq delete topic result:" + result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ public ResponseEntity<Object> preflight() {
}

@CrossOrigin
@GetMapping
@GetMapping("/list")
public Result<List<TopicProperties>> getList() {
List<TopicProperties> topicList = topicCore.getTopic();
List<TopicProperties> topicList = topicCore.getTopics();
return Result.success(topicList);
}

@CrossOrigin
@PostMapping
@PostMapping("/create")
public Result<Object> create(@RequestBody CreateTopicRequest createTopicRequest) {
String topicName = createTopicRequest.getName();
topicCore.createTopic(topicName);
return Result.success();
}

@CrossOrigin
@DeleteMapping
@DeleteMapping("/delete")
public Result<Object> delete(@RequestBody DeleteTopicRequest deleteTopicRequest) {
String topicName = deleteTopicRequest.getName();
topicCore.deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;

import lombok.Data;
import org.apache.eventmesh.dashboard.common.properties.RocketmqProperties;

@Data
public class HealthCheckObjectConfig {
Expand Down Expand Up @@ -56,13 +57,6 @@ public class HealthCheckObjectConfig {

private Long requestTimeoutMillis = 100000L;

private RocketmqConfig rocketmqConfig = new RocketmqConfig();
private RocketmqProperties rocketmqProperties = new RocketmqProperties();

@Data
public class RocketmqConfig {

private String nameServerUrl;
private String brokerUrl;
private String endPoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) {
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -71,7 +71,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getBrokerUrl() == null || getConfig().getRocketmqConfig().getBrokerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getBrokerUrl() == null || getConfig().getRocketmqProperties().getBrokerUrl().isEmpty()) {
throw new IllegalArgumentException("RocketmqCheck failed. BrokerUrl is null.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig)
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NAMESRV_CONFIG, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getNamesrvAddr(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -67,7 +67,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getNameServerUrl() == null || getConfig().getRocketmqConfig().getNameServerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getNamesrvAddr() == null || getConfig().getRocketmqProperties().getNamesrvAddr().isEmpty()) {
throw new RuntimeException("RocketmqNameServerCheck init failed, nameServerUrl is empty");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK;

import org.apache.eventmesh.dashboard.common.util.RocketmqUtils;
import org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant;
import org.apache.eventmesh.dashboard.console.constant.health.HealthConstant;
import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType;
Expand Down Expand Up @@ -81,8 +82,8 @@ public void doCheck(HealthCheckCallback callback) {
log.debug("RocketmqTopicCheck start, uuid:{}", uuid);
try {
Message msg = new Message(HealthConstant.ROCKETMQ_CHECK_TOPIC, "eventmesh-dashboard-rocketmq-topic-check", uuid
.getBytes(
RemotingHelper.DEFAULT_CHARSET));
.getBytes(
RemotingHelper.DEFAULT_CHARSET));
synchronized (this) {
producer.send(msg, new SendCallback() {
@Override
Expand Down Expand Up @@ -138,30 +139,18 @@ public void init() {

//TODO there are many functions that can be reused, they should be collected in a util module
//this function that create topics can be reused
try {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
Object result = remotingClient.invokeSync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis());
log.info(result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
return;
}
RocketmqUtils.createTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC, TopicFilterType.SINGLE_TAG.name(), PermName.PERM_READ | PermName.PERM_WRITE,
getConfig().getRocketmqProperties().getBrokerUrl(), 8, 8, getConfig().getRequestTimeoutMillis());
scwlkq marked this conversation as resolved.
Show resolved Hide resolved

try {
producer = new DefaultMQProducer(HealthConstant.ROCKETMQ_CHECK_PRODUCER_GROUP);
producer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
producer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
producer.setCompressMsgBodyOverHowmuch(16);
producer.start();

consumer = new DefaultMQPushConsumer(HealthConstant.ROCKETMQ_CHECK_CONSUMER_GROUP);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
consumer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
consumer.subscribe(HealthConstant.ROCKETMQ_CHECK_TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
Expand Down
7 changes: 0 additions & 7 deletions eventmesh-dashboard-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,5 @@
<artifactId>nacos-client</artifactId>
<version>2.2.4</version>
</dependency>

<!-- Event Store -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.7</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,55 @@
package org.apache.eventmesh.dashboard.core.store;

import org.apache.eventmesh.dashboard.common.model.TopicProperties;
import org.apache.eventmesh.dashboard.core.config.AdminProperties;
import org.apache.eventmesh.dashboard.common.properties.RocketmqProperties;
import org.apache.eventmesh.dashboard.common.util.RocketmqUtils;
import org.apache.eventmesh.dashboard.service.store.TopicCore;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.PermName;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

/**
* TODO implement methods from storage-plugin.admin
*/

@Slf4j
@Service
public class RocketmqTopicCore implements TopicCore {

AdminProperties adminProperties;
private final RocketmqProperties rocketmqProperties;

public RocketmqTopicCore(AdminProperties adminProperties) {
this.adminProperties = adminProperties;
public RocketmqTopicCore(RocketmqProperties rocketmqProperties) {
this.rocketmqProperties = rocketmqProperties;
}

@Override
public List<TopicProperties> getTopic() {
return null;
public List<TopicProperties> getTopics() {
List<TopicConfig> topicConfigList =
RocketmqUtils.getTopics(rocketmqProperties.getNamesrvAddr(), rocketmqProperties.getRequestTimeoutMillis());
List<TopicProperties> topicPropertiesList = new ArrayList<>();
for (TopicConfig topicConfig : topicConfigList) {
TopicProperties topicProperties = new TopicProperties();
topicProperties.setRocketmqTopicConfig(topicConfig);
topicPropertiesList.add(topicProperties);
}
return topicPropertiesList;
}

@Override
public void createTopic(String topicName) {

RocketmqUtils.createTopic(topicName, TopicFilterType.SINGLE_TAG.name(),
PermName.PERM_READ | PermName.PERM_WRITE, rocketmqProperties.getNamesrvAddr(),
rocketmqProperties.getReadQueueNums(), rocketmqProperties.getWriteQueueNums(),
rocketmqProperties.getRequestTimeoutMillis());
}

@Override
public void deleteTopic(String topicName) {

RocketmqUtils.deleteTopic(topicName, rocketmqProperties.getNamesrvAddr(),
rocketmqProperties.getRequestTimeoutMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@

public interface TopicCore {

/**
* TODO rename to getTopics after being implemented
*/
List<TopicProperties> getTopic();
List<TopicProperties> getTopics();

void createTopic(String topicName);

Expand Down