Skip to content

Commit

Permalink
runtime tcp http grpc client impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Alonexc committed Apr 9, 2024
1 parent efdae6e commit bd1690b
Show file tree
Hide file tree
Showing 23 changed files with 1,028 additions and 6 deletions.
18 changes: 18 additions & 0 deletions eventmesh-dashboard-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- eventmesh -->
<dependency>
<groupId>org.apache.eventmesh</groupId>
<artifactId>eventmesh-sdk-java</artifactId>
<version>1.10.0-release</version>
</dependency>
<dependency>
<groupId>org.apache.eventmesh</groupId>
<artifactId>eventmesh-common</artifactId>
<version>1.10.0-release</version>
</dependency>

<!-- meta -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
Expand All @@ -62,6 +74,12 @@
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.3.0</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Event Store -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,9 @@ public void deleteClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
public Object getClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
return this.clientMap.get(clientTypeEnum).get(uniqueKey);
}

// get all client
public Map<String, Object> getClients(SDKTypeEnum clientTypeEnum) {
return this.clientMap.get(clientTypeEnum);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateRuntimeConfig implements CreateSDKConfig {

// 127.0.0.1:10105;127.0.0.2:10105
private String runtimeServerAddress;

// protocol example:HTTP,TCP,GRPC
private String protocol;
// for tcp:cloudevents,eventmeshmessage,openmessage
private String protocolName;

// producer or consumer
private String clientType;

// topic
private String topic;

// grpc subscriptionItem
// private SubscriptionMode mode;
// private SubscriptionType type;

// config
private String env;
private String idc;
private String ip;
private String sys;
private String pid;
private String username;
private String password;

// producer
private String producerGroup;

// consumer
private String consumerGroup;
private String subUrl;

// Agent
private UserAgent userAgent;

@Override
public String getUniqueKey() {
return runtimeServerAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.dashboard.core.function.SDK.wrapper.NacosSDKWrapper;

import java.util.AbstractMap.SimpleEntry;
import java.util.Objects;

import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.naming.NamingService;
Expand All @@ -34,8 +35,8 @@ public class NacosSDKOperation extends AbstractSDKOperation<NacosSDKWrapper> {
@Override
public SimpleEntry<String, NacosSDKWrapper> createClient(CreateSDKConfig createClientConfig) {
SimpleEntry<String, ConfigService> configSimpleEntry = nacosConfigClientCreateOperation.createClient(createClientConfig);
SimpleEntry namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (configSimpleEntry.getKey() != namingSimpleEntry.getKey()) {
SimpleEntry<String, NamingService> namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (!Objects.equals(configSimpleEntry.getKey(), namingSimpleEntry.getKey())) {
throw new RuntimeException("Nacos config and naming server address not match");
}
NacosSDKWrapper nacosClient = new NacosSDKWrapper(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshGrpcConsumerConfig;

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeGrpcConsumerSDKOperation extends AbstractSDKOperation<EventMeshGrpcConsumer> {

@Override
public SimpleEntry<String, EventMeshGrpcConsumer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshGrpcClientConfig grpcClientConfig = buildEventMeshGrpcConsumerConfig(runtimeConfig);
EventMeshGrpcConsumer grpcConsumer = null;
try {
grpcConsumer = new EventMeshGrpcConsumer(grpcClientConfig);
grpcConsumer.init();
} catch (EventMeshException e) {
log.error("create runtime grpc Consumer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), grpcConsumer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshGrpcProducerConfig;

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.producer.EventMeshGrpcProducer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeGrpcProducerSDKOperation extends AbstractSDKOperation<EventMeshGrpcProducer> {

@Override
public SimpleEntry<String, EventMeshGrpcProducer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshGrpcClientConfig grpcClientConfig = buildEventMeshGrpcProducerConfig(runtimeConfig);
EventMeshGrpcProducer grpcProducer = null;
try {
grpcProducer = new EventMeshGrpcProducer(grpcClientConfig);
} catch (EventMeshException e) {
log.error("create runtime grpc Producer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), grpcProducer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshHttpConsumerConfig;

import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.consumer.EventMeshHttpConsumer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeHttpConsumerSDKOperation extends AbstractSDKOperation<EventMeshHttpConsumer> {

@Override
public SimpleEntry<String, EventMeshHttpConsumer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshHttpClientConfig httpClientConfig = buildEventMeshHttpConsumerConfig(runtimeConfig);
EventMeshHttpConsumer httpConsumer = null;
try {
httpConsumer = new EventMeshHttpConsumer(httpClientConfig);
} catch (EventMeshException e) {
log.error("create runtime http Consumer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), httpConsumer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshHttpProducerConfig;

import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeHttpProducerSDKOperation extends AbstractSDKOperation<EventMeshHttpProducer> {

@Override
public SimpleEntry<String, EventMeshHttpProducer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshHttpClientConfig httpClientConfig = buildEventMeshHttpProducerConfig(runtimeConfig);
EventMeshHttpProducer httpProducer = null;
try {
httpProducer = new EventMeshHttpProducer(httpClientConfig);
} catch (EventMeshException e) {
log.error("create runtime http Producer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), httpProducer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Loading

0 comments on commit bd1690b

Please sign in to comment.