Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #97] Runtime tcp http grpc client impl #96

Merged
merged 23 commits into from
Apr 15, 2024
Merged
30 changes: 28 additions & 2 deletions eventmesh-dashboard-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@
</properties>

<dependencies>
<!-- grpc -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.51.0</version>
</dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>

<!-- EventMesh Dashboard modules -->
<dependency>
<groupId>org.apache.eventmesh.dashboard.common</groupId>
Expand All @@ -52,6 +65,19 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- eventmesh -->
<dependency>
<groupId>org.apache.eventmesh</groupId>
<artifactId>eventmesh-sdk-java</artifactId>
<version>1.10.0-release</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Meta - nacos client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
Expand All @@ -61,7 +87,7 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.3.0</version>
<version>0.7.5</version>
</dependency>

<!-- health check client -->
Expand Down Expand Up @@ -121,4 +147,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQProduceSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQPushConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQRemotingSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeGrpcConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeGrpcProducerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeHttpConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeHttpProducerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpCloudEventSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpEventMeshSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpOpenMessageSDKOperation;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
Expand All @@ -38,10 +46,17 @@
*/
public class SDKManager {

private static final SDKManager INSTANCE = new SDKManager();
private static volatile SDKManager INSTANCE = null;


public static SDKManager getInstance() {
public static synchronized SDKManager getInstance() {
if (INSTANCE == null) {
synchronized (SDKManager.class) {
if (INSTANCE == null) {
INSTANCE = new SDKManager();
}
}
}
return INSTANCE;
}

Expand Down Expand Up @@ -73,6 +88,17 @@ public static SDKManager getInstance() {

clientCreateOperationMap.put(SDKTypeEnum.META_ETCD, new EtcdSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_EVENTMESH_CLIENT, new RuntimeSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_CLOUDEVENT_CLIENT, new RuntimeTcpCloudEventSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_EVENTMESH_CLIENT, new RuntimeTcpEventMeshSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_OPENMESSAGE_CLIENT, new RuntimeTcpOpenMessageSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_HTTP_PRODUCER, new RuntimeHttpProducerSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_HTTP_CONSUMER, new RuntimeHttpConsumerSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_GRPC_PRODUCER, new RuntimeGrpcProducerSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_GRPC_CONSUMER, new RuntimeGrpcConsumerSDKOperation());
}

private SDKManager() {
Expand Down Expand Up @@ -114,4 +140,9 @@ public void deleteClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
public Object getClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
return this.clientMap.get(clientTypeEnum).get(uniqueKey);
}

// get all client
public Map<String, Object> getClients(SDKTypeEnum clientTypeEnum) {
return this.clientMap.get(clientTypeEnum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ public enum SDKTypeEnum {
META_NACOS_NAMING,

META_ETCD,

RUNTIME_EVENTMESH_CLIENT,

RUNTIME_TCP_CLOUDEVENT_CLIENT,
RUNTIME_TCP_EVENTMESH_CLIENT,
RUNTIME_TCP_OPENMESSAGE_CLIENT,

RUNTIME_HTTP_PRODUCER,
RUNTIME_HTTP_CONSUMER,

RUNTIME_GRPC_PRODUCER,
RUNTIME_GRPC_CONSUMER,
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateEtcdConfig implements CreateSDKConfig {

private String etcdServerAddress;

@Builder.Default()
private int connectTime = 10;

@Override
public String getUniqueKey() {
return etcdServerAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateRedisConfig implements CreateSDKConfig {

private String redisUrl;

private String password;

@Builder.Default
private int timeOut = 10;

@Override
public String getUniqueKey() {
return redisUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateRuntimeConfig implements CreateSDKConfig {

// 127.0.0.1:10105;127.0.0.2:10105
private String runtimeServerAddress;

// protocol example:HTTP,TCP,GRPC
private String protocol;
// for tcp:cloudevents,eventmeshmessage,openmessage
private String protocolName;

// producer or consumer
private String clientType;

// topic
private String topic;

// config
private String env;
private String idc;
private String ip;
private String sys;
private String pid;
private String username;
private String password;

// producer
private String producerGroup;

// consumer
private String consumerGroup;
private String subUrl;

// Agent
private UserAgent userAgent;

@Override
public String getUniqueKey() {
return runtimeServerAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateEtcdConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;

import io.etcd.jetcd.Client;
Expand All @@ -39,6 +40,7 @@ public SimpleEntry<String, KV> createClient(CreateSDKConfig clientConfig) {
try {
final Client client = Client.builder()
.endpoints(getSplitEndpoints(etcdConfig))
.connectTimeout(Duration.ofSeconds(etcdConfig.getConnectTime()))
.build();
kvClient = client.getKVClient();
} catch (EtcdException e) {
Expand All @@ -48,7 +50,7 @@ public SimpleEntry<String, KV> createClient(CreateSDKConfig clientConfig) {
}

private static String[] getSplitEndpoints(CreateEtcdConfig etcdConfig) {
return etcdConfig.getEtcdServerAddress().split(",");
return etcdConfig.getEtcdServerAddress().split(";");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.dashboard.core.function.SDK.wrapper.NacosSDKWrapper;

import java.util.AbstractMap.SimpleEntry;
import java.util.Objects;

import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.naming.NamingService;
Expand All @@ -34,8 +35,8 @@ public class NacosSDKOperation extends AbstractSDKOperation<NacosSDKWrapper> {
@Override
public SimpleEntry<String, NacosSDKWrapper> createClient(CreateSDKConfig createClientConfig) {
SimpleEntry<String, ConfigService> configSimpleEntry = nacosConfigClientCreateOperation.createClient(createClientConfig);
SimpleEntry namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (configSimpleEntry.getKey() != namingSimpleEntry.getKey()) {
SimpleEntry<String, NamingService> namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (!Objects.equals(configSimpleEntry.getKey(), namingSimpleEntry.getKey())) {
throw new RuntimeException("Nacos config and naming server address not match");
}
NacosSDKWrapper nacosClient = new NacosSDKWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRedisConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisSDKOperation extends AbstractSDKOperation<StatefulRedisConnection<String, String>> {

@Override
public SimpleEntry<String, StatefulRedisConnection<String, String>> createClient(CreateSDKConfig clientConfig) {
String redisUrl = ((CreateRedisConfig) clientConfig).getRedisUrl();
RedisClient redisClient = RedisClient.create(redisUrl);
CreateRedisConfig redisConfig = (CreateRedisConfig) clientConfig;
String redisUrl = redisConfig.getRedisUrl();
String clientHost = redisUrl.split(":")[0];
int clientPort = Integer.parseInt(redisUrl.split(":")[1]);
RedisURI redisURI = RedisURI.builder()
.withHost(clientHost)
.withPort(clientPort)
.withPassword(redisConfig.getPassword())
.withTimeout(Duration.ofSeconds(redisConfig.getTimeOut()))
.build();
RedisClient redisClient = RedisClient.create(redisURI);
return new SimpleEntry<>(clientConfig.getUniqueKey(), redisClient.connect());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshGrpcConsumerConfig;

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeGrpcConsumerSDKOperation extends AbstractSDKOperation<EventMeshGrpcConsumer> {

@Override
public SimpleEntry<String, EventMeshGrpcConsumer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshGrpcClientConfig grpcClientConfig = buildEventMeshGrpcConsumerConfig(runtimeConfig);
EventMeshGrpcConsumer grpcConsumer = null;
try {
grpcConsumer = new EventMeshGrpcConsumer(grpcClientConfig);
grpcConsumer.init();
} catch (EventMeshException e) {
log.error("create runtime grpc Consumer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), grpcConsumer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Loading
Loading