Skip to content

Commit e767cb1

Browse files
authored
BE: issue #1090 added topics connect endpoint (#1435)
1 parent 0af059b commit e767cb1

File tree

11 files changed

+258
-30
lines changed

11 files changed

+258
-30
lines changed

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.kafbat.ui.api.TopicsApi;
1212
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.mapper.ClusterMapper;
14+
import io.kafbat.ui.model.FullConnectorInfoDTO;
1415
import io.kafbat.ui.model.InternalTopic;
1516
import io.kafbat.ui.model.InternalTopicConfig;
1617
import io.kafbat.ui.model.PartitionsIncreaseDTO;
@@ -28,6 +29,8 @@
2829
import io.kafbat.ui.model.TopicUpdateDTO;
2930
import io.kafbat.ui.model.TopicsResponseDTO;
3031
import io.kafbat.ui.model.rbac.AccessContext;
32+
import io.kafbat.ui.model.rbac.permission.ConnectAction;
33+
import io.kafbat.ui.service.KafkaConnectService;
3134
import io.kafbat.ui.service.TopicsService;
3235
import io.kafbat.ui.service.analyze.TopicAnalysisService;
3336
import io.kafbat.ui.service.mcp.McpTool;
@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5558
private final TopicAnalysisService topicAnalysisService;
5659
private final ClusterMapper clusterMapper;
5760
private final ClustersProperties clustersProperties;
61+
private final KafkaConnectService kafkaConnectService;
5862

5963
@Override
6064
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -370,4 +374,23 @@ private Comparator<InternalTopic> getComparatorForTopic(
370374
default -> defaultComparator;
371375
};
372376
}
377+
378+
@Override
379+
public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
380+
String topicName,
381+
ServerWebExchange exchange) {
382+
var context = AccessContext.builder()
383+
.cluster(clusterName)
384+
.topicActions(topicName, VIEW)
385+
.operationName("getTopicConnectors")
386+
.operationParams(topicName)
387+
.build();
388+
389+
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
390+
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
391+
392+
return validateAccess(context)
393+
.then(Mono.just(ResponseEntity.ok(job)))
394+
.doOnEach(sig -> audit(context, sig));
395+
}
373396
}

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.kafbat.ui.model.TaskIdDTO;
1919
import io.kafbat.ui.model.TaskStatusDTO;
2020
import io.kafbat.ui.model.connect.InternalConnectorInfo;
21+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Objects;
@@ -31,6 +32,12 @@
3132
public interface KafkaConnectMapper {
3233
NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);
3334

35+
default ClusterInfo toClient(KafkaConnectState state) {
36+
ClusterInfo clusterInfo = new ClusterInfo();
37+
clusterInfo.setVersion(state.getVersion());
38+
return clusterInfo;
39+
}
40+
3441
@Mapping(target = "status", ignore = true)
3542
@Mapping(target = "connect", ignore = true)
3643
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
@@ -153,4 +160,23 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
153160
.tasksCount(tasks.size())
154161
.failedTasksCount(failedTasksCount);
155162
}
163+
164+
default KafkaConnectState toScrapeState(ConnectDTO connect, List<InternalConnectorInfo> connectors) {
165+
return KafkaConnectState.builder()
166+
.name(connect.getName())
167+
.version(connect.getVersion().orElse("Unknown"))
168+
.connectors(connectors.stream().map(this::toScrapeState).toList())
169+
.build();
170+
}
171+
172+
default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo connector) {
173+
return new KafkaConnectState.ConnectorState(
174+
connector.getConnector().getName(),
175+
connector.getConnector().getType(),
176+
connector.getConnector().getStatus(),
177+
connector.getTopics()
178+
);
179+
}
180+
181+
156182
}

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.kafbat.ui.model;
22

33
import io.kafbat.ui.service.ReactiveAdminClient;
4+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
45
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
56
import java.util.List;
7+
import java.util.Map;
68
import java.util.function.UnaryOperator;
79
import java.util.stream.Stream;
810
import lombok.Builder;
@@ -19,6 +21,7 @@ public class Statistics implements AutoCloseable {
1921
ReactiveAdminClient.ClusterDescription clusterDescription;
2022
Metrics metrics;
2123
ScrapedClusterState clusterState;
24+
Map<String, KafkaConnectState> connectStates;
2225

2326
public static Statistics empty() {
2427
return builder()
@@ -28,6 +31,7 @@ public static Statistics empty() {
2831
.clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
2932
.metrics(Metrics.empty())
3033
.clusterState(ScrapedClusterState.empty())
34+
.connectStates(Map.of())
3135
.build();
3236
}
3337

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.service;
22

3-
import com.github.benmanes.caffeine.cache.AsyncCache;
4-
import com.github.benmanes.caffeine.cache.Caffeine;
53
import io.kafbat.ui.config.ClustersProperties;
64
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
75
import io.kafbat.ui.connect.model.ClusterInfo;
@@ -25,12 +23,15 @@
2523
import io.kafbat.ui.model.FullConnectorInfoDTO;
2624
import io.kafbat.ui.model.KafkaCluster;
2725
import io.kafbat.ui.model.NewConnectorDTO;
26+
import io.kafbat.ui.model.Statistics;
2827
import io.kafbat.ui.model.TaskDTO;
2928
import io.kafbat.ui.model.TaskIdDTO;
3029
import io.kafbat.ui.model.connect.InternalConnectorInfo;
3130
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
31+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
3232
import io.kafbat.ui.util.ReactiveFailover;
3333
import jakarta.validation.Valid;
34+
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
3637
import java.util.Optional;
@@ -49,17 +50,16 @@ public class KafkaConnectService {
4950
private final KafkaConnectMapper kafkaConnectMapper;
5051
private final KafkaConfigSanitizer kafkaConfigSanitizer;
5152
private final ClustersProperties clustersProperties;
52-
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
53+
private final StatisticsCache statisticsCache;
5354

5455
/**
 * Creates the service. Connect cluster info is now sourced from the shared
 * {@link StatisticsCache} (populated by the statistics scrape) instead of a
 * service-local cache.
 */
public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
                           KafkaConfigSanitizer kafkaConfigSanitizer,
                           ClustersProperties clustersProperties,
                           StatisticsCache statisticsCache) {
  this.statisticsCache = statisticsCache;
  this.clustersProperties = clustersProperties;
  this.kafkaConfigSanitizer = kafkaConfigSanitizer;
  this.kafkaConnectMapper = kafkaConnectMapper;
}
6464

6565
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
@@ -89,14 +89,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
8989
}
9090
}
9191

92-
private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
93-
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
94-
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
95-
.onErrorResume(th -> {
96-
log.error("Error on collecting cluster info", th);
97-
return Mono.just(new ClusterInfo());
98-
}).toFuture()
99-
));
92+
public Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
93+
KafkaConnectState state = statisticsCache.get(cluster).getConnectStates().get(connectName);
94+
if (state != null) {
95+
return Mono.just(kafkaConnectMapper.toClient(state));
96+
} else {
97+
return api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
98+
.onErrorResume(th -> {
99+
log.error("Error on collecting cluster info", th);
100+
return Mono.just(new ClusterInfo());
101+
});
102+
}
100103
}
101104

102105
private Flux<InternalConnectorInfo> getConnectConnectors(
@@ -134,6 +137,33 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
134137
.flatMapMany(Flux::fromIterable);
135138
}
136139

140+
public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
141+
142+
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
143+
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
144+
145+
return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
146+
getClusterInfo(cluster, c.getName()).map(info ->
147+
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
148+
).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName())))
149+
).flatMap(connect ->
150+
getConnectorsWithErrorsSuppress(cluster, connect.getName())
151+
.onErrorResume(t -> Mono.just(Map.of()))
152+
.flatMapMany(connectors ->
153+
Flux.fromIterable(connectors.entrySet())
154+
.flatMap(e ->
155+
getConnectorTopics(
156+
cluster,
157+
connect.getName(),
158+
e.getKey()
159+
).map(topics ->
160+
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
161+
)
162+
)
163+
).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
164+
);
165+
}
166+
137167
private List<FullConnectorInfoDTO> filterConnectors(
138168
List<FullConnectorInfoDTO> connectors,
139169
String search,
@@ -349,4 +379,30 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
349379
.formatted(connectorName, connectName));
350380
});
351381
}
382+
383+
public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, String topicName) {
384+
Map<String, KafkaConnectState> connectStates = this.statisticsCache.get(cluster).getConnectStates();
385+
Map<String, List<String>> filteredConnects = new HashMap<>();
386+
for (Map.Entry<String, KafkaConnectState> entry : connectStates.entrySet()) {
387+
List<KafkaConnectState.ConnectorState> connectors =
388+
entry.getValue().getConnectors().stream().filter(c -> c.topics().contains(topicName)).toList();
389+
if (!connectors.isEmpty()) {
390+
filteredConnects.put(entry.getKey(), connectors.stream().map(KafkaConnectState.ConnectorState::name).toList());
391+
}
392+
}
393+
394+
return Flux.fromIterable(filteredConnects.entrySet())
395+
.flatMap(entry ->
396+
getConnectorsWithErrorsSuppress(cluster, entry.getKey())
397+
.map(connectors ->
398+
connectors.entrySet()
399+
.stream()
400+
.filter(c -> entry.getValue().contains(c.getKey()))
401+
.map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null))
402+
.map(kafkaConnectMapper::fullConnectorInfo)
403+
.toList()
404+
)
405+
).flatMap(Flux::fromIterable);
406+
407+
}
352408
}

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import io.kafbat.ui.model.Metrics;
99
import io.kafbat.ui.model.ServerStatusDTO;
1010
import io.kafbat.ui.model.Statistics;
11+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
1112
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
1213
import java.util.List;
14+
import java.util.stream.Collectors;
1315
import lombok.RequiredArgsConstructor;
1416
import lombok.extern.slf4j.Slf4j;
1517
import org.springframework.stereotype.Service;
@@ -21,6 +23,7 @@
2123
public class StatisticsService {
2224

2325
private final AdminClientService adminClientService;
26+
private final KafkaConnectService kafkaConnectService;
2427
private final FeatureService featureService;
2528
private final StatisticsCache cache;
2629
private final ClustersProperties clustersProperties;
@@ -38,19 +41,22 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
3841
.then(
3942
Mono.zip(
4043
featureService.getAvailableFeatures(ac, cluster, description),
41-
loadClusterState(description, ac)
44+
loadClusterState(description, ac),
45+
loadKafkaConnects(cluster)
4246
).flatMap(t ->
4347
scrapeMetrics(cluster, t.getT2(), description)
44-
.map(metrics -> createStats(description, t.getT1(), t.getT2(), metrics, ac)))))
45-
.doOnError(e ->
48+
.map(metrics -> createStats(description, t.getT1(), t.getT2(), t.getT3(), metrics, ac))
49+
)
50+
)
51+
).doOnError(e ->
4652
log.error("Failed to collect cluster {} info", cluster.getName(), e))
4753
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
4854
}
4955

5056
private Statistics createStats(ClusterDescription description,
5157
List<ClusterFeature> features,
5258
ScrapedClusterState scrapedClusterState,
53-
Metrics metrics,
59+
List<KafkaConnectState> connects, Metrics metrics,
5460
ReactiveAdminClient ac) {
5561
return Statistics.builder()
5662
.status(ServerStatusDTO.ONLINE)
@@ -59,6 +65,11 @@ private Statistics createStats(ClusterDescription description,
5965
.metrics(metrics)
6066
.features(features)
6167
.clusterState(scrapedClusterState)
68+
.connectStates(
69+
connects.stream().collect(
70+
Collectors.toMap(KafkaConnectState::getName, c -> c)
71+
)
72+
)
6273
.build();
6374
}
6475

@@ -74,4 +85,8 @@ private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,
7485
.scrape(clusterState, clusterDescription.getNodes());
7586
}
7687

88+
private Mono<List<KafkaConnectState>> loadKafkaConnects(KafkaCluster cluster) {
89+
return kafkaConnectService.scrapeAllConnects(cluster).collectList();
90+
}
91+
7792
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.kafbat.ui.service.metrics.scrape;
2+
3+
import io.kafbat.ui.model.ConnectorStatusDTO;
4+
import io.kafbat.ui.model.ConnectorTypeDTO;
5+
import java.time.Instant;
6+
import java.util.List;
7+
import lombok.Builder;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.Value;
10+
11+
@Builder(toBuilder = true)
12+
@RequiredArgsConstructor
13+
@Value
14+
public class KafkaConnectState {
15+
Instant scrapeFinishedAt;
16+
String name;
17+
String version;
18+
List<ConnectorState> connectors;
19+
20+
public record ConnectorState(String name,
21+
ConnectorTypeDTO connectorType,
22+
ConnectorStatusDTO status,
23+
List<String> topics) {}
24+
}

0 commit comments

Comments
 (0)