diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/AclMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/AclMetadata.java index 8025b589..d3878dea 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/AclMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/AclMetadata.java @@ -30,4 +30,9 @@ public class AclMetadata extends MetadataConfig { private String resourceType; private String resourceName; private Integer patternType; + + @Override + public String getUnique() { + return null; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClientMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClientMetadata.java index 9b2d1199..15762f1d 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClientMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClientMetadata.java @@ -44,4 +44,9 @@ public class ClientMetadata extends MetadataConfig { * protocol used to connect to runtime. */ private String protocol; + + @Override + public String getUnique() { + return host + ":" + port; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClusterMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClusterMetadata.java index c65e27a8..585b03dc 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClusterMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ClusterMetadata.java @@ -50,4 +50,9 @@ public class ClusterMetadata extends MetadataConfig { private StoreType storeType; private String description; + + @Override + public String getUnique() { + return clusterName + "/" + registryAddress; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConfigMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConfigMetadata.java index 11397f6b..c956e142 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConfigMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConfigMetadata.java @@ -35,4 +35,9 @@ public class ConfigMetadata extends MetadataConfig { private Integer instanceType; private Long instanceId; + + @Override + public String getUnique() { + return configKey; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectionMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectionMetadata.java index 7543f0bd..49856ead 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectionMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectionMetadata.java @@ -66,4 +66,9 @@ public class ConnectionMetadata extends MetadataConfig { private Long groupId; private String description; + + @Override + public String getUnique() { + return getClusterId() + "/" + sourceId + "/" + sinkId + "/" + topic; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectorMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectorMetadata.java index d329ef5f..57e2e952 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectorMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/ConnectorMetadata.java @@ -36,4 +36,9 @@ public class ConnectorMetadata extends MetadataConfig { * @see KubernetesPodStatus */ private Integer podState; + + @Override + public String getUnique() { + return host + ":" + port; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/GroupMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/GroupMetadata.java index e3df7cf7..92ff3b01 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/GroupMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/GroupMetadata.java @@ -31,4 +31,9 @@ public class GroupMetadata extends MetadataConfig { private Integer type; private String state; + + @Override + public String getUnique() { + return name; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/MetadataConfig.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/MetadataConfig.java index 880be5fd..cbcdd9a5 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/MetadataConfig.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/MetadataConfig.java @@ -17,7 +17,10 @@ package org.apache.eventmesh.dashboard.common.model.metadata; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; /** * This class is used to represent a piece of metadata, which can be used in create, update or delete operations to metadata service(eventmesh meta @@ -25,10 +28,18 @@ * {@code this.setServiceTypeEnums(MetadataServiceTypeEnums.RUNTIME);} */ @Data -public class MetadataConfig { +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public abstract class MetadataConfig { //eventmesh registry url private String registryAddress; //cluster id in database private Long clusterId; + + /** + * @return A string that is unique to the source, usually a url + */ + public abstract String getUnique(); } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RegistryMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RegistryMetadata.java index 56ea68a6..0ed58ebb 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RegistryMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RegistryMetadata.java @@ -20,8 +20,12 @@ import org.apache.eventmesh.dashboard.common.enums.RecordStatus; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data +@SuperBuilder +@NoArgsConstructor public class RegistryMetadata extends MetadataConfig { @@ -49,4 +53,9 @@ public class RegistryMetadata extends MetadataConfig { * @see RecordStatus */ private Integer status; + + @Override + public String getUnique() { + return host + ":" + port; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RuntimeMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RuntimeMetadata.java index e8d1432a..e0467514 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RuntimeMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/RuntimeMetadata.java @@ -18,14 +18,16 @@ package org.apache.eventmesh.dashboard.common.model.metadata; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor -@Builder +@SuperBuilder +@EqualsAndHashCode(callSuper = true) public class RuntimeMetadata extends MetadataConfig { private String host; @@ -40,7 +42,12 @@ public class RuntimeMetadata extends MetadataConfig { private Long storageClusterId; - private Long startTimeStamp; + private Long startTimestamp; private String clusterName; + + @Override + public String getUnique() { + return host + ":" + port; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/TopicMetadata.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/TopicMetadata.java index 829ed224..20df8817 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/TopicMetadata.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/metadata/TopicMetadata.java @@ -20,8 +20,12 @@ import org.apache.eventmesh.dashboard.common.enums.StoreType; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data +@SuperBuilder +@NoArgsConstructor public class TopicMetadata extends MetadataConfig { private StoreType storeType; @@ -42,4 +46,9 @@ public class TopicMetadata extends MetadataConfig { private Integer type; private String description; + + @Override + public String getUnique() { + return topicName; + } } diff --git a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/remoting/runtime/GetRuntimeResponse.java b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/remoting/runtime/GetRuntimeResponse.java index 783a78ab..483a609f 100644 --- a/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/remoting/runtime/GetRuntimeResponse.java +++ b/eventmesh-dashboard-common/src/main/java/org/apache/eventmesh/dashboard/common/model/remoting/runtime/GetRuntimeResponse.java @@ -21,9 +21,13 @@ import java.util.List; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@NoArgsConstructor +@AllArgsConstructor public class GetRuntimeResponse { private List runtimeMetadataList; diff --git a/eventmesh-dashboard-console/pom.xml b/eventmesh-dashboard-console/pom.xml index 94cb0fee..c5fb5ee9 100644 --- a/eventmesh-dashboard-console/pom.xml +++ b/eventmesh-dashboard-console/pom.xml @@ -153,6 +153,7 @@ **/org/apache/eventmesh/dashboard/console/integration/**/*.java + **/org/apache/eventmesh/dashboard/console/EventMeshDashboardApplicationTest.java diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/ClusterCache.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/ClusterCache.java new file mode 100644 index 00000000..bde19f9b --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/ClusterCache.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.cache; + + +import org.apache.eventmesh.dashboard.console.entity.cluster.ClusterEntity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import lombok.Getter; + +public class ClusterCache { + + @Getter + private static final ClusterCache INSTANCE = new ClusterCache(); + + //cluster name + private HashMap clusterNameMap = new HashMap<>(); + + private HashMap clusterIdMap = new HashMap<>(); + + private static final Object lock = new Object(); + + public ClusterEntity getClusterById(Long id) { + return INSTANCE.clusterIdMap.get(id); + } + + public ClusterEntity getClusterByName(String name) { + return INSTANCE.clusterNameMap.get(name); + } + + public ClusterEntity getClusterByRegistryAddress(String registryAddress) { + for (ClusterEntity clusterEntity : INSTANCE.clusterIdMap.values()) { + if (clusterEntity.getRegistryAddress().equals(registryAddress)) { + return clusterEntity; + } + } + return null; + } + + public List getClusters() { + return new ArrayList<>(INSTANCE.clusterIdMap.values()); + } + + public void addCluster(ClusterEntity clusterEntity) { + synchronized (lock) { + if (INSTANCE.clusterIdMap.containsKey(clusterEntity.getId()) + || INSTANCE.clusterNameMap.containsKey(clusterEntity.getName())) { + return; + } + INSTANCE.clusterIdMap.put(clusterEntity.getId(), clusterEntity); + INSTANCE.clusterNameMap.put(clusterEntity.getName(), clusterEntity); + } + } + + public void deleteClusterById(Long id) { + synchronized (lock) { + ClusterEntity clusterEntity = INSTANCE.clusterIdMap.get(id); + INSTANCE.clusterIdMap.remove(id); + INSTANCE.clusterNameMap.remove(clusterEntity.getName()); + } + } + + public void deleteClusterByName(String name) { + synchronized (lock) { + ClusterEntity clusterEntity = INSTANCE.clusterNameMap.get(name); + INSTANCE.clusterNameMap.remove(name); + INSTANCE.clusterIdMap.remove(clusterEntity.getId()); + } + } + + public void syncClusters(List clusters) { + synchronized (lock) { + INSTANCE.clusterIdMap.clear(); + INSTANCE.clusterNameMap.clear(); + for (ClusterEntity clusterEntity : clusters) { + INSTANCE.clusterIdMap.put(clusterEntity.getId(), clusterEntity); + INSTANCE.clusterNameMap.put(clusterEntity.getName(), clusterEntity); + } + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/RuntimeCache.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/RuntimeCache.java new file mode 100644 index 00000000..d179847f --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/cache/RuntimeCache.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.cache; + +import org.apache.eventmesh.dashboard.console.entity.runtime.RuntimeEntity; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RuntimeCache { + + private static final RuntimeCache INSTANCE = new RuntimeCache(); + + public static final RuntimeCache getInstance() { + return INSTANCE; + } + + // ip:port -> runtime + private Map runtimeMap = new ConcurrentHashMap<>(); + + private RuntimeCache() { + } + + public void addRuntime(RuntimeEntity runtimeEntity) { + runtimeMap.put(runtimeEntity.getHost() + ":" + runtimeEntity.getPort(), runtimeEntity); + } + + public Collection getRuntimeList() { + return runtimeMap.values(); + } + + public void deleteRuntime(RuntimeEntity runtimeEntity) { + runtimeMap.remove(runtimeEntity.getHost() + ":" + runtimeEntity.getPort()); + } + + public void replaceAllRuntime(List runtimeEntities) { + Map newRuntimeList = new ConcurrentHashMap<>(); + runtimeEntities.forEach(runtimeEntity -> { + newRuntimeList.put(runtimeEntity.getHost() + ":" + runtimeEntity.getPort(), runtimeEntity); + }); + runtimeMap = newRuntimeList; + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/FunctionManagerConfigs.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/FunctionManagerConfigs.java new file mode 100644 index 00000000..3b2eb777 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/FunctionManagerConfigs.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import lombok.Data; + +@Data +@Component +@ConfigurationProperties(prefix = "function") +public class FunctionManagerConfigs { + + private HealthCheck healthCheck = new HealthCheck(); + private Sync sync = new Sync(); + + @Data + public static class HealthCheck { + + private DoCheck doCheck = new DoCheck(); + private UpdateConfig updateConfig = new UpdateConfig(); + + @Data + public static class DoCheck { + + private int initialDelay = 60; + private int period = 60; + } + + @Data + public static class UpdateConfig { + + private int initialDelay = 30; + private int period = 60; + } + } + + @Data + public static class Sync { + + private boolean enable; + private ToDb toDb = new ToDb(); + + private int initialDelay = 120; + private int period = 60; + + @Data + public static class ToDb { + + private Boolean runtime; + } + } +} \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/acl/AclEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/acl/AclEntity.java index 78b7d7da..18ca26c7 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/acl/AclEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/acl/AclEntity.java @@ -23,11 +23,13 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class AclEntity extends BaseEntity { private static final long serialVersionUID = 6057071983428111947L; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/base/BaseEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/base/BaseEntity.java index 5d0984d6..c7022f34 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/base/BaseEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/base/BaseEntity.java @@ -22,13 +22,21 @@ import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; /** * Base Entity provide some basic fields that every Eventmesh Dashboard Entity would have */ @Data +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false, exclude = {"createTime", "updateTime"}) @Schema(name = "BaseEntity", description = "Base entity") +@SuperBuilder public class BaseEntity implements Serializable { private static final long serialVersionUID = -2697805837923579585L; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/client/ClientEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/client/ClientEntity.java index e6cec83c..61bc823f 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/client/ClientEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/client/ClientEntity.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.dashboard.console.entity.client; import org.apache.eventmesh.dashboard.common.enums.RecordStatus; +import org.apache.eventmesh.dashboard.common.model.metadata.ClientMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; @@ -26,11 +27,15 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class ClientEntity extends BaseEntity { private static final long serialVersionUID = 8204133370609215856L; @@ -69,15 +74,15 @@ public class ClientEntity extends BaseEntity { /** * 0: not active, 1: active + * * @see RecordStatus */ @Schema(name = "status", defaultValue = "0", allowableValues = {"0", "1"}, description = "0:not active, 1:active") private Integer status; /** - * csv format config id list.
- * Example value: 1,2,7
- * This field is updated when the configuration is modified via the web API, but is not used during the configuration retrieval process. + * csv format config id list.
Example value: 1,2,7
This field is updated when the configuration is modified via the web API, but is not + * used during the configuration retrieval process. */ private String configIds; @@ -91,5 +96,19 @@ public class ClientEntity extends BaseEntity { public void setStatusEntity(RecordStatus status) { this.status = status.getNumber(); } + + public ClientEntity(ClientMetadata source) { + setName(source.getName()); + setPlatform(source.getPlatform()); + setLanguage(source.getLanguage()); + setPid(source.getPid()); + setHost(source.getHost()); + setPort(source.getPort()); + setClusterId(source.getClusterId()); + setProtocol(source.getProtocol()); + setDescription(""); + setConfigIds(""); + setStatus(1); + } } \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/cluster/ClusterEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/cluster/ClusterEntity.java index eafbd2f1..e0992db4 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/cluster/ClusterEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/cluster/ClusterEntity.java @@ -17,24 +17,25 @@ package org.apache.eventmesh.dashboard.console.entity.cluster; +import org.apache.eventmesh.dashboard.common.model.metadata.ClusterMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; -import java.sql.Timestamp; - import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@SuperBuilder +@EqualsAndHashCode(callSuper = true, exclude = "status") public class ClusterEntity extends BaseEntity { - private Long id; - private String name; - private String registryNameList; + private String registryAddress; private String bootstrapServers; @@ -54,12 +55,24 @@ public class ClusterEntity extends BaseEntity { private Integer status; - private Timestamp createTime; - - private Timestamp updateTime; - - /** - * @See StoreType - */ private Integer storeType; + + public ClusterEntity(ClusterMetadata source) { + if (source.getClusterName() != null && !source.getClusterName().isEmpty()) { + setAuthType(source.getAuthType()); + setBootstrapServers(source.getBootstrapServers()); + setClientProperties(source.getClientProperties()); + setRegistryAddress(source.getRegistryAddress()); + setEventmeshVersion(source.getEventmeshVersion()); + setJmxProperties(source.getJmxProperties()); + setRegProperties(source.getRegProperties()); + setDescription(source.getDescription()); + setAuthType(source.getAuthType()); + setRunState(source.getRunState()); + setStoreType(source.getStoreType().getNumber()); + setName(source.getClusterName()); + } else { + throw new RuntimeException("cluster name is empty"); + } + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/config/ConfigEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/config/ConfigEntity.java index 40e766e8..ed612fab 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/config/ConfigEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/config/ConfigEntity.java @@ -17,17 +17,22 @@ package org.apache.eventmesh.dashboard.console.entity.config; +import org.apache.eventmesh.dashboard.common.model.metadata.ConfigMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +@Data +@EqualsAndHashCode(callSuper = true) @NoArgsConstructor @AllArgsConstructor -@Data +@SuperBuilder public class ConfigEntity extends BaseEntity { private Long id; @@ -36,6 +41,9 @@ public class ConfigEntity extends BaseEntity { private String businessType; + /** + * config type 0:runtime,1:storage,2:connector,3:topic + */ private Integer instanceType; private Long instanceId; @@ -106,4 +114,22 @@ public boolean dayu(String eventmeshVersion) { } return flag; } + + public ConfigEntity(ConfigMetadata source) { + setConfigName(source.getConfigKey()); + setConfigValue(source.getConfigValue()); + setClusterId(source.getClusterId()); + setEdit(1); + setBusinessType(""); + setInstanceId(source.getInstanceId()); + setDescription(""); + setInstanceType(source.getInstanceType()); + setIsDefault(0); + setStartVersion(""); + setEndVersion(""); + setEventmeshVersion(""); + setDiffType(0); + setIsModify(0); + setStatus(1); + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connection/ConnectionEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connection/ConnectionEntity.java index 8283cbbb..613f33c6 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connection/ConnectionEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connection/ConnectionEntity.java @@ -18,16 +18,18 @@ package org.apache.eventmesh.dashboard.console.entity.connection; import org.apache.eventmesh.dashboard.common.enums.RecordStatus; +import org.apache.eventmesh.dashboard.common.model.metadata.ConnectionMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; -import java.util.Objects; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; /** @@ -36,6 +38,8 @@ @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = {"status", "endTime"}) +@SuperBuilder public class ConnectionEntity extends BaseEntity { private static final long serialVersionUID = 6565578252656944905L; @@ -83,28 +87,20 @@ public class ConnectionEntity extends BaseEntity { private String description; - public void setDataStatus(RecordStatus dataStatus) { - this.status = dataStatus.getNumber(); + public void setStatusEnum(RecordStatus statusEnum) { + this.status = statusEnum.getNumber(); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ConnectionEntity that = (ConnectionEntity) o; - return Objects.equals(sourceType, that.sourceType) - && Objects.equals(sourceId, that.sourceId) - - && Objects.equals(sinkType, that.sinkType) - && Objects.equals(sinkId, that.sinkId) - - && Objects.equals(runtimeId, that.runtimeId) - && Objects.equals(status, that.status) - - && Objects.equals(description, that.description); + public ConnectionEntity(ConnectionMetadata source) { + setClusterId(source.getClusterId()); + setSourceId(source.getSourceId()); + setSourceType(source.getSourceType()); + setSinkId(source.getSinkId()); + setSinkType(source.getSinkType()); + setRuntimeId(source.getRuntimeId()); + setStatus(1); + setTopic(source.getTopic()); + setGroupId(source.getGroupId()); + setDescription(source.getDescription()); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connector/ConnectorEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connector/ConnectorEntity.java index 0b888be7..64655726 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connector/ConnectorEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/connector/ConnectorEntity.java @@ -25,11 +25,15 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class ConnectorEntity extends BaseEntity { private static final long serialVersionUID = -8226303660232951326L; @@ -67,11 +71,11 @@ public class ConnectorEntity extends BaseEntity { */ private String configIds; - public void setDataStatus(RecordStatus dataStatus) { - this.status = dataStatus.getNumber(); + public void setStatusEnum(RecordStatus statusEnum) { + this.status = statusEnum.getNumber(); } - public void setKubernetesPodDataStatus(KubernetesPodStatus kubernetesPodDataStatus) { - this.podState = kubernetesPodDataStatus.getNumber(); + public void setKubernetesPodStatusEnum(KubernetesPodStatus kubernetesPodStatusEnum) { + this.podState = kubernetesPodStatusEnum.getNumber(); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/group/GroupEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/group/GroupEntity.java index 52d15d20..8e255651 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/group/GroupEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/group/GroupEntity.java @@ -17,17 +17,23 @@ package org.apache.eventmesh.dashboard.console.entity.group; +import org.apache.eventmesh.dashboard.common.enums.RecordStatus; +import org.apache.eventmesh.dashboard.common.model.metadata.GroupMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @NoArgsConstructor @AllArgsConstructor @Data +@SuperBuilder +@EqualsAndHashCode(callSuper = true, exclude = "status") public class GroupEntity extends BaseEntity { private Long id; @@ -50,4 +56,13 @@ public class GroupEntity extends BaseEntity { private Integer status; + public GroupEntity(GroupMetadata source) { + setClusterId(source.getClusterId()); + setName(source.getName()); + setMembers(source.getMembers()); + setType(source.getType()); + setState(source.getState()); + setStatus(RecordStatus.ACTIVE.getNumber()); + setMemberCount(source.getMemberCount()); + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/groupmember/GroupMemberEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/groupmember/GroupMemberEntity.java index 9b56cbe1..9e2f6011 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/groupmember/GroupMemberEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/groupmember/GroupMemberEntity.java @@ -23,11 +23,15 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class GroupMemberEntity extends BaseEntity { private Long id; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/health/HealthCheckResultEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/health/HealthCheckResultEntity.java index f2b65ad3..017a9470 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/health/HealthCheckResultEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/health/HealthCheckResultEntity.java @@ -23,11 +23,15 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data +@SuperBuilder @AllArgsConstructor @NoArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "resultDesc") @Schema(name = "HealthCheckResultEntity", description = "Health check result entity") public class HealthCheckResultEntity extends BaseEntity { @@ -44,7 +48,7 @@ public class HealthCheckResultEntity extends BaseEntity { private String resultDesc; - @Schema(description = "state of a health check, 0: failed, 1: passed, 2: doing check, 3: out of time") + @Schema(description = "state of a health check, 0: failed, 1: passed, 2: doing check, 3: out of time, 4: not connected") private Integer state; } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/instanceuser/InstanceUserEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/instanceuser/InstanceUserEntity.java index 8ca85b42..bce60c15 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/instanceuser/InstanceUserEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/instanceuser/InstanceUserEntity.java @@ -23,12 +23,14 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class InstanceUserEntity extends BaseEntity { private Integer instanceType; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/log/LogEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/log/LogEntity.java index f2483f5f..48c0efbe 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/log/LogEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/log/LogEntity.java @@ -23,11 +23,13 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = {"endTime", "operationUser", "result"}) public class LogEntity extends BaseEntity { private Long id; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/meta/MetaEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/meta/MetaEntity.java index fb004425..60ef0eb1 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/meta/MetaEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/meta/MetaEntity.java @@ -18,17 +18,22 @@ package org.apache.eventmesh.dashboard.console.entity.meta; import org.apache.eventmesh.dashboard.common.enums.RecordStatus; +import org.apache.eventmesh.dashboard.common.model.metadata.RegistryMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class MetaEntity extends BaseEntity { private static final long serialVersionUID = 7176263169716424469L; @@ -59,7 +64,20 @@ public class MetaEntity extends BaseEntity { @Schema(name = "status", defaultValue = "0", allowableValues = {"0", "1"}, description = "0:inactive, 1:active") private Integer status; - public void setDataStatus(RecordStatus dataStatus) { - this.status = dataStatus.getNumber(); + public void setStatusEnum(RecordStatus statusEnum) { + this.status = statusEnum.getNumber(); + } + + public MetaEntity(RegistryMetadata source) { + setHost(source.getHost()); + setPort(source.getPort()); + setClusterId(source.getClusterId()); + setName(source.getName()); + setVersion(source.getVersion()); + setParams(source.getParams()); + setRole(source.getRole()); + setStatus(RecordStatus.ACTIVE.getNumber()); + setType(source.getType()); + setUsername(source.getUsername()); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/runtime/RuntimeEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/runtime/RuntimeEntity.java index c4a7be5e..be853792 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/runtime/RuntimeEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/runtime/RuntimeEntity.java @@ -17,22 +17,23 @@ package org.apache.eventmesh.dashboard.console.entity.runtime; +import org.apache.eventmesh.dashboard.common.model.metadata.RuntimeMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor - +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class RuntimeEntity extends BaseEntity { - - private Long id; - private Long clusterId; private String host; @@ -54,4 +55,16 @@ public class RuntimeEntity extends BaseEntity { private Timestamp updateTime; private String endpointMap; + + public RuntimeEntity(RuntimeMetadata source) { + setHost(source.getHost()); + setPort(source.getPort()); + setClusterId(source.getClusterId()); + setStatus(1); + setEndpointMap(source.getEndpointMap()); + setJmxPort(source.getJmxPort()); + setRack(source.getRack()); + setStorageClusterId(source.getStorageClusterId()); + setStartTimestamp(source.getStartTimestamp()); + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/storage/StoreEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/storage/StoreEntity.java index 78156f50..ba0781e0 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/storage/StoreEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/storage/StoreEntity.java @@ -24,20 +24,23 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class StoreEntity extends BaseEntity { - private Long id; - private Long clusterId; - private Integer storeId; - - private String storeType; + /** + * @see org.apache.eventmesh.dashboard.common.enums.StoreType + */ + private Integer storeType; private String host; diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/topic/TopicEntity.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/topic/TopicEntity.java index 85ae80e6..531e63ac 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/topic/TopicEntity.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/entity/topic/TopicEntity.java @@ -17,17 +17,24 @@ package org.apache.eventmesh.dashboard.console.entity.topic; +import org.apache.eventmesh.dashboard.common.model.metadata.TopicMetadata; import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; import java.sql.Timestamp; +import io.swagger.v3.oas.annotations.media.Schema; + import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true, exclude = "status") +@SuperBuilder public class TopicEntity extends BaseEntity { private Long id; @@ -36,10 +43,15 @@ public class TopicEntity extends BaseEntity { private String topicName; - private String storageId; + private Long storageId; + @Schema(description = "time to live in milliseconds, -2 unknown, -1 no limit;", example = "1000") private Long retentionMs; + /** + * topic type, 0: normal, 1: EventMesh internal; + */ + @Schema(description = "topic type, 0: normal, 1: EventMesh internal;", example = "0") private Integer type; private String description; @@ -51,4 +63,15 @@ public class TopicEntity extends BaseEntity { private Integer status; private Integer createProgress; + + public TopicEntity(TopicMetadata source) { + setClusterId(source.getClusterId()); + setTopicName(source.getTopicName()); + setStorageId(source.getStorageId()); + setRetentionMs(source.getRetentionMs()); + setType(source.getType()); + setDescription(source.getDescription()); + setStatus(1); + setCreateProgress(1); + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/CheckResultCache.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/CheckResultCache.java index 132e46b5..69358641 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/CheckResultCache.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/CheckResultCache.java @@ -33,15 +33,20 @@ public class CheckResultCache { - private static final HashMap> cacheMap = new HashMap<>(); + @Getter + private static final CheckResultCache INSTANCE = new CheckResultCache(); + + private final HashMap> cacheMap = new HashMap<>(); - public static Integer getLastHealthyCheckResult(String type, Long typeId) { + public Integer getLastHealthyCheckResult(String type, Long typeId) { if (!Objects.isNull(cacheMap.get(type)) && !Objects.isNull(cacheMap.get(type).get(typeId))) { return cacheMap.get(type).get(typeId).getStatus().getNumber(); } return HealthCheckStatus.CHECKING.getNumber(); } + private CheckResultCache() { + } public void update(String type, Long typeId, HealthCheckStatus status, String resultDesc, Long latency) { HashMap subMap = cacheMap.get(type); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthExecutor.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthExecutor.java index 46cecbc1..08d46daa 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthExecutor.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthExecutor.java @@ -93,7 +93,7 @@ public void onFail(Exception e) { } /** - * this function should be called before any actual execute behaviour.
It will check the execution result of the last check cycle in the + * this function should be called before any actual execute behaviour.

It will check the execution result of the last check cycle in the * memory cache, set tasks that haven't finished status to time out and update the database. */ public void startExecute() { @@ -112,7 +112,7 @@ public void startExecute() { } /** - * this function should be called after all actual execute behaviour.
It will insert the result of this check cycle into the database. At this + * this function should be called after all actual execute behaviour.

It will insert the result of this check cycle into the database. At this * point the status of the tasks may be CHECKING, they will be updated on the next startExecute. */ public void endExecute() { @@ -133,6 +133,7 @@ public void endExecute() { */ private void addToResultList(CheckResult result, ArrayList resultList) { HealthCheckResultEntity newEntity = new HealthCheckResultEntity(); + newEntity.setClusterId(result.getConfig().getClusterId()); newEntity.setType(HealthCheckType.toNumber(result.getConfig().getHealthCheckResourceType())); newEntity.setTypeId(result.getConfig().getInstanceId()); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthService.java index b07fd64b..17e17b86 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthService.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/HealthService.java @@ -17,15 +17,22 @@ package org.apache.eventmesh.dashboard.console.function.health; +import org.apache.eventmesh.dashboard.common.constant.health.HealthCheckTypeConstant; +import org.apache.eventmesh.dashboard.common.enums.StoreType; +import org.apache.eventmesh.dashboard.console.entity.health.HealthCheckResultEntity; +import org.apache.eventmesh.dashboard.console.entity.storage.StoreEntity; import org.apache.eventmesh.dashboard.console.function.health.CheckResultCache.CheckResult; import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; import org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.RedisCheck; +import org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4.Rocketmq4BrokerCheck; +import org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4.Rocketmq4NameServerCheck; +import org.apache.eventmesh.dashboard.console.service.DataServiceWrapper; import org.apache.eventmesh.dashboard.console.service.health.HealthDataService; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,12 +42,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.validation.constraints.NotNull; + import lombok.extern.slf4j.Slf4j; /** - * HealthService is the manager of all health check services. It is responsible for creating, deleting and executing health check services.
In - * this class there is a {@link HealthExecutor} which is used to execute health check services, and also a map to store all health check services. - * when the function executeAll is called, health check service will be executed by {@link HealthExecutor}. + * HealthService is the manager of all health check services. It is responsible for creating, deleting and executing health check services.

In this + * class there is a {@link HealthExecutor} which is used to execute health check services, and also a map to store all health check services. when the + * function executeAll is called, health check service will be executed by {@link HealthExecutor}. */ @Slf4j public class HealthService { @@ -48,7 +57,7 @@ public class HealthService { private HealthExecutor healthExecutor; /** - * class cache used to build healthCheckService instance.
key: HealthCheckObjectConfig.SimpleClassName value: HealthCheckService + * class cache used to build healthCheckService instance.

key: HealthCheckObjectConfig.SimpleClassName value: HealthCheckService * * @see HealthCheckObjectConfig */ @@ -56,6 +65,8 @@ public class HealthService { static { setClassCache(RedisCheck.class); + setClassCache(Rocketmq4BrokerCheck.class); + setClassCache(Rocketmq4NameServerCheck.class); } private static void setClassCache(Class clazz) { @@ -63,7 +74,7 @@ private static void setClassCache(Class clazz) { } /** - * This map is used to store HealthExecutor.
Outside key is Type(runtime, storage etc.), inside key is the id of type instance(runtimeId, + * This map is used to store HealthExecutor.

Outside key is Type(runtime, storage etc.), inside key is the id of type instance(runtimeId, * storageId etc.). * * @see AbstractHealthCheckService @@ -98,22 +109,23 @@ public void insertCheckService(HealthCheckObjectConfig config) { for (Entry> entry : HEALTH_CHECK_CLASS_CACHE.entrySet()) { Class clazz = entry.getValue(); HealthCheckType annotation = clazz.getAnnotation(HealthCheckType.class); - if (annotation != null && annotation.type().equals(config.getHealthCheckResourceType()) && annotation.subType() + if (Objects.isNull(annotation)) { + continue; + } + if (annotation.type().equals(config.getHealthCheckResourceType()) && annotation.subType() .equals(config.getHealthCheckResourceSubType())) { healthCheckService = createCheckService(clazz, config); - break; } } } + // if all above creation method failed + if (Objects.isNull(healthCheckService)) { + throw new RuntimeException("No construct method of Health Check Service is found, config is {}" + config); + } + insertCheckService(healthCheckService); } catch (Exception e) { log.error("create healthCheckService failed, healthCheckObjectConfig:{}", config, e); } - - // if all above creation method failed - if (Objects.isNull(healthCheckService)) { - throw new RuntimeException("No construct method of Health Check Service is found, config is {}" + config); - } - insertCheckService(healthCheckService); } public void insertCheckService(AbstractHealthCheckService checkService) { @@ -134,6 +146,10 @@ public void deleteCheckService(String resourceType, Long resourceId) { } } + public void replaceCheckService(List configList) { + checkServiceMap.clear(); + insertCheckService(configList); + } public void createExecutor(HealthDataService dataService, CheckResultCache cache) { healthExecutor = new HealthExecutor(); @@ -142,21 +158,30 @@ public void createExecutor(HealthDataService dataService, CheckResultCache cache } public void executeAll() { - healthExecutor.startExecute(); + try { - checkServiceMap.forEach((type, subMap) -> { - subMap.forEach((typeId, healthCheckService) -> { - healthExecutor.execute(healthCheckService); + healthExecutor.startExecute(); + + checkServiceMap.forEach((type, subMap) -> { + subMap.forEach((typeId, healthCheckService) -> { + healthExecutor.execute(healthCheckService); + }); }); - }); + } catch (Exception e) { + log.error("execute health check failed", e); + } healthExecutor.endExecute(); } - private AbstractHealthCheckService createCheckService(Class clazz, HealthCheckObjectConfig config) - throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { - Constructor constructor = clazz.getConstructor(HealthCheckObjectConfig.class); - return (AbstractHealthCheckService) constructor.newInstance(config); + @NotNull + private AbstractHealthCheckService createCheckService(Class clazz, HealthCheckObjectConfig config) { + try { + Constructor constructor = clazz.getConstructor(HealthCheckObjectConfig.class); + return (AbstractHealthCheckService) constructor.newInstance(config); + } catch (Exception e) { + throw new RuntimeException("createCheckService failed", e); + } } /** @@ -165,16 +190,104 @@ private AbstractHealthCheckService createCheckService(Class clazz, HealthChec * @param initialDelay unit is second * @param period unit is second */ - public void startScheduledExecution(long initialDelay, long period) { + public void startScheduledExecution(long initialDelay, int period) { if (scheduledExecutor == null) { - scheduledExecutor = new ScheduledThreadPoolExecutor(1); + scheduledExecutor = new ScheduledThreadPoolExecutor(2); } scheduledExecutor.scheduleAtFixedRate(this::executeAll, initialDelay, period, TimeUnit.SECONDS); } + public void startScheduledUpdateConfig(int initialDelay, int period, DataServiceWrapper dataServiceWrapper) { + if (scheduledExecutor == null) { + scheduledExecutor = new ScheduledThreadPoolExecutor(2); + } + scheduledExecutor.scheduleAtFixedRate(() -> this.updateHealthCheckConfigs(dataServiceWrapper), initialDelay, + period, TimeUnit.SECONDS); + } + public void stopScheduledExecution() { if (scheduledExecutor != null) { scheduledExecutor.shutdown(); } } + + public void updateHealthCheckConfigs(DataServiceWrapper dataServiceWrapper) { + try { + List checkConfigs = new ArrayList<>(); + List checkResultEntities = new ArrayList<>(); + //TODO add health check service, only storage check is usable for now + + // List clusters = properties.getDataServiceContainer().getClusterDataService().selectAll(); + // for (ClusterEntity cluster : clusters) { + // checkConfigs.add(HealthCheckObjectConfig.builder() + // .instanceId(cluster.getId()) + // .healthCheckResourceType(HealthCheckTypeConstant.HEALTH_CHECK_TYPE_CLUSTER) + // .connectUrl(cluster.getRegistryAddress()) + // .build()); + // checkResultEntities.add(HealthCheckResultEntity.builder() + // .clusterId(cluster.getId()) + // .type(1) + // .typeId(cluster.getId()) + // .state(4) + // .resultDesc("initializing check client") + // .build()); + // } + // + // List runtimes = properties.getDataServiceContainer().getRuntimeDataService().selectAll(); + // for (RuntimeEntity runtime : runtimes) { + // checkConfigs.add(HealthCheckObjectConfig.builder() + // .instanceId(runtime.getId()) + // .healthCheckResourceType(HealthCheckTypeConstant.HEALTH_CHECK_TYPE_RUNTIME) + // .connectUrl(runtime.getHost() + ":" + runtime.getPort()) + // .build()); + // checkResultEntities.add(HealthCheckResultEntity.builder() + // .clusterId(runtime.getClusterId()) + // .type(2) + // .typeId(runtime.getId()) + // .state(4) + // .resultDesc("initializing check client") + // .build()); + // } + // + // List topics = properties.getDataServiceContainer().getTopicDataService().selectAll(); + // for (TopicEntity topic : topics) { + // checkConfigs.add(HealthCheckObjectConfig.builder() + // .instanceId(topic.getId()) + // .healthCheckResourceType(HealthCheckTypeConstant.HEALTH_CHECK_TYPE_TOPIC) + // .build()); + // checkResultEntities.add(HealthCheckResultEntity.builder() + // .clusterId(topic.getClusterId()) + // .type(3) + // .typeId(topic.getId()) + // .state(4) + // .resultDesc("initializing check client") + // .build()); + // } + + List stores = dataServiceWrapper.getStoreDataService().selectAll(); + for (StoreEntity store : stores) { + checkConfigs.add(HealthCheckObjectConfig.builder() + .instanceId(store.getId()) + .clusterId(store.getClusterId()) + .healthCheckResourceType(HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE) + .healthCheckResourceSubType( + StoreType.fromNumber(store.getStoreType()).toString()) + .host(store.getHost()) + .port(store.getPort()) + .build()); + checkResultEntities.add(HealthCheckResultEntity.builder() + .clusterId(store.getClusterId()) + .type(4) + .typeId(store.getId()) + .state(4) + .resultDesc("initializing check client") + .build()); + } + + dataServiceWrapper.getHealthDataService().batchInsertNewCheckResult(checkResultEntities); + this.replaceCheckService(checkConfigs); + } catch (Exception e) { + log.error("updateHealthCheckConfigs error", e); + } + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/callback/HealthCheckCallback.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/callback/HealthCheckCallback.java index 541d30c6..86e92c23 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/callback/HealthCheckCallback.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/callback/HealthCheckCallback.java @@ -20,7 +20,7 @@ import org.apache.eventmesh.dashboard.console.function.health.HealthExecutor; /** - * Callback used by HealthService.doCheck to notify the caller of the result of the health check.
+ * Callback used by HealthService.doCheck to notify the caller of the result of the health check.

* @see HealthExecutor */ public interface HealthCheckCallback { diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/HealthCheckService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/HealthCheckService.java index 1461d384..5aa15e11 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/HealthCheckService.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/HealthCheckService.java @@ -21,14 +21,14 @@ import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; /** - * Health check service interface.
+ * Health check service interface.

* To add a new check service, extend the {@link AbstractHealthCheckService}. * @see AbstractHealthCheckService */ public interface HealthCheckService { /** - * Do the health check.
+ * Do the health check.

* To implement a new check service, add the necessary logic to call the success and fail functions of the callback. * @param callback The behaviour of the callback is defined as a lambda function when used. Please refer to {@link HealthExecutor} for usage. */ diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/HealthCheckObjectConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/HealthCheckObjectConfig.java index 79e8c826..5ca93091 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/HealthCheckObjectConfig.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/HealthCheckObjectConfig.java @@ -19,16 +19,24 @@ import java.util.Properties; +import lombok.AllArgsConstructor; +import lombok.Builder.Default; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder public class HealthCheckObjectConfig { private Long instanceId; private String healthCheckResourceType; - private String healthCheckResourceSubType; + @Default + private String healthCheckResourceSubType = ""; private String simpleClassName; @@ -54,15 +62,10 @@ public class HealthCheckObjectConfig { //mysql, redis private String database; + @Default private Long requestTimeoutMillis = 100000L; + @Default private RocketmqConfig rocketmqConfig = new RocketmqConfig(); - @Data - public class RocketmqConfig { - - private String nameServerUrl; - private String brokerUrl; - private String endPoint; - } } \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/RocketmqConfig.java similarity index 76% rename from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java rename to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/RocketmqConfig.java index bbf05719..e49d5152 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/config/RocketmqConfig.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.eventmesh.dashboard.console.config; - -import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; - -import java.util.List; +package org.apache.eventmesh.dashboard.console.function.health.check.config; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@NoArgsConstructor @AllArgsConstructor -public class HealthCheckConfig { - private List checkObjectConfigList; +public class RocketmqConfig { + + private String brokerUrl; + private String nameServerUrl; + private String endPoint; } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java index 1a2b0b1e..850738b7 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java @@ -37,6 +37,9 @@ import lombok.extern.slf4j.Slf4j; +/** + * Interface to check the state of nacos + */ @Slf4j @HealthCheckType(type = HEALTH_CHECK_TYPE_META, subType = HEALTH_CHECK_SUBTYPE_NACOS_CONFIG) public class NacosConfigCheck extends AbstractHealthCheckService { diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosNamingServiceCheck.java similarity index 89% rename from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java rename to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosNamingServiceCheck.java index 5d53b6fe..0313cee9 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosNamingServiceCheck.java @@ -37,14 +37,17 @@ import lombok.extern.slf4j.Slf4j; +/** + * not used for now, recommend to use NacosConfigCheck to check health status of nacos + */ @Slf4j @HealthCheckType(type = HEALTH_CHECK_TYPE_META, subType = HEALTH_CHECK_SUBTYPE_NACOS_REGISTRY) -public class NacosRegisterCheck extends AbstractHealthCheckService { +public class NacosNamingServiceCheck extends AbstractHealthCheckService { private NamingService namingService; - public NacosRegisterCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + public NacosNamingServiceCheck(HealthCheckObjectConfig healthCheckObjectConfig) { super(healthCheckObjectConfig); } @@ -60,6 +63,8 @@ public void doCheck(HealthCheckCallback callback) { } } catch (NacosException e) { callback.onFail(e); + } finally { + destroy(); } }); } @@ -72,7 +77,7 @@ public void init() { namingService = NamingFactory.createNamingService(properties); namingService.registerInstance(NACOS_CHECK_SERVICE_NAME, "11.11.11.11", 8888, NACOS_CHECK_SERVICE_CLUSTER_NAME); } catch (NacosException e) { - log.error("NacosRegisterCheck init failed", e); + log.error("NacosRegistryCheck init failed", e); } } @@ -81,7 +86,7 @@ public void destroy() { try { namingService.deregisterInstance(NACOS_CHECK_SERVICE_NAME, "11.11.11.11", 8888, NACOS_CHECK_SERVICE_CLUSTER_NAME); } catch (NacosException e) { - log.error("NacosRegisterCheck destroy failed", e); + log.error("NacosRegistryCheck destroy failed", e); } } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java index 5be24f2d..3cad4553 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java @@ -17,28 +17,30 @@ package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage; -import static org.apache.eventmesh.dashboard.common.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE; - +import org.apache.eventmesh.dashboard.common.constant.health.HealthCheckTypeConstant; import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKManager; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKTypeEnum; +import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRedisConfig; import java.time.Duration; import java.util.Objects; -import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.RedisURI.Builder; +import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import lombok.extern.slf4j.Slf4j; @Slf4j -@HealthCheckType(type = HEALTH_CHECK_TYPE_STORAGE, subType = "redis") +@HealthCheckType(type = HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE, subType = HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_REDIS) public class RedisCheck extends AbstractHealthCheckService { - private RedisClient redisClient; + private CreateRedisConfig sdkConfig; public RedisCheck(HealthCheckObjectConfig healthCheckObjectConfig) { super(healthCheckObjectConfig); @@ -47,7 +49,9 @@ public RedisCheck(HealthCheckObjectConfig healthCheckObjectConfig) { @Override public void doCheck(HealthCheckCallback callback) { try { - RedisAsyncCommands commands = redisClient.connect().async(); + StatefulRedisConnection connection = (StatefulRedisConnection) SDKManager.getInstance() + .getClient(SDKTypeEnum.STORAGE_REDIS, sdkConfig.getUniqueKey()); + RedisAsyncCommands commands = connection.async(); commands.ping().thenAccept(result -> { callback.onSuccess(); }).exceptionally(e -> { @@ -67,6 +71,7 @@ public void doCheck(HealthCheckCallback callback) { @Override public void init() { String redisUrl; + sdkConfig = new CreateRedisConfig(); if (Objects.nonNull(getConfig().getConnectUrl()) && !getConfig().getConnectUrl().isEmpty()) { redisUrl = getConfig().getConnectUrl(); } else { @@ -82,13 +87,12 @@ public void init() { } redisUrl = builder.build().toString(); } - redisClient = RedisClient.create(redisUrl); + sdkConfig.setRedisUrl(redisUrl); + SDKManager.getInstance().createClient(SDKTypeEnum.STORAGE_REDIS, sdkConfig); } @Override public void destroy() { - if (redisClient != null) { - redisClient.shutdown(); - } + SDKManager.getInstance().deleteClient(SDKTypeEnum.STORAGE_REDIS, sdkConfig.getUniqueKey()); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java index b60a73ce..73d466b9 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java @@ -20,21 +20,24 @@ import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKManager; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKTypeEnum; +import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRocketmqConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.Objects; + import lombok.extern.slf4j.Slf4j; @Slf4j public class Rocketmq4BrokerCheck extends AbstractHealthCheckService { - private RemotingClient remotingClient; + private CreateRocketmqConfig config; public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) { @@ -45,7 +48,8 @@ public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) { public void doCheck(HealthCheckCallback callback) { try { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null); - remotingClient.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(), + RemotingClient client = (RemotingClient) SDKManager.getInstance().getClient(SDKTypeEnum.STORAGE_ROCKETMQ_REMOTING, config.getUniqueKey()); + client.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(), new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { @@ -65,28 +69,30 @@ public void operationComplete(ResponseFuture responseFuture) { @Override public void init() { - if (getConfig().getRocketmqConfig().getBrokerUrl() == null || getConfig().getRocketmqConfig().getBrokerUrl().isEmpty()) { - throw new IllegalArgumentException("RocketmqCheck failed. BrokerUrl is null."); - } + setBrokerUrl(); - NettyClientConfig config = new NettyClientConfig(); - config.setUseTLS(false); - remotingClient = new NettyRemotingClient(config); - remotingClient.start(); + config = new CreateRocketmqConfig(); + config.setBrokerUrl(getConfig().getRocketmqConfig().getBrokerUrl()); + SDKManager.getInstance().createClient(SDKTypeEnum.STORAGE_ROCKETMQ_REMOTING, config); + } - if (getConfig().getConnectUrl() == null || getConfig().getConnectUrl().isEmpty()) { - if (getConfig().getHost() != null && getConfig().getPort() != null) { - getConfig().setConnectUrl(getConfig().getHost() + ":" + getConfig().getPort()); - } + private void setBrokerUrl() { + if (Objects.nonNull(getConfig().getRocketmqConfig()) && Objects.nonNull(getConfig().getRocketmqConfig().getBrokerUrl())) { + return; } - - if (getConfig().getConnectUrl() == null) { - log.error("RocketmqCheck failed. ConnectUrl is null."); + if (Objects.nonNull(getConfig().getConnectUrl()) && !getConfig().getConnectUrl().isEmpty()) { + getConfig().getRocketmqConfig().setBrokerUrl(getConfig().getConnectUrl()); + return; + } + if (Objects.nonNull(getConfig().getHost()) && Objects.nonNull(getConfig().getPort())) { + getConfig().getRocketmqConfig().setBrokerUrl(getConfig().getHost() + ":" + getConfig().getPort()); + return; } + throw new RuntimeException("RocketmqNameServerCheck init failed, brokerUrl is empty"); } @Override public void destroy() { - + SDKManager.getInstance().deleteClient(SDKTypeEnum.STORAGE_ROCKETMQ_REMOTING, config.getUniqueKey()); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java index 7ba3667e..00e7739a 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java @@ -22,22 +22,25 @@ import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKManager; +import org.apache.eventmesh.dashboard.core.function.SDK.SDKTypeEnum; +import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRocketmqConfig; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.Objects; + import lombok.extern.slf4j.Slf4j; @Slf4j @HealthCheckType(type = HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE, subType = HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_ROCKETMQ) public class Rocketmq4NameServerCheck extends AbstractHealthCheckService { - private RemotingClient remotingClient; + private CreateRocketmqConfig config; public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig) { super(healthCheckObjectConfig); @@ -47,7 +50,8 @@ public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig) public void doCheck(HealthCheckCallback callback) { try { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NAMESRV_CONFIG, null); - remotingClient.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(), + RemotingClient client = (RemotingClient) SDKManager.getInstance().getClient(SDKTypeEnum.STORAGE_ROCKETMQ_REMOTING, config.getUniqueKey()); + client.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(), new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { @@ -67,15 +71,26 @@ public void operationComplete(ResponseFuture responseFuture) { @Override public void init() { - if (getConfig().getRocketmqConfig().getNameServerUrl() == null || getConfig().getRocketmqConfig().getNameServerUrl().isEmpty()) { - throw new RuntimeException("RocketmqNameServerCheck init failed, nameServerUrl is empty"); - } + setNameServerUrl(); - NettyClientConfig config = new NettyClientConfig(); - config.setUseTLS(false); - remotingClient = new NettyRemotingClient(config); - remotingClient.start(); + config = new CreateRocketmqConfig(); + config.setNameServerUrl(getConfig().getRocketmqConfig().getNameServerUrl()); + SDKManager.getInstance().createClient(SDKTypeEnum.STORAGE_ROCKETMQ_REMOTING, config); + } + private void setNameServerUrl() { + if (Objects.nonNull(getConfig().getRocketmqConfig().getNameServerUrl())) { + return; + } + if (Objects.nonNull(getConfig().getConnectUrl()) && !getConfig().getConnectUrl().isEmpty()) { + getConfig().getRocketmqConfig().setNameServerUrl(getConfig().getConnectUrl()); + return; + } + if (Objects.nonNull(getConfig().getHost()) && Objects.nonNull(getConfig().getPort())) { + getConfig().getRocketmqConfig().setNameServerUrl(getConfig().getHost() + ":" + getConfig().getPort()); + return; + } + throw new RuntimeException("RocketmqNameServerCheck init failed, NameServerUrl is empty"); } @Override diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java index 6656589c..1acbe934 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java @@ -134,6 +134,7 @@ public void init() { remotingClient.start(); //TODO there are many functions that can be reused, they should be collected in a util module + //TODO: refactor all health check to use client manager //this function that create topics can be reused try { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataManager.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataManager.java new file mode 100644 index 00000000..261d429a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataManager.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata; + +import org.apache.eventmesh.dashboard.common.model.metadata.MetadataConfig; +import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.MetadataServiceWrapper.SingleMetadataServiceWrapper; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandlerWrapper; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataServiceWrapper; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.validation.constraints.NotNull; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * MetadataManager is a manager for metadata service, it will sync the data between cluster service and database. database should be empty when this + * manager booted + */ +@Slf4j +public class MetadataManager { + + @Setter + private Boolean toDbSync = true; + + private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(2); + + private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32, 32, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), + new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(0); + + @Override + public Thread newThread(@NotNull Runnable r) { + return new Thread(r, "metadata-manager-" + counter.incrementAndGet()); + } + }); + /** + * singleton id for service wrapper map, even if the cache is not on, the id should be increased. + */ + private static final AtomicLong staticServiceId = new AtomicLong(0); + + private static final ConcurrentHashMap firstRunToDb = new ConcurrentHashMap<>(); + + private final Map metaDataServiceWrapperMap = new ConcurrentHashMap<>(); + + private final Map> cacheData = new ConcurrentHashMap<>(); + + + public void init(Integer initialDelay, Integer period) { + + scheduledExecutorService.scheduleAtFixedRate(() -> MetadataManager.this.run(toDbSync, true), initialDelay, period, TimeUnit.SECONDS); + } + + + /** + * entrance of a sync scheduled task + * + * @param metaDataServiceWrapper + */ + public void addMetadataService(MetadataServiceWrapper metaDataServiceWrapper) { + Long cacheId = staticServiceId.incrementAndGet(); + metaDataServiceWrapper.setCacheId(cacheId); + metaDataServiceWrapperMap.put(cacheId, metaDataServiceWrapper); + } + + public void run() { + metaDataServiceWrapperMap.forEach(this::handlers); + } + + public void run(Boolean toDbOn, Boolean toServiceOn) { + try { + metaDataServiceWrapperMap.forEach((cacheId, metaDataServiceWrapper) -> handlers(cacheId, metaDataServiceWrapper, toDbOn, toServiceOn)); + } catch (Exception e) { + log.error("metadata manager run error", e); + } + } + + public void handlers(Long cacheId, MetadataServiceWrapper metaDataServiceWrapper, Boolean toDbOn, Boolean toServiceOn) { + this.threadPoolExecutor.execute(() -> { + try { + if (toDbOn) { + this.handler(cacheId, metaDataServiceWrapper.getDbToService(), true); + } + if (toServiceOn) { + this.handler(cacheId, metaDataServiceWrapper.getServiceToDb(), false); + } + } catch (Throwable e) { + log.error("metadata manager handler error", e); + } + }); + } + + public void handlers(Long cacheId, MetadataServiceWrapper metaDataServiceWrapper) { + handlers(cacheId, metaDataServiceWrapper, true, true); + } + + public void handler(Long cacheID, SingleMetadataServiceWrapper singleMetadataServiceWrapper, boolean isDbToService) { + if (singleMetadataServiceWrapper == null) { + return; + } + + try { + List newObjectList = (List) singleMetadataServiceWrapper.getSyncService().getData(); + if (newObjectList.isEmpty()) { + return; + } + + //if cache is false, we don't need to compare the data + // full volume updates + if (!singleMetadataServiceWrapper.getCache()) { + singleMetadataServiceWrapper.getHandler() + .replaceMetadata(newObjectList); + return; + } + + List cacheDataList = cacheData.get(cacheID); + //update old cache + cacheData.put(cacheID, newObjectList); + + Map newObjectMap = getUniqueKeyMap(newObjectList); + Map oldObjectMap = getUniqueKeyMap(cacheDataList); + + //these three List are in target type + List toUpdate = new ArrayList<>(); + List toDelete = new ArrayList<>(); + List toInsert; + + for (Entry entry : oldObjectMap.entrySet()) { + + Object serviceObject = newObjectMap.remove(entry.getKey()); + //if new Data don't have a key in oldMap, + if (serviceObject == null) { + toDelete.add(entry.getValue()); + } else { + //primary id, creat time and update time should not be compared + //if not equal, we need to update fields except unique key(they are equal) + //cause entry is from the oldMap, it should contain the primary key. + if (!serviceObject.equals(entry.getValue())) { + toUpdate.add(entry.getValue()); + } + } + } + + toInsert = new ArrayList<>(newObjectMap.values()); + + //if target is db, we use handler to provide transaction + if (!isDbToService) { + firstRunToDb.putIfAbsent(cacheID, false); + singleMetadataServiceWrapper.getHandler().handleAllObject(toInsert, toUpdate, toDelete); + //if target is eventmesh, we just use that 3 basic method + } else { + toInsert.forEach(singleMetadataServiceWrapper.getHandler()::addMetadataObject); + toUpdate.forEach(singleMetadataServiceWrapper.getHandler()::updateMetadataObject); + toDelete.forEach(singleMetadataServiceWrapper.getHandler()::deleteMetadataObject); + } + } catch (Throwable e) { + log.error("metadata manager handler error", e); + } + } + + public void setUpSyncMetadataManager(SyncDataServiceWrapper syncDataServiceWrapper, MetadataHandlerWrapper metadataHandlerWrapper) { + MetadataServiceWrapper metadataServiceWrapper = new MetadataServiceWrapper(); + SingleMetadataServiceWrapper singleMetadataServiceWrapper = SingleMetadataServiceWrapper.builder() + .syncService(syncDataServiceWrapper.getRuntimeSyncFromClusterService()) + .handler(metadataHandlerWrapper.getRuntimeMetadataHandlerToDb()).build(); + metadataServiceWrapper.setServiceToDb(singleMetadataServiceWrapper); + this.addMetadataService(metadataServiceWrapper); + } + + private Map getUniqueKeyMap(List list) { + Map map = new HashMap<>(); + if (Objects.nonNull(list) && !list.isEmpty()) { + Object firstItem = list.get(0); + if (firstItem instanceof MetadataConfig) { + for (Object item : list) { + MetadataConfig metadataItem = (MetadataConfig) item; + map.put(metadataItem.getUnique(), metadataItem); + } + } else if (firstItem instanceof BaseEntity) { + for (Object item : list) { + BaseEntity baseEntityItem = (BaseEntity) item; + //TODO we don't have db2service method and getUniqueKeyMap from entity is not used + + // map.put(baseEntityItem.getUniqueKey(), baseEntityItem); + } + } + } + return map; + } + + //TODO if database is modified by other service, we need to update the cache +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataServiceWrapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataServiceWrapper.java new file mode 100644 index 00000000..c323cdf8 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/MetadataServiceWrapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata; + +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Builder.Default; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +public class MetadataServiceWrapper { + + private SingleMetadataServiceWrapper dbToService; + + private SingleMetadataServiceWrapper serviceToDb; + + private Long cacheId; + + @Data + @AllArgsConstructor + @NoArgsConstructor + @Builder + public static class SingleMetadataServiceWrapper { + + /** + * true -> incremental updates false -> full volume updates + * @See MetadataManager + */ + @Default + private Boolean cache = true; + + /** + * syncService is the source of metadata + */ + private SyncDataService syncService; + + /** + * handler is the target of metadata, it will process the metadata from syncService + */ + private MetadataHandler handler; + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandler.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandler.java new file mode 100644 index 00000000..f0715249 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler; + +import java.util.List; + +/** + * @param metadata type or entity type, {@code } is the source type of handler, there should be a converter in the handler to convert + * {@code } to the target type.

method in this interface should be implemented as async method, if the method is eventmesh manage + * operation. + */ +public interface MetadataHandler { + + + default void handleAll(List addData, List updateData, List deleteData) { + if (addData != null) { + addData.forEach(this::addMetadata); + } + if (updateData != null) { + updateData.forEach(this::updateMetadata); + } + if (deleteData != null) { + deleteData.forEach(this::deleteMetadata); + } + } + + default void handleAllObject(List addData, List updateData, List deleteData) { + handleAll((List) addData, (List) updateData, (List) deleteData); + } + + //metaData: topic, center, etc. add meta is to create a topic. + void addMetadata(S meta); + + default void addMetadata(List meta) { + if (meta != null) { + meta.forEach(this::addMetadata); + } + } + + default void addMetadataObject(Object meta) { + addMetadata((S) meta); + } + + default void addMetadataObject(List meta) { + if (meta != null) { + meta.forEach(t -> addMetadata((S) t)); + } + } + + default void replaceMetadata(List meta) { + if (meta != null) { + deleteMetadata((List) meta); + addMetadataObject(meta); + } + } + + default void updateMetadata(S meta) { + this.addMetadata(meta); + } + + /** + * If this handler is db handler, do implement this method to improve performance + * + * @param meta + */ + default void updateMetadata(List meta) { + if (meta != null) { + meta.forEach(this::updateMetadata); + } + } + + default void updateMetadataObject(Object meta) { + this.addMetadata((S) meta); + } + + void deleteMetadata(S meta); + + default void deleteMetadata(List meta) { + if (meta != null) { + meta.forEach(this::deleteMetadata); + } + } + + default void deleteMetadataObject(Object meta) { + deleteMetadata((S) meta); + } + + +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandlerWrapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandlerWrapper.java new file mode 100644 index 00000000..5722081a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/MetadataHandlerWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler; + +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.ClusterMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.ConfigMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.ConnectionMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.GroupMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.RegistryMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.RuntimeMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.db.TopicMetadataHandlerToDbImpl; +import org.apache.eventmesh.dashboard.console.spring.support.FunctionManagerLoader; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import lombok.Getter; + +/** + * MetadataHandlerWrapper is a wrapper class for all metadata handlers. It is used to inject all metadata handlers. + * + * @see FunctionManagerLoader + */ +@Getter +@Component +public class MetadataHandlerWrapper { + + @Autowired + private ClusterMetadataHandlerToDbImpl clusterMetadataHandlerToDb; + @Autowired + private ConfigMetadataHandlerToDbImpl configMetadataHandlerToDb; + @Autowired + private ConnectionMetadataHandlerToDbImpl connectionMetadataHandlerToDb; + @Autowired + private GroupMetadataHandlerToDbImpl groupMetadataHandlerToDb; + @Autowired + private RegistryMetadataHandlerToDbImpl registryMetadataHandlerToDb; + @Autowired + private RuntimeMetadataHandlerToDbImpl runtimeMetadataHandlerToDb; + @Autowired + private TopicMetadataHandlerToDbImpl topicMetadataHandlerToDb; +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ClusterMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ClusterMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..eae42b4f --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ClusterMetadataHandlerToDbImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.ClusterMetadata; +import org.apache.eventmesh.dashboard.console.entity.cluster.ClusterEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.cluster.ClusterService; + +import java.util.List; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class ClusterMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + private ClusterService clusterService; + + @Override + public void addMetadata(ClusterMetadata meta) { + clusterService.addCluster(new ClusterEntity(meta)); + } + + @Override + public void addMetadata(List metadataList) { + List entityList = metadataList.stream() + .map(ClusterEntity::new) + .collect(Collectors.toList()); + clusterService.batchInsert(entityList); + } + + @Override + public void deleteMetadata(ClusterMetadata meta) { + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConfigMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConfigMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..5711ca16 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConfigMetadataHandlerToDbImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.ConfigMetadata; +import org.apache.eventmesh.dashboard.console.entity.config.ConfigEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.config.ConfigService; + +import java.util.List; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class ConfigMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + private ConfigService configService; + + + @Override + public void addMetadata(ConfigMetadata meta) { + configService.addConfig(new ConfigEntity(meta)); + } + + @Override + public void addMetadata(List meta) { + List entityList = meta.stream() + .map(ConfigEntity::new) + .collect(Collectors.toList()); + configService.batchInsert(entityList); + } + + @Override + public void deleteMetadata(ConfigMetadata meta) { + configService.deleteConfig(new ConfigEntity(meta)); + } +} + diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConnectionMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConnectionMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..fb143158 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/ConnectionMetadataHandlerToDbImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.enums.RecordStatus; +import org.apache.eventmesh.dashboard.common.model.metadata.ConnectionMetadata; +import org.apache.eventmesh.dashboard.console.entity.client.ClientEntity; +import org.apache.eventmesh.dashboard.console.entity.connection.ConnectionEntity; +import org.apache.eventmesh.dashboard.console.entity.connector.ConnectorEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.client.ClientDataService; +import org.apache.eventmesh.dashboard.console.service.connection.ConnectionDataService; +import org.apache.eventmesh.dashboard.console.service.connector.ConnectorDataService; + +import java.util.List; +import java.util.Objects; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Service +@Slf4j +public class ConnectionMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + private ClientDataService clientDataService; + + @Autowired + private ConnectionDataService connectionService; + + @Autowired + private ConnectorDataService connectorDataService; + + @Override + public void addMetadata(ConnectionMetadata meta) { + if (Objects.equals(meta.getSinkType(), "connector")) { + ConnectorEntity query = ConnectorEntity.builder() + .host(meta.getSinkHost()) + .port(meta.getSinkPort()) + .build(); + List sink = connectorDataService.selectByHostPort(query); + if (sink.size() == 1) { + meta.setSinkId(sink.get(0).getId()); + } else if (sink.isEmpty()) { + log.info("sink connector not found, sinkHost:{}, sinkPort:{}.creating one", meta.getSinkHost(), meta.getSinkPort()); + ConnectorEntity connectorEntity = new ConnectorEntity(meta.getClusterId(), meta.getSinkName(), "", "", 1, meta.getSinkHost(), + meta.getSinkPort(), 4, ""); + connectorDataService.createConnector(connectorEntity); + meta.setSinkId(connectorEntity.getId()); + } else { + log.error("more than 1 sink connector active, sinkHost:{}, sinkPort:{}", meta.getSinkHost(), meta.getSinkPort()); + } + + } else if (Objects.equals(meta.getSinkType(), "client")) { + ClientEntity query = new ClientEntity(); + query.setHost(meta.getSinkHost()); + query.setPort(meta.getSinkPort()); + List sink = clientDataService.selectByHostPort(query); + if (sink.size() == 1) { + meta.setSinkId(sink.get(0).getId()); + } else if (sink.isEmpty()) { + log.info("sink client not found, sinkHost:{}, sinkPort:{}.creating one", meta.getSinkHost(), meta.getSinkPort()); + ClientEntity clientEntity = new ClientEntity(); + clientEntity.setStatusEntity(RecordStatus.ACTIVE); + clientEntity.setName(""); + clientEntity.setPlatform(""); + clientEntity.setLanguage(""); + clientEntity.setPid(0L); + clientEntity.setProtocol(""); + clientEntity.setConfigIds(""); + clientEntity.setDescription(""); + clientEntity.setClusterId(0L); + clientEntity.setHost(meta.getSinkHost()); + clientEntity.setPort(meta.getSinkPort()); + clientDataService.addClient(clientEntity); + meta.setSinkId(clientEntity.getId()); + } else { + log.error("more than 1 sink client active, sinkHost:{}, sinkPort:{}", meta.getSinkHost(), meta.getSinkPort()); + } + } + + if (Objects.equals(meta.getSourceType(), "connector")) { + ConnectorEntity query = ConnectorEntity.builder() + .host(meta.getSourceHost()) + .port(meta.getSourcePort()) + .build(); + List source = connectorDataService.selectByHostPort(query); + if (source.size() == 1) { + meta.setSourceId(source.get(0).getId()); + } else if (source.isEmpty()) { + log.info("source connector not found, sourceHost:{}, sourcePort:{}.creating one", meta.getSourceHost(), meta.getSourcePort()); + ConnectorEntity connectorEntity = new ConnectorEntity(meta.getClusterId(), meta.getSourceName(), "", "", 1, meta.getSourceHost(), + meta.getSourcePort(), 4, ""); + connectorDataService.createConnector(connectorEntity); + meta.setSourceId(connectorEntity.getId()); + } else { + log.error("more than 1 source connector active, sourceHost:{}, sourcePort:{}", meta.getSourceHost(), meta.getSourcePort()); + } + } + + if (Objects.equals(meta.getSourceType(), "client")) { + ClientEntity query = new ClientEntity(); + query.setHost(meta.getSourceHost()); + query.setPort(meta.getSourcePort()); + List source = clientDataService.selectByHostPort(query); + if (source.size() == 1) { + meta.setSourceId(source.get(0).getId()); + } else if (source.isEmpty()) { + log.info("source client not found, sourceHost:{}, sourcePort:{}.creating one", meta.getSourceHost(), meta.getSourcePort()); + + ClientEntity clientEntity = new ClientEntity(); + clientEntity.setStatusEntity(RecordStatus.ACTIVE); + clientEntity.setName(""); + clientEntity.setPlatform(""); + clientEntity.setLanguage(""); + clientEntity.setPid(0L); + clientEntity.setProtocol(""); + clientEntity.setConfigIds(""); + clientEntity.setDescription(""); + clientEntity.setClusterId(0L); + clientEntity.setHost(meta.getSinkHost()); + clientEntity.setPort(meta.getSinkPort()); + clientDataService.addClient(clientEntity); + meta.setSourceId(clientEntity.getId()); + } else { + log.error("more than 1 source client active, sourceHost:{}, sourcePort:{}", meta.getSourceHost(), meta.getSourcePort()); + } + } + + connectionService.insert(new ConnectionEntity(meta)); + } + + @Override + public void deleteMetadata(ConnectionMetadata meta) { + + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/GroupMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/GroupMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..30914558 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/GroupMetadataHandlerToDbImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.GroupMetadata; +import org.apache.eventmesh.dashboard.console.entity.group.GroupEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.group.GroupService; + +import java.util.List; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class GroupMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + GroupService groupService; + + @Override + public void addMetadata(GroupMetadata meta) { + meta.setMemberCount(0); + GroupEntity groupEntity = new GroupEntity(meta); + groupService.addGroup(groupEntity); + } + + @Override + public void addMetadata(List metadata) { + List entityList = metadata.stream().map(GroupEntity::new).collect(Collectors.toList()); + groupService.batchInsert(entityList); + } + + @Override + public void deleteMetadata(GroupMetadata meta) { + + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RegistryMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RegistryMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..16c3307a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RegistryMetadataHandlerToDbImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.RegistryMetadata; +import org.apache.eventmesh.dashboard.console.entity.meta.MetaEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.registry.RegistryDataService; + +import java.util.List; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class RegistryMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + private RegistryDataService registryDataService; + + @Override + public void addMetadata(RegistryMetadata meta) { + registryDataService.insert(new MetaEntity(meta)); + } + + @Override + public void addMetadata(List meta) { + List entityList = meta.stream() + .map(MetaEntity::new) + .collect(Collectors.toList()); + registryDataService.batchInsert(entityList); + } + + @Override + public void deleteMetadata(RegistryMetadata meta) { + registryDataService.deactivate(new MetaEntity(meta)); + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RuntimeMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RuntimeMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..d8df9f2a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/RuntimeMetadataHandlerToDbImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.RuntimeMetadata; +import org.apache.eventmesh.dashboard.console.cache.ClusterCache; +import org.apache.eventmesh.dashboard.console.cache.RuntimeCache; +import org.apache.eventmesh.dashboard.console.entity.cluster.ClusterEntity; +import org.apache.eventmesh.dashboard.console.entity.runtime.RuntimeEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.cluster.ClusterService; +import org.apache.eventmesh.dashboard.console.service.runtime.RuntimeService; + +import java.util.Objects; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class RuntimeMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + RuntimeService runtimeService; + + @Autowired + ClusterService clusterService; + + @Override + public void addMetadata(RuntimeMetadata meta) { + ClusterEntity cluster = ClusterCache.getINSTANCE().getClusterByRegistryAddress(meta.getRegistryAddress()); + if (Objects.isNull(cluster)) { + log.info("new cluster detected syncing runtime, adding cluster to db, cluster:{}", meta.getClusterName()); + ClusterEntity clusterEntity = new ClusterEntity(); + clusterEntity.setId(0L); + clusterEntity.setName(meta.getClusterName()); + clusterEntity.setRegistryAddress(meta.getRegistryAddress()); + clusterEntity.setBootstrapServers(""); + clusterEntity.setEventmeshVersion(""); + clusterEntity.setClientProperties(""); + clusterEntity.setJmxProperties(""); + clusterEntity.setRegProperties(""); + clusterEntity.setDescription(""); + clusterEntity.setAuthType(0); + clusterEntity.setRunState(0); + clusterEntity.setStoreType(0); + + clusterService.addCluster(clusterEntity); + } else { + cluster.setName(meta.getClusterName()); + clusterService.addCluster(cluster); + } + if (Objects.isNull(meta.getClusterId())) { + meta.setClusterId(ClusterCache.getINSTANCE().getClusterByName(meta.getClusterName()).getId()); + } + runtimeService.addRuntime(new RuntimeEntity(meta)); + RuntimeCache.getInstance().addRuntime(new RuntimeEntity(meta)); + } + + @Override + public void deleteMetadata(RuntimeMetadata meta) { + runtimeService.deactivate(new RuntimeEntity(meta)); + RuntimeCache.getInstance().deleteRuntime(new RuntimeEntity(meta)); + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/TopicMetadataHandlerToDbImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/TopicMetadataHandlerToDbImpl.java new file mode 100644 index 00000000..b15bfa8b --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/handler/db/TopicMetadataHandlerToDbImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.handler.db; + +import org.apache.eventmesh.dashboard.common.model.metadata.TopicMetadata; +import org.apache.eventmesh.dashboard.console.entity.storage.StoreEntity; +import org.apache.eventmesh.dashboard.console.entity.topic.TopicEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.console.service.store.StoreService; +import org.apache.eventmesh.dashboard.console.service.topic.TopicService; + +import java.net.URI; +import java.util.Objects; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class TopicMetadataHandlerToDbImpl implements MetadataHandler { + + @Autowired + private TopicService topicService; + + @Autowired + private StoreService storeService; + + @Override + public void addMetadata(TopicMetadata meta) { + if (Objects.nonNull(meta.getConnectionUrl())) { + URI uri = URI.create(meta.getConnectionUrl()); + if (Objects.nonNull(uri.getHost()) && uri.getPort() != -1) { + StoreEntity store = storeService.selectByHostPort(uri.getHost(), uri.getPort()); + if (Objects.nonNull(store)) { + meta.setStorageId(store.getId()); + } + } + } + + topicService.addTopic(new TopicEntity(meta)); + } + + @Override + public void deleteMetadata(TopicMetadata meta) { + topicService.deleteTopic(null); + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataService.java new file mode 100644 index 00000000..76776e3a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice; + +import java.util.List; + +/** + * interface to get data from different sources, including database or eventmesh cluster + * + * @param Entity or Metadata + */ +public interface SyncDataService { + + public List getData(); +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataServiceWrapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataServiceWrapper.java new file mode 100644 index 00000000..11bd912d --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/SyncDataServiceWrapper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice; + +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.AclSyncFromClusterService; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.ConfigSyncFromClusterService; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.GroupSyncFromClusterService; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.InstanceUserFromClusterService; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.RuntimeSyncFromClusterService; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster.TopicSyncFromClusterService; +import org.apache.eventmesh.dashboard.console.spring.support.FunctionManagerLoader; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import lombok.Getter; + +/** + * SyncDataServiceWrapper is a wrapper class for all sync services. It is used to inject all sync services. + * + * @see FunctionManagerLoader + */ +@Getter +@Component +public class SyncDataServiceWrapper { + + @Autowired + private AclSyncFromClusterService aclSyncFromClusterService; + @Autowired + private ConfigSyncFromClusterService configSyncFromClusterService; + @Autowired + private GroupSyncFromClusterService groupSyncFromClusterService; + @Autowired + private RuntimeSyncFromClusterService runtimeSyncFromClusterService; + @Autowired + private InstanceUserFromClusterService instanceUserFromClusterService; + @Autowired + private TopicSyncFromClusterService topicSyncFromClusterService; +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/AclSyncFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/AclSyncFromClusterService.java new file mode 100644 index 00000000..97e43a3c --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/AclSyncFromClusterService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.apache.eventmesh.dashboard.common.model.metadata.AclMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.acl.GetAclsRequest; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.service.remoting.AclRemotingService; + +import java.util.List; + +import org.springframework.stereotype.Service; + +import lombok.Setter; + +@Service +public class AclSyncFromClusterService implements SyncDataService { + + @Setter + private AclRemotingService aclRemotingService; + + @Override + public List getData() { + GetAclsRequest getAclsRequest = new GetAclsRequest(); + // aclRemotingService.getAllAcls(getAclsRequest). + return null; + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/ConfigSyncFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/ConfigSyncFromClusterService.java new file mode 100644 index 00000000..6cbf9584 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/ConfigSyncFromClusterService.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.apache.eventmesh.dashboard.common.model.metadata.ConfigMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.config.GetConfigRequest; +import org.apache.eventmesh.dashboard.common.model.remoting.config.GetConfigResponse; +import org.apache.eventmesh.dashboard.console.cache.ClusterCache; +import org.apache.eventmesh.dashboard.console.cache.RuntimeCache; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.console.service.store.StoreService; +import org.apache.eventmesh.dashboard.service.remoting.ConfigRemotingService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class ConfigSyncFromClusterService implements SyncDataService { + + @Autowired + private StoreService storeDataService; + + @Setter + ConfigRemotingService configRemotingService; + + @Override + public List getData() { + List> futures = new ArrayList<>(); + futures.add(getConfigsFromRegistry()); + futures.add(getConfigsFromRuntime()); + futures.add(getConfigsFromKafka()); + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + CompletableFuture> allConfigMetadataFuture = allFutures.thenApply(v -> + futures.stream() + .map(future -> { + try { + return future.get().getConfigMetadataList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()) + ); + + return allConfigMetadataFuture.join(); + } + + private CompletableFuture getConfigsFromRegistry() { + GetConfigRequest registryRequest = new GetConfigRequest(); + List registryList = new ArrayList<>(); + ClusterCache.getINSTANCE().getClusters().forEach(clusterEntity -> registryList.add(clusterEntity.getRegistryAddress())); + registryRequest.setRegistryAddressList(registryList); + return configRemotingService.getConfigsFromRegistry(registryRequest).getGetConfigResponseFuture(); + } + + private CompletableFuture getConfigsFromRuntime() { + GetConfigRequest runtimeRequest = new GetConfigRequest(); + List runtimeAddressList = new ArrayList<>(); + RuntimeCache.getInstance().getRuntimeList().forEach(runtimeEntity -> { + runtimeAddressList.add(runtimeEntity.getHost() + ":" + runtimeEntity.getPort()); + }); + runtimeRequest.setRuntimeAddressList(runtimeAddressList); + return configRemotingService.getConfigsFromRuntime(runtimeRequest).getGetConfigResponseFuture(); + } + + private CompletableFuture getConfigsFromKafka() { + GetConfigRequest brokerRequest = new GetConfigRequest(); + return null; + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/GroupSyncFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/GroupSyncFromClusterService.java new file mode 100644 index 00000000..13dc104a --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/GroupSyncFromClusterService.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.apache.eventmesh.dashboard.common.model.metadata.GroupMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.group.GetGroupsRequest; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.service.remoting.GroupRemotingService; + +import java.util.List; + +import org.springframework.stereotype.Service; + +import lombok.Setter; + +@Service +public class GroupSyncFromClusterService implements SyncDataService { + + @Setter + private GroupRemotingService groupRemotingService; + + @Override + public List getData() { + GetGroupsRequest getGroupsRequest = new GetGroupsRequest(); + try { + return null; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/InstanceUserFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/InstanceUserFromClusterService.java new file mode 100644 index 00000000..88b4808d --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/InstanceUserFromClusterService.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.springframework.stereotype.Service; + +@Service +public class InstanceUserFromClusterService { + +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/RuntimeSyncFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/RuntimeSyncFromClusterService.java new file mode 100644 index 00000000..728ca17e --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/RuntimeSyncFromClusterService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.apache.eventmesh.dashboard.common.model.metadata.RuntimeMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.runtime.GetRuntimeRequest; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.console.service.cluster.ClusterService; +import org.apache.eventmesh.dashboard.core.meta.runtime.NacosRuntimeCore; +import org.apache.eventmesh.dashboard.service.remoting.MetaRemotingService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class RuntimeSyncFromClusterService implements SyncDataService { + + private final MetaRemotingService metaRemotingService = new NacosRuntimeCore(); + + @Autowired + private ClusterService clusterDataService; + + @Override + public List getData() { + List requestList = new ArrayList<>(); + ConcurrentLinkedDeque runtimeMetadata = new ConcurrentLinkedDeque<>(); + clusterDataService.selectAll().forEach( + clusterEntity -> { + GetRuntimeRequest request = new GetRuntimeRequest(); + request.setRegistryAddress(clusterEntity.getRegistryAddress()); + requestList.add(request); + } + ); + if (requestList.isEmpty()) { + return new ArrayList<>(); + } + CountDownLatch countDownLatch = new CountDownLatch(requestList.size()); + ForkJoinPool taskThreadPool = new ForkJoinPool(requestList.size()); + + try { + taskThreadPool.submit(() -> + requestList.parallelStream().forEach(request -> { + metaRemotingService.getRuntime(request).getFuture() + .whenComplete((result, ex) -> { + if (Objects.isNull(result)) { + log.error("Error occurred while getting topics", ex); + } + runtimeMetadata.addAll(result.getRuntimeMetadataList()); + countDownLatch.countDown(); + }); + }) + ).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Error occurred while executing parallel stream", e); + } finally { + taskThreadPool.shutdown(); + } + + try { + countDownLatch.await(); + return new ArrayList<>(runtimeMetadata); + } catch (InterruptedException e) { + log.error("sync topic data from runtime failed", e); + return new ArrayList<>(); + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/TopicSyncFromClusterService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/TopicSyncFromClusterService.java new file mode 100644 index 00000000..2c8b28fc --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/cluster/TopicSyncFromClusterService.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.cluster; + +import org.apache.eventmesh.dashboard.common.model.metadata.TopicMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.topic.GetTopicsRequest; +import org.apache.eventmesh.dashboard.console.cache.RuntimeCache; +import org.apache.eventmesh.dashboard.console.entity.runtime.RuntimeEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.service.remoting.TopicRemotingService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; + +import org.springframework.stereotype.Service; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class TopicSyncFromClusterService implements SyncDataService { + + @Setter + TopicRemotingService topicRemotingService; + + public List getData() { + ConcurrentLinkedDeque topicList = new ConcurrentLinkedDeque<>(); + Collection runtimeList = RuntimeCache.getInstance().getRuntimeList(); + + CountDownLatch countDownLatch = new CountDownLatch(runtimeList.size()); + ForkJoinPool taskThreadPool = new ForkJoinPool(runtimeList.size()); + try { + taskThreadPool.submit(() -> + runtimeList.parallelStream().forEach(runtimeEntity -> { + GetTopicsRequest getTopicsRequest = new GetTopicsRequest(); + getTopicsRequest.setRuntimeHost(runtimeEntity.getHost()); + getTopicsRequest.setRuntimePort(runtimeEntity.getPort()); + topicRemotingService.getAllTopics(getTopicsRequest).getGetTopicsResponseFuture() + .whenComplete((result, ex) -> { + if (Objects.isNull(result)) { + log.error("Error occurred while getting topics", ex); + } + result.getTopicMetadataList().forEach(topic -> { + topic.setRuntimeId(runtimeEntity.getId()); + topicList.add(topic); + }); + countDownLatch.countDown(); + }); + }) + ).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Error occurred while executing parallel stream", e); + } finally { + taskThreadPool.shutdown(); + } + + try { + countDownLatch.await(); + return new ArrayList<>(topicList); + } catch (InterruptedException e) { + log.error("sync topic data from runtime failed", e); + return new ArrayList<>(); + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/service/connection/ConnectionService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/db/TopicSyncFromDbService.java similarity index 51% rename from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/service/connection/ConnectionService.java rename to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/db/TopicSyncFromDbService.java index 3faedd68..4cbd1737 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/service/connection/ConnectionService.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/metadata/syncservice/db/TopicSyncFromDbService.java @@ -15,10 +15,12 @@ * limitations under the License. */ -package org.apache.eventmesh.dashboard.console.service.connection; +package org.apache.eventmesh.dashboard.console.function.metadata.syncservice.db; -import org.apache.eventmesh.dashboard.console.entity.connection.ConnectionEntity; -import org.apache.eventmesh.dashboard.console.service.connection.impl.ConnectionDataServiceDatabaseImpl; + +import org.apache.eventmesh.dashboard.console.entity.topic.TopicEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataService; +import org.apache.eventmesh.dashboard.console.service.topic.TopicService; import java.util.List; @@ -29,22 +31,13 @@ @Slf4j @Service -public class ConnectionService { - @Autowired - ConnectionDataService metaConnectionService; +public class TopicSyncFromDbService implements SyncDataService { @Autowired - ConnectionDataServiceDatabaseImpl databaseConnectionService; - - public void syncConnection() { - try { - List connectionEntityList = metaConnectionService.getAllConnections(); - databaseConnectionService.replaceAllConnections(connectionEntityList); - } catch (Exception e) { - log.error("sync connection info from {} to {} failed for reason:{}.", - metaConnectionService.getClass().getSimpleName(), - databaseConnectionService.getClass().getSimpleName(), - e.getMessage()); - } + private TopicService topicDataService; + + @Override + public List getData() { + return topicDataService.selectAll(); } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/optration/TopicMetadataHandlerToClusterImpl.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/optration/TopicMetadataHandlerToClusterImpl.java new file mode 100644 index 00000000..6b89a58f --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/optration/TopicMetadataHandlerToClusterImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.optration; + +import org.apache.eventmesh.dashboard.common.model.metadata.TopicMetadata; +import org.apache.eventmesh.dashboard.common.model.remoting.topic.CreateTopicRequest; +import org.apache.eventmesh.dashboard.console.entity.topic.TopicEntity; +import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandler; +import org.apache.eventmesh.dashboard.service.remoting.TopicRemotingService; + +import org.springframework.stereotype.Service; + +import lombok.Setter; + +@Service +public class TopicMetadataHandlerToClusterImpl implements MetadataHandler { + + @Setter + private TopicRemotingService topicRemotingService; + + @Override + public void addMetadata(TopicEntity meta) { + TopicMetadata topicMetadata = new TopicMetadata(); + topicMetadata.setStoreAddress(""); + topicMetadata.setConnectionUrl(""); + topicMetadata.setTopicName(meta.getTopicName()); + topicMetadata.setRuntimeId(0L); + topicMetadata.setStorageId(meta.getStorageId()); + topicMetadata.setRetentionMs(0L); + topicMetadata.setType(meta.getType()); + topicMetadata.setDescription(meta.getDescription()); + topicMetadata.setRegistryAddress(""); + topicMetadata.setClusterId(0L); + + topicRemotingService.createTopic(new CreateTopicRequest(topicMetadata)); + } + + @Override + public void deleteMetadata(TopicEntity meta) { + + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/client/ClientMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/client/ClientMapper.java index 94e3eda2..a2692848 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/client/ClientMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/client/ClientMapper.java @@ -65,10 +65,14 @@ public interface ClientMapper { + "`status`, `config_ids`, `description`) " + "VALUES (#{clusterId}, #{name}, #{platform}," + "#{language}, #{pid}, #{host}, #{port}, #{protocol}," - + "#{status}, #{configIds}, #{description})") - void insert(ClientEntity clientEntity); + + "#{status}, #{configIds}, #{description})" + + "ON DUPLICATE KEY UPDATE `status` = 1, `pid` = #{pid}, `config_ids` = #{configIds}, `host` = #{host}, `port` = #{port}") + Long insert(ClientEntity clientEntity); @Update("UPDATE `client` SET status = 0, end_time = NOW() WHERE id = #{id}") void deactivate(ClientEntity clientEntity); + @Update("UPDATE `client` SET status = 0, end_time = NOW() WHERE `host` = #{host} AND `port` = #{port}") + void deActiveByHostPort(ClientEntity clientEntity); + } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/cluster/ClusterMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/cluster/ClusterMapper.java index 7e1c3af3..e9850e57 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/cluster/ClusterMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/cluster/ClusterMapper.java @@ -41,10 +41,10 @@ public interface ClusterMapper { @Insert({ ""}) @@ -54,17 +54,18 @@ public interface ClusterMapper { @Select("SELECT * FROM cluster WHERE id=#{id} AND status=1") ClusterEntity selectClusterById(ClusterEntity cluster); - @Insert("INSERT INTO cluster (name, registry_name_list, bootstrap_servers, eventmesh_version, client_properties, " - + "jmx_properties, reg_properties, description, auth_type, run_state,store_type) VALUES (#{name},#{registryNameList}," + @Insert("INSERT INTO cluster (name, registry_address, bootstrap_servers, eventmesh_version, client_properties, " + + "jmx_properties, reg_properties, description, auth_type, run_state,store_type) VALUES (#{name},#{registryAddress}," + "#{bootstrapServers},#{eventmeshVersion},#{clientProperties},#{jmxProperties},#{regProperties},#{description},#{authType}," - + "#{runState},#{storeType})") + + "#{runState},#{storeType})" + + "ON DUPLICATE KEY UPDATE status = 1") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") void addCluster(ClusterEntity cluster); @Update("UPDATE cluster SET name =#{name},reg_properties=#{regProperties},bootstrap_servers=#{bootstrapServers}," + "eventmesh_version=#{eventmeshVersion},client_properties=#{clientProperties},jmx_properties=#{jmxProperties}," + "reg_properties=#{regProperties},description=#{description},auth_type=#{authType},run_state=#{runState} ," - + "registry_name_list=#{registryNameList} WHERE id=#{id}") + + "registry_address=#{registryAddress} WHERE id=#{id}") void updateClusterById(ClusterEntity cluster); @Update("UPDATE cluster SET status=0 WHERE id=#{id}") diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/config/ConfigMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/config/ConfigMapper.java index 838e95f8..2a6c9a1c 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/config/ConfigMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/config/ConfigMapper.java @@ -61,6 +61,9 @@ public interface ConfigMapper { @Select("SELECT * FROM config WHERE status=1 AND is_default=0") List selectAll(); + @Select("SELECT * FROM config WHERE instance_type=#{instanceType} AND instance_id=#{instanceId}") + List selectConfigsByInstance(ConfigEntity configEntity); + @Insert({ ""}) void batchInsert(List connectionEntityList); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/connector/ConnectorMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/connector/ConnectorMapper.java index 5958b6a4..910884fa 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/connector/ConnectorMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/connector/ConnectorMapper.java @@ -34,7 +34,7 @@ public interface ConnectorMapper { @Select("SELECT * FROM connector WHERE status=1") - ConnectorEntity selectAll(); + List selectAll(); @Select("SELECT * FROM connector WHERE id = #{id} AND status=1") ConnectorEntity selectById(ConnectorEntity connectorEntity); @@ -47,8 +47,9 @@ public interface ConnectorMapper { @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") @Insert("INSERT INTO connector (cluster_id,name, class_name, type, status, pod_state, config_ids, host, port) " - + "VALUES (#{clusterId}, #{name}, #{className}, #{type}, #{status}, #{podState}, #{configIds}, #{host}, #{port})") - void insert(ConnectorEntity connectorEntity); + + "VALUES (#{clusterId}, #{name}, #{className}, #{type}, #{status}, #{podState}, #{configIds}, #{host}, #{port})" + + "ON DUPLICATE KEY UPDATE status = 1, pod_state = #{podState}, config_ids = #{configIds}, host = #{host}, port = #{port}") + Long insert(ConnectorEntity connectorEntity); @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") @Insert({ @@ -66,7 +67,7 @@ public interface ConnectorMapper { void active(ConnectorEntity connectorEntity); @Update("UPDATE connector SET status = 0 WHERE id = #{id}") - void deactivate(ConnectorEntity connectorEntity); + void deActive(ConnectorEntity connectorEntity); @Update("UPDATE connector SET pod_state = #{podState} WHERE id = #{id}") void updatePodState(ConnectorEntity connectorEntity); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/health/HealthCheckResultMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/health/HealthCheckResultMapper.java index b14b1a16..59b5fb56 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/health/HealthCheckResultMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/health/HealthCheckResultMapper.java @@ -61,22 +61,35 @@ List selectByClusterIdAndCreateTimeRange(@Param("cluste @Insert({ "" }) void batchInsert(List healthCheckResultEntityList); + @Insert({ + "" + }) + void insertNewChecks(List healthCheckResultEntityList); + @Update("UPDATE health_check_result SET state = #{state}, result_desc = #{resultDesc} WHERE id = #{id}") void update(HealthCheckResultEntity healthCheckResultEntity); @@ -92,10 +105,13 @@ List selectByClusterIdAndCreateTimeRange(@Param("cluste @Select({ "" }) diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/meta/MetaMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/meta/MetaMapper.java index c3102724..6d4f8d6a 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/meta/MetaMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/meta/MetaMapper.java @@ -40,11 +40,11 @@ public interface MetaMapper { ""}) @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") - void batchInsert(List metaEntities); + List batchInsert(List metaEntities); @Select("SELECT * FROM meta WHERE id = #{id}") MetaEntity selectById(MetaEntity metaEntity); @@ -54,8 +54,8 @@ public interface MetaMapper { @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") @Insert("INSERT INTO meta (name, type, version, cluster_id, host, port, role, username, params, status)" - + " VALUES ( #{name}, #{type}, #{version}, #{clusterId}, #{host}, #{port}, #{role}, #{username}, #{params}, #{status})") - void insert(MetaEntity metaEntity); + + " VALUES ( #{name}, #{type}, #{version}, #{clusterId}, #{host}, #{port}, #{role}, #{username}, #{params}, 1)") + Long insert(MetaEntity metaEntity); @Update("UPDATE meta SET status = 0 WHERE id = #{id}") void deactivate(MetaEntity metaEntity); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/runtime/RuntimeMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/runtime/RuntimeMapper.java index c6453309..c1e910f5 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/runtime/RuntimeMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/runtime/RuntimeMapper.java @@ -44,14 +44,15 @@ public interface RuntimeMapper { ""}) @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") void batchInsert(List runtimeEntities); @Insert("INSERT INTO runtime (cluster_id, host, storage_cluster_id, port, jmx_port, start_timestamp, rack, status, " - + "endpoint_map) VALUES(#{clusterId},#{host},#{storageClusterId},#{port},#{jmxPort},#{startTimestamp},#{rack},#{status},#{endpointMap})") + + "endpoint_map) VALUES(#{clusterId},#{host},#{storageClusterId},#{port},#{jmxPort},NOW(),#{rack},#{status},#{endpointMap})" + + " ON DUPLICATE KEY UPDATE status=1,start_timestamp = now()") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") void addRuntime(RuntimeEntity runtimeEntity); diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/storage/StoreMapper.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/storage/StoreMapper.java index 5e5ee418..2449a106 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/storage/StoreMapper.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/mapper/storage/StoreMapper.java @@ -39,6 +39,9 @@ public interface StoreMapper { @Select("SELECT * FROM store WHERE id=#{id} AND status=1") StoreEntity selectById(StoreEntity storeEntity); + @Select("SELECT * FROM store WHERE host=#{host} AND port=#{port} AND status=1 LIMIT 1") + StoreEntity selectByHostPort(StoreEntity storeEntity); + @Insert({ "