Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update interactive queries API according to KIP-992 and bump to Kafka v3.8.1 #308

Merged
merged 14 commits into from
Feb 3, 2025
24 changes: 19 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ Kstreamplify adds extra features to Kafka Streams, simplifying development so yo
* [Uncaught Exception Handler](#uncaught-exception-handler)
* [Web Services](#web-services)
* [Topology](#topology)
* [State Stores](#state-stores)
* [Interactive Queries](#interactive-queries)
* [Kubernetes](#kubernetes)
* [TopicWithSerde API](#topicwithserde-api)
* [Declaration](#declaration)
* [Prefix](#prefix)
* [Remapping](#remapping)
* [Interactive Queries](#interactive-queries)
* [Interactive Queries](#interactive-queries-1)
* [Configuration](#configuration)
* [Services](#services)
* [Web Services](#web-services-1)
* [Hooks](#hooks)
* [On Start](#on-start)
* [Deduplication](#deduplication)
Expand Down Expand Up @@ -326,15 +327,17 @@ topology:
path: 'custom-topology'
```

### State Stores
### Interactive Queries

A list of endpoints to query the state stores of your Kafka Streams application is available.
It uses [interactive queries](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html) and
handle state stores being on different Kafka Streams instances by providing an [RPC layer](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#adding-an-rpc-layer-to-your-application).

Here is the list of supported state store types:
- Key-Value store
- Timestamped Key-Value store
- Window store
- Timestamped Window store

Only state stores with String keys are supported.

Expand Down Expand Up @@ -465,7 +468,7 @@ kafka:
```

2. The value of a default environment variable named `APPLICATION_SERVER`.
3. `localhost`.
3. `localhost:<serverPort>`.

### Services

Expand All @@ -477,11 +480,22 @@ public class MyService {
@Autowired
KeyValueStoreService keyValueStoreService;

@Autowired
TimestampedKeyValueStoreService timestampedKeyValueStoreService;

@Autowired
WindowStoreService windowStoreService;

@Autowired
TimestampedWindowStoreService timestampedWindowStoreService;
}
```

### Web Services

The web services layer provides a set of endpoints to query the state stores of your Kafka Streams application.
Check the [Interactive Queries Web Services](#interactive-queries) section for more information.

## Hooks

Kstreamplify offers the flexibility to execute custom code through hooks.
Expand All @@ -508,7 +522,7 @@ and within a specified time frame.
All deduplication methods return a `KStream<String, ProcessingResult<V,V2>` so you can redirect the result to the
`TopologyErrorHandler#catchErrors()`.

**Note**: Only streams with String keys and Avro values are supported.
Only streams with String keys and Avro values are supported.

### By Key

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.service.KubernetesService;
import com.michelin.kstreamplify.service.TopologyService;
import com.michelin.kstreamplify.service.interactivequeries.KeyValueStoreService;
import com.michelin.kstreamplify.service.interactivequeries.WindowStoreService;
import com.michelin.kstreamplify.service.interactivequeries.keyvalue.KeyValueStoreService;
import com.michelin.kstreamplify.service.interactivequeries.keyvalue.TimestampedKeyValueStoreService;
import com.michelin.kstreamplify.service.interactivequeries.window.TimestampedWindowStoreService;
import com.michelin.kstreamplify.service.interactivequeries.window.WindowStoreService;
import com.michelin.kstreamplify.store.StreamsMetadata;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
Expand All @@ -60,14 +62,16 @@ public class KafkaStreamsHttpServer {
private static final String DEFAULT_STORE_PATH = "store";
private static final String DEFAULT_KEY_VALUE_STORE_PATH = "key-value";
private static final String DEFAULT_WINDOW_STORE_PATH = "window";
private static final String TIME_FROM_REQUEST_PARAM = "timeFrom";
private static final String TIME_TO_REQUEST_PARAM = "timeTo";
private static final String START_TIME_REQUEST_PARAM = "startTime";
private static final String END_TIME_REQUEST_PARAM = "endTime";
private final KafkaStreamsInitializer kafkaStreamsInitializer;
private final ObjectMapper objectMapper;
private final KubernetesService kubernetesService;
private final TopologyService topologyService;
private final KeyValueStoreService keyValueStoreService;
private final KeyValueStoreService keyValueService;
private final TimestampedKeyValueStoreService timestampedKeyValueService;
private final WindowStoreService windowStoreService;
private final TimestampedWindowStoreService timestampedWindowStoreService;

/**
* The HTTP server.
Expand All @@ -84,8 +88,10 @@ public KafkaStreamsHttpServer(KafkaStreamsInitializer kafkaStreamsInitializer) {
this.objectMapper = new ObjectMapper();
this.kubernetesService = new KubernetesService(kafkaStreamsInitializer);
this.topologyService = new TopologyService(kafkaStreamsInitializer);
this.keyValueStoreService = new KeyValueStoreService(kafkaStreamsInitializer);
this.keyValueService = new KeyValueStoreService(kafkaStreamsInitializer);
this.timestampedKeyValueService = new TimestampedKeyValueStoreService(kafkaStreamsInitializer);
this.windowStoreService = new WindowStoreService(kafkaStreamsInitializer);
this.timestampedWindowStoreService = new TimestampedWindowStoreService(kafkaStreamsInitializer);
}

/**
Expand Down Expand Up @@ -173,13 +179,13 @@ private void createStoreEndpoints() {

private Object getResponseForStoreEndpoints(HttpExchange exchange) {
if (exchange.getRequestURI().toString().equals("/" + DEFAULT_STORE_PATH)) {
return keyValueStoreService.getStateStores();
return keyValueService.getStateStores();
}

String store;
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH + "/metadata/.*")) {
store = parsePathParam(exchange, 3);
return keyValueStoreService.getStreamsMetadataForStore(store)
return keyValueService.getStreamsMetadataForStore(store)
.stream()
.map(streamsMetadata -> new StreamsMetadata(
streamsMetadata.stateStoreNames(),
Expand All @@ -188,57 +194,131 @@ private Object getResponseForStoreEndpoints(HttpExchange exchange) {
.toList();
}

// Get all on local host for key-value store
if (exchange.getRequestURI().toString()
.matches("/" + DEFAULT_STORE_PATH + "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/local/.*")) {
store = parsePathParam(exchange, 4);
return keyValueStoreService.getAllOnLocalHost(store);
return keyValueService.getAllOnLocalInstance(store);
}

// Get all on local host for timestamped key-value store
if (exchange.getRequestURI().toString()
.matches("/" + DEFAULT_STORE_PATH + "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/timestamped/local/.*")) {
store = parsePathParam(exchange, 5);
return timestampedKeyValueService.getAllOnLocalInstance(store);
}

// Get all on local host for window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/local/.*")) {
store = parsePathParam(exchange, 4);
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getAllOnLocalInstance(store, instantFrom, instantTo);
}

// Get all on local host for timestamped window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/timestamped/local/.*")) {
store = parsePathParam(exchange, 5);
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getAllOnLocalHost(store, instantFrom, instantTo);
return timestampedWindowStoreService.getAllOnLocalInstance(store, instantFrom, instantTo);
}

store = parsePathParam(exchange, 3);
// Get by key for timestamped key-value store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/timestamped/.*/.*")) {
store = parsePathParam(exchange, 4);
String key = parsePathParam(exchange, 5);
return timestampedKeyValueService.getByKey(store, key);
}

// Get all for timestamped key-value store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/timestamped/.*")) {
store = parsePathParam(exchange, 4);
return timestampedKeyValueService.getAll(store);
}

// Get by key for key-value store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/.*/.*")) {
store = parsePathParam(exchange, 3);
String key = parsePathParam(exchange, 4);
return keyValueStoreService.getByKey(store, key);
return keyValueService.getByKey(store, key);
}

// Get all for key-value store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/.*")) {
return keyValueStoreService.getAll(store);
store = parsePathParam(exchange, 3);
return keyValueService.getAll(store);
}

// Get by key for timestamped window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/timestamped/.*/.*")) {
store = parsePathParam(exchange, 4);
String key = parsePathParam(exchange, 5);
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return timestampedWindowStoreService.getByKey(store, key, instantFrom, instantTo);
}

// Get all for timestamped window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/timestamped/.*")) {
store = parsePathParam(exchange, 4);
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return timestampedWindowStoreService.getAll(store, instantFrom, instantTo);
}

// Get by key for window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/.*/.*")) {
store = parsePathParam(exchange, 3);
String key = parsePathParam(exchange, 4);
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getByKey(store, key, instantFrom, instantTo);
}

// Get all for window store
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/.*")) {
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
store = parsePathParam(exchange, 3);
Instant instantFrom = parseRequestParam(exchange, START_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
Instant instantTo = parseRequestParam(exchange, END_TIME_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getAll(store, instantFrom, instantTo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -50,21 +49,19 @@
*/
@Slf4j
@AllArgsConstructor
abstract class InteractiveQueriesService {
public abstract class CommonStoreService {
private static final String STREAMS_NOT_STARTED = "Cannot process request while instance is in %s state";
protected static final String UNKNOWN_STATE_STORE = "State store %s not found";
private final ObjectMapper objectMapper = new ObjectMapper();
private final HttpClient httpClient;

@Getter
protected final KafkaStreamsInitializer kafkaStreamsInitializer;

/**
* Constructor.
*
* @param kafkaStreamsInitializer The Kafka Streams initializer
*/
protected InteractiveQueriesService(KafkaStreamsInitializer kafkaStreamsInitializer) {
protected CommonStoreService(KafkaStreamsInitializer kafkaStreamsInitializer) {
this.kafkaStreamsInitializer = kafkaStreamsInitializer;
this.httpClient = HttpClient.newHttpClient();
}
Expand All @@ -77,7 +74,7 @@ protected InteractiveQueriesService(KafkaStreamsInitializer kafkaStreamsInitiali
public Set<String> getStateStores() {
checkStreamsRunning();

final Collection<org.apache.kafka.streams.StreamsMetadata> metadata = kafkaStreamsInitializer
final Collection<StreamsMetadata> metadata = kafkaStreamsInitializer
.getKafkaStreams()
.metadataForAllStreamsClients();

Expand Down Expand Up @@ -203,4 +200,11 @@ private void checkStreamsRunning() {
throw new StreamsNotStartedException(String.format(STREAMS_NOT_STARTED, state));
}
}

/**
* The path for RPC.
*
* @return The path
*/
protected abstract String path();
}
Loading