hstreamdb-java

This is the official Java client library for HStreamDB.

Please use the latest released version.

The latest release is v0.17.0, which requires HStreamDB v0.17.0.

Installation

The library artifact is published on Maven Central as hstreamdb-java.

Maven

Maven users can add the library as a dependency:

<dependencies>
  <dependency>
    <groupId>io.hstream</groupId>
    <artifactId>hstreamdb-java</artifactId>
    <version>0.17.0</version>
  </dependency>
</dependencies>

Gradle

Gradle users can add the library as a dependency:

implementation 'io.hstream:hstreamdb-java:0.17.0'

Example Usage

The following examples show basic usage of the client. Complete examples are available in hstreamdb-java-examples; for a fuller introduction and usage details, see the guides.

Connect to HStreamDB

import io.hstream.*;

public class ConnectExample {
    public static void main(String[] args) throws Exception {
        final String serviceUrl = "localhost:6570";
        HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
        System.out.println("Connected");
        client.close();
    }
}

Work with Streams

// get a list of streams
for(Stream stream: client.listStreams()) {
  System.out.println(stream.getStreamName());
}


// create a new stream
client.createStream("test_stream");

// create a new stream with 5 replicas
client.createStream("test_stream", 5);


// delete a stream
client.deleteStream("test_stream");

Write Data to a Stream

Producer producer = client.newProducer().stream("test_stream").build();

// write raw records
Random random = new Random();
byte[] rawRecord = new byte[100];
random.nextBytes(rawRecord);
Record recordR = Record.newBuilder().rawRecord(rawRecord).build();
CompletableFuture<String> future = producer.write(recordR);

// write hRecords
HRecord hRecord = HRecord.newBuilder()
        .put("key1", 10)
        .put("key2", "hello")
        .put("key3", true)
        .build();
Record recordH = Record.newBuilder().hRecord(hRecord).build();
// use a distinct variable name so this compiles alongside the raw-record write above
CompletableFuture<String> hRecordFuture = producer.write(recordH);

// buffered writes
BatchSetting batchSetting =
    BatchSetting.newBuilder()
        .recordCountLimit(100)
        .bytesLimit(4096)
        .ageLimit(100)
        .build();

FlowControlSetting flowControlSetting =
    FlowControlSetting.newBuilder()
        .bytesLimit(40960)
        .build();

BufferedProducer batchedProducer =
    client.newBufferedProducer().stream("test_stream")
            .batchSetting(batchSetting)
            .flowControlSetting(flowControlSetting)
            .build();

for(int i = 0; i < 1000; ++i) {
    random.nextBytes(rawRecord);
    Record recordB = Record.newBuilder().rawRecord(rawRecord).build();
    batchedProducer.write(recordB);
}
// flush and close batchedProducer
batchedProducer.close();
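
The batch limits above can be easier to reason about with a small model. The following is a minimal sketch, not the library's implementation: it assumes that `recordCountLimit` and `bytesLimit` act as flush triggers and that a batch is flushed as soon as either limit is reached. `BatchLimitSketch` and its methods are hypothetical names, and `ageLimit` is left out to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a size-bounded batch buffer; not part of hstreamdb-java. */
public class BatchLimitSketch {
    private final int recordCountLimit;
    private final int bytesLimit;
    private final List<byte[]> buffer = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private int bufferedBytes = 0;

    public BatchLimitSketch(int recordCountLimit, int bytesLimit) {
        this.recordCountLimit = recordCountLimit;
        this.bytesLimit = bytesLimit;
    }

    /** Buffers one record and flushes as soon as either limit is reached (assumed semantics). */
    public void write(byte[] record) {
        buffer.add(record);
        bufferedBytes += record.length;
        if (buffer.size() >= recordCountLimit || bufferedBytes >= bytesLimit) {
            flush();
        }
    }

    /** "Sends" the current batch (here it only records the batch size) and resets the buffer. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(buffer.size());
        buffer.clear();
        bufferedBytes = 0;
    }

    public List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    public static void main(String[] args) {
        // Same numbers as the example above: 1000 records of 100 bytes each.
        BatchLimitSketch sketch = new BatchLimitSketch(100, 4096);
        for (int i = 0; i < 1000; i++) {
            sketch.write(new byte[100]);
        }
        sketch.flush(); // mirrors close(): flush whatever is still buffered
        System.out.println(sketch.flushedBatchSizes());
    }
}
```

Under these assumptions, the byte limit trips before the record-count limit: 41 records of 100 bytes reach 4100 bytes, so the model emits 24 batches of 41 records and a final batch of 16.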

Do not write both raw (binary) records and HRecords to the same stream.
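
Since `producer.write` returns a `CompletableFuture<String>`, a write is only known to have succeeded once that future completes. The sketch below shows two standard JDK ways to handle such futures: reacting to a single result with `whenComplete`, and waiting for a group of writes with `CompletableFuture.allOf`. `simulatedWrite` is a hypothetical stand-in for `producer.write`, so the example runs without a server, and the record IDs it returns are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteFutureSketch {
    /** Hypothetical stand-in for producer.write(record); completes with a made-up record ID. */
    public static CompletableFuture<String> simulatedWrite(int i) {
        return CompletableFuture.supplyAsync(() -> "record-" + i);
    }

    /** Issues n writes, waits for all of them, and returns the IDs in submission order. */
    public static List<String> writeAll(int n) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(simulatedWrite(i));
        }
        // allOf completes once every write has completed; join() throws if any write failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        List<String> ids = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            ids.add(f.join()); // already complete, so this does not block
        }
        return ids;
    }

    public static void main(String[] args) {
        // Callback style: handle success or failure of a single write.
        simulatedWrite(0)
            .whenComplete((id, err) -> {
                if (err != null) {
                    System.err.println("write failed: " + err);
                } else {
                    System.out.println("written: " + id);
                }
            })
            .join();

        // Bulk style: wait for several writes before continuing.
        System.out.println(writeAll(3));
    }
}
```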

Consume Data from a Subscription

// first, create a subscription for the stream
Subscription subscription = 
    Subscription
        .newBuilder()
        .subscription("my_subscription")
        .stream("my_stream")
        .ackTimeoutSeconds(600)
        .build();
client.createSubscription(subscription);

// second, create a consumer attached to the subscription
Consumer consumer =
    client
        .newConsumer()
        .subscription("my_subscription")
        .rawRecordReceiver(
            ((receivedRawRecord, responder) -> {
                System.out.println(receivedRawRecord.getRecordId());
                responder.ack();
            }))
        .build();

// third, start the consumer
consumer.startAsync().awaitRunning();
System.out.println("the consumer is started");

Read Data from a Stream Shard

Reader reader =
    client
        .newReader()
        .readerId("my_readerId")
        .streamName("my_stream")
        .shardId("my_shardId")
        .build();


List<Record> records = reader.read(10).join();

reader.close();
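
Calling `join()` on the future returned by `reader.read` blocks until the read completes. If an unbounded wait is undesirable, one option is to bound it with the JDK's `CompletableFuture.get(timeout, unit)`. The sketch below illustrates that pattern on plain futures; `getOrFallback` is a hypothetical helper, not part of hstreamdb-java, and the string values are placeholders for a real read result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {
    /** Waits up to timeoutMillis for the future; returns fallback if it has not completed by then. */
    public static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("operation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-ins for reader.read(10): one future that has completed, one that never does.
        CompletableFuture<String> done = CompletableFuture.completedFuture("10 records");
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(getOrFallback(done, 100, "timed out"));    // prints "10 records"
        System.out.println(getOrFallback(pending, 100, "timed out")); // prints "timed out"
    }
}
```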