{ + /** + * Timestamp represents the time of the current data point, in milliseconds. + *
+ * When to set: + *
{@code
+ * // use `default` group
+ * client = new BanyanDBClient("127.0.0.1", 17912);
+ * // to send any request, a connection to the server must be estabilished
+ * client.connect();
+ * }
+ */
+@Slf4j
+public class BanyanDBClient implements Closeable {
+ public static final ZonedDateTime DEFAULT_EXPIRE_AT = ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
+ private final String[] targets;
+ /**
+ * Options for server connection.
+ */
+ @Getter
+ private final Options options;
+ /**
+ * gRPC connection.
+ */
+ @Getter
+ private volatile Channel channel;
+ /**
+ * gRPC client stub
+ */
+ @Getter
+ private StreamServiceGrpc.StreamServiceStub streamServiceStub;
+ /**
+ * gRPC client stub
+ */
+ @Getter
+ private MeasureServiceGrpc.MeasureServiceStub measureServiceStub;
+ /**
+ * gRPC client stub
+ */
+ @Getter
+ private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+ /**
+ * gRPC future stub.
+ */
+ @Getter
+ private StreamServiceGrpc.StreamServiceBlockingStub streamServiceBlockingStub;
+ /**
+ * gRPC future stub.
+ */
+ @Getter
+ private MeasureServiceGrpc.MeasureServiceBlockingStub measureServiceBlockingStub;
+ /**
+ * gRPC future stub.
+ */
+ @Getter
+ private TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+ /**
+ * The connection status.
+ */
+ private volatile boolean isConnected = false;
+ /**
+ * A lock to control the race condition in establishing and disconnecting network connection.
+ */
+ private final ReentrantLock connectionEstablishLock;
+
+ /**
+ * Create a BanyanDB client instance with a default options.
+ *
+ * @param targets server targets
+ */
+ public BanyanDBClient(String... targets) {
+ this(targets, new Options());
+ }
+
+ /**
+ * Create a BanyanDB client instance with a customized options.
+ *
+ * @param targets server targets
+ * @param options customized options
+ */
+ public BanyanDBClient(String[] targets, Options options) {
+ String[] tt = Preconditions.checkNotNull(targets, "targets");
+ checkState(tt.length > 0, "targets' size must be more than 1");
+ tt = Arrays.stream(tt).filter(t -> !Strings.isNullOrEmpty(t)).toArray(size -> new String[size]);
+ checkState(tt.length > 0, "valid targets' size must be more than 1");
+ this.targets = tt;
+ this.options = options;
+ this.connectionEstablishLock = new ReentrantLock();
+ }
+
+ /**
+ * Construct a connection to the server.
+ *
+ * @throws IOException thrown if fail to create a connection
+ */
+ public void connect() throws IOException {
+ connectionEstablishLock.lock();
+ try {
+ if (!isConnected) {
+ URI[] addresses = new URI[this.targets.length];
+ for (int i = 0; i < this.targets.length; i++) {
+ addresses[i] = URI.create("//" + this.targets[i]);
+ }
+ Channel rawChannel = ChannelManager.create(this.options.buildChannelManagerSettings(),
+ new DefaultChannelFactory(addresses, this.options));
+ Channel interceptedChannel = rawChannel;
+ // register auth interceptor
+ String username = options.getUsername();
+ String password = options.getPassword();
+ if (StringUtil.isNotBlank(username) && StringUtil.isNotBlank(password)) {
+ interceptedChannel = ClientInterceptors.intercept(rawChannel,
+ new AuthInterceptor(username, password));
+ }
+ // Ensure this.channel is assigned only once.
+ this.channel = interceptedChannel;
+ streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel);
+ measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel);
+ traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(this.channel);
+ streamServiceStub = StreamServiceGrpc.newStub(this.channel);
+ measureServiceStub = MeasureServiceGrpc.newStub(this.channel);
+ traceServiceStub = TraceServiceGrpc.newStub(this.channel);
+ isConnected = true;
+ }
+ } finally {
+ connectionEstablishLock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ void connect(Channel channel) {
+ connectionEstablishLock.lock();
+ try {
+ if (!isConnected) {
+ this.channel = channel;
+ streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel);
+ measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel);
+ traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(this.channel);
+ streamServiceStub = StreamServiceGrpc.newStub(this.channel);
+ measureServiceStub = MeasureServiceGrpc.newStub(this.channel);
+ traceServiceStub = TraceServiceGrpc.newStub(this.channel);
+ isConnected = true;
+ }
+ } finally {
+ connectionEstablishLock.unlock();
+ }
+ }
+
+ /**
+ * Build a MeasureWrite request.
+ *
+ * @param group the group of the measure
+ * @param name the name of the measure
+ * @param timestamp the timestamp of the measure
+ * @return the request to be built
+ */
+ public MeasureWrite createMeasureWrite(String group, String name, long timestamp) throws BanyanDBException {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+ return new MeasureWrite(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build(), timestamp);
+ }
+
+ /**
+ * Build a StreamWrite request.
+ *
+ * @param group the group of the stream
+ * @param name the name of the stream
+ * @param elementId the primary key of the stream
+ * @return the request to be built
+ */
+ public StreamWrite createStreamWrite(String group, String name, final String elementId) throws BanyanDBException {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+ return new StreamWrite(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build(), elementId);
+ }
+
+ /**
+ * Build a TraceWrite request without initial timestamp.
+ *
+ * @param group the group of the trace
+ * @param name the name of the trace
+ * @return the request to be built
+ */
+ public TraceWrite createTraceWrite(String group, String name) throws BanyanDBException {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+ return new TraceWrite(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build());
+ }
+
+ /**
+ * Query streams according to given conditions
+ *
+ * @param streamQuery condition for query
+ * @return hint streams.
+ */
+ public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException {
+ checkState(this.streamServiceStub != null, "stream service is null");
+
+ for (String group : streamQuery.groups) {
+ final BanyandbStream.QueryResponse response =
+ HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.streamServiceBlockingStub
+ .withDeadlineAfter(
+ this.getOptions().getDeadline(),
+ TimeUnit.SECONDS
+ )
+ .query(streamQuery.build()));
+ return new StreamQueryResponse(response);
+ }
+ throw new RuntimeException("No metadata found for the query");
+ }
+
+ /**
+ * Query TopN according to given conditions
+ *
+ * @param topNQuery condition for query
+ * @return hint topN.
+ */
+ public TopNQueryResponse query(TopNQuery topNQuery) throws BanyanDBException {
+ checkState(this.measureServiceStub != null, "measure service is null");
+
+ final BanyandbMeasure.TopNResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.measureServiceBlockingStub
+ .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
+ .topN(topNQuery.build()));
+ return new TopNQueryResponse(response);
+ }
+
+ /**
+ * Query measures according to given conditions
+ *
+ * @param measureQuery condition for query
+ * @return hint measures.
+ */
+ public MeasureQueryResponse query(MeasureQuery measureQuery) throws BanyanDBException {
+ checkState(this.streamServiceStub != null, "measure service is null");
+ for (String group : measureQuery.groups) {
+ final BanyandbMeasure.QueryResponse response =
+ HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.measureServiceBlockingStub
+ .withDeadlineAfter(
+ this.getOptions()
+ .getDeadline(),
+ TimeUnit.SECONDS
+ )
+ .query(
+ measureQuery.build()));
+ return new MeasureQueryResponse(response);
+ }
+ throw new RuntimeException("No metadata found for the query");
+ }
+
+ /**
+ * Query traces according to given conditions
+ *
+ * @param traceQuery condition for query
+ * @return trace query response.
+ */
+ public TraceQueryResponse query(TraceQuery traceQuery) throws BanyanDBException {
+ checkState(this.traceServiceStub != null, "trace service is null");
+
+ for (String group : traceQuery.groups) {
+ final BanyandbTrace.QueryResponse response =
+ HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.traceServiceBlockingStub
+ .withDeadlineAfter(
+ this.getOptions().getDeadline(),
+ TimeUnit.SECONDS
+ )
+ .query(traceQuery.build()));
+ return new TraceQueryResponse(response);
+
+ }
+ throw new RuntimeException("No metadata found for the query");
+ }
+
+ /**
+ * Define a new group and attach to the current client.
+ *
+ * @param group the group to be created
+ * @return a grouped client
+ */
+ public Group define(Group group) throws BanyanDBException {
+ GroupMetadataRegistry registry = new GroupMetadataRegistry(checkNotNull(this.channel));
+ registry.create(group);
+ return registry.get(null, group.getMetadata().getName());
+ }
+
+ /**
+ * Define a new stream
+ *
+ * @param stream the stream to be created
+ */
+ public void define(Stream stream) throws BanyanDBException {
+ StreamMetadataRegistry streamRegistry = new StreamMetadataRegistry(checkNotNull(this.channel));
+ long modRevision = streamRegistry.create(stream);
+ stream = stream.toBuilder().setMetadata(stream.getMetadata().toBuilder().setModRevision(modRevision)).build();
+ }
+
+ /**
+ * Define a new stream with index rules,
+ * @param stream the stream to be created
+ * @param indexRules the index rules to be created
+ */
+ public void define(Stream stream, List