Skip to content

Commit d285a69

Browse files
authored
BE: issue 1090 Added topics info to connector endpoint (#1442)
1 parent 400e56c commit d285a69

File tree

4 files changed

+18
-4
lines changed

4 files changed

+18
-4
lines changed

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.connect.model.ClusterInfo;
5+
import io.kafbat.ui.connect.model.Connector;
56
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
67
import io.kafbat.ui.connect.model.ConnectorTask;
8+
import io.kafbat.ui.connect.model.ConnectorTopics;
79
import io.kafbat.ui.connect.model.ExpandedConnector;
810
import io.kafbat.ui.connect.model.NewConnector;
911
import io.kafbat.ui.model.ConnectDTO;
@@ -42,6 +44,14 @@ default ClusterInfo toClient(KafkaConnectState state) {
4244
@Mapping(target = "connect", ignore = true)
4345
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
4446

47+
default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) {
48+
ConnectorDTO connectorDto = this.fromClient(connector);
49+
if (topics != null) {
50+
return connectorDto.topics(topics.getTopics());
51+
}
52+
return connectorDto;
53+
}
54+
4555
ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus);
4656

4757
@Mapping(target = "status", ignore = true)
@@ -177,6 +187,4 @@ default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo con
177187
connector.getTopics()
178188
);
179189
}
180-
181-
182190
}

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,9 @@ private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
226226
public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
227227
String connectorName) {
228228
return api(cluster, connectName)
229-
.mono(client -> client.getConnector(connectorName)
230-
.map(kafkaConnectMapper::fromClient)
229+
.mono(client ->
230+
Mono.zip(client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName))
231+
.map(t -> kafkaConnectMapper.fromClient(t.getT1(), t.getT2()))
231232
.flatMap(connector ->
232233
client.getConnectorStatus(connector.getName())
233234
// status request can return 404 if tasks not assigned yet

contract-typespec/api/kafka-connect.tsp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ model Connector {
225225
type: ConnectorType;
226226
status: ConnectorStatus;
227227
connect: string;
228+
topics?: string[];
228229
}
229230

230231
enum ConnectorAction {

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3674,6 +3674,10 @@ components:
36743674
$ref: '#/components/schemas/ConnectorStatus'
36753675
connect:
36763676
type: string
3677+
topics:
3678+
type: array
3679+
items:
3680+
type: string
36773681
required:
36783682
- type
36793683
- status

0 commit comments

Comments
 (0)