4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
## 7.4.0 [unreleased]

### Features

- [#848](https://github.com/influxdata/influxdb-client-java/pull/848): new WriteOption config for capturing backpressure data

## 7.3.0 [2025-05-22]

### Features
2 changes: 2 additions & 0 deletions README.md
@@ -418,6 +418,8 @@ If you have Docker running, but it is not available over localhost (e.g. you are
- `INFLUXDB_PORT_API`
- `INFLUXDB_2_IP`
- `INFLUXDB_2_PORT_API`
- `INFLUXDB_2_ONBOARDING_IP`
- `INFLUXDB_2_ONBOARDING_PORT`

```bash
$ export INFLUXDB_IP=192.168.99.100
40 changes: 39 additions & 1 deletion client/README.md
@@ -611,6 +611,10 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| **exponentialBase** | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5``: retry delays are randomly distributed values within the ranges ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` | 2 |
| **bufferLimit** | the maximum number of unwritten stored points | 10000 |
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |
| **captureBackpressureData** | whether to capture affected data points in backpressure events | false |
| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 |

There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
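The retry-delay ranges listed in the `exponentialBase` row above can be sketched in plain Java. This is only an illustration of the formula, not part of the client API; `RetryDelaySketch` and `delayRange` are hypothetical names:

```java
// Sketch: computes the [lower, upper] retry-delay range for a given attempt,
// matching the formula retryInterval * exponentialBase^(attempts-1) ..
// retryInterval * exponentialBase^(attempts), capped at maxRetryDelay.
public class RetryDelaySketch {

    static long[] delayRange(final long retryInterval,
                             final int exponentialBase,
                             final long maxRetryDelay,
                             final int attempt) {
        long lower = Math.min(
                retryInterval * (long) Math.pow(exponentialBase, attempt - 1),
                maxRetryDelay);
        long upper = Math.min(
                retryInterval * (long) Math.pow(exponentialBase, attempt),
                maxRetryDelay);
        return new long[]{lower, upper};
    }

    public static void main(String[] args) {
        // retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5
        for (int attempt = 1; attempt <= 5; attempt++) {
            long[] range = delayRange(5_000, 2, 125_000, attempt);
            System.out.println("attempt " + attempt
                    + ": [" + range[0] + ", " + range[1] + "]");
        }
    }
}
```

Running this prints the five ranges from the table (`[5000, 10000]` through `[80000, 125000]`); the actual client picks a random delay within each range.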

#### Backpressure
Backpressure addresses the problem of what to do with a growing backlog of unconsumed data points.
@@ -640,7 +644,41 @@ writeApi.listenEvents(BackpressureEvent.class, value -> {
});
```

There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
##### Backpressure Event Data Snapshots

Enable `captureBackpressureData` to capture a snapshot of the affected data points in the `BackpressureEvent` when backpressure occurs. The content of this snapshot depends on the configured backpressure strategy:

- **`DROP_OLDEST`**: The snapshot contains only the data points that will be dropped (the oldest points in the buffer). This allows you to log, persist, or handle the specific data that is being lost due to backpressure.

- **`DROP_LATEST`**: The snapshot contains only the newest data points that are being added to the buffer. This represents the most recent data that triggered the backpressure condition.

Logging dropped data points:
```java
WriteOptions writeOptions = WriteOptions.builder()
.backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
.bufferLimit(1000)
.captureBackpressureData(true)
.build();

WriteApi writeApi = influxDBClient.getWriteApi(writeOptions);

writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> {
List<String> affectedPoints = backpressureEvent.getDroppedLineProtocol();

if (backpressureEvent.getReason() == BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES) {
logger.warn("Backpressure occurred. Affected {} data points:", affectedPoints.size());

// For DROP_OLDEST: these are the points that were dropped from the buffer
// For DROP_LATEST: these are the newest points that triggered the condition
affectedPoints.forEach(point -> logger.debug("Affected point: {}", point));

        // Do something with the affected points, e.g. requeue them for retry
requeue(affectedPoints);
}
});
```

Note: `captureBackpressureData` is disabled by default; leaving it disabled avoids the overhead of copying points when the affected data is not needed.

#### Writing data

70 changes: 68 additions & 2 deletions client/src/main/java/com/influxdb/client/WriteOptions.java
@@ -43,6 +43,8 @@
* <li>retryInterval = 5000 ms</li>
* <li>jitterInterval = 0</li>
* <li>bufferLimit = 10_000</li>
* <li>concatMapPrefetch = 2</li>
* <li>captureBackpressureData = false</li>
* </ul>
* <p>
* The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}.
@@ -62,6 +64,8 @@ public final class WriteOptions implements WriteApi.RetryOptions {
public static final int DEFAULT_MAX_RETRY_TIME = 180_000;
public static final int DEFAULT_EXPONENTIAL_BASE = 2;
public static final int DEFAULT_BUFFER_LIMIT = 10000;
public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2;
public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false;

/**
* Default configuration with values that are consistent with Telegraf.
@@ -77,8 +81,10 @@ public final class WriteOptions implements WriteApi.RetryOptions {
private final int maxRetryTime;
private final int exponentialBase;
private final int bufferLimit;
private final int concatMapPrefetch;
private final Scheduler writeScheduler;
private final BackpressureOverflowStrategy backpressureStrategy;
private final boolean captureBackpressureData;

/**
* @return the number of data point to collect in batch
@@ -171,6 +177,17 @@ public int getBufferLimit() {
return bufferLimit;
}

/**
* The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
* MaybeSource terminates.
*
* @return the prefetch value for concatMapMaybe operator
* @see WriteOptions.Builder#concatMapPrefetch(int)
*/
public int getConcatMapPrefetch() {
return concatMapPrefetch;
}

/**
* @return The scheduler which is used for write data points.
* @see WriteOptions.Builder#writeScheduler(Scheduler)
@@ -189,6 +206,14 @@ public BackpressureOverflowStrategy getBackpressureStrategy() {
return backpressureStrategy;
}

/**
* @return whether to capture affected data points in backpressure events
* @see WriteOptions.Builder#captureBackpressureData(boolean)
*/
public boolean getCaptureBackpressureData() {
return captureBackpressureData;
}

private WriteOptions(@Nonnull final Builder builder) {

Arguments.checkNotNull(builder, "WriteOptions.Builder");
@@ -202,8 +227,10 @@ private WriteOptions(@Nonnull final Builder builder) {
maxRetryTime = builder.maxRetryTime;
exponentialBase = builder.exponentialBase;
bufferLimit = builder.bufferLimit;
concatMapPrefetch = builder.concatMapPrefetch;
writeScheduler = builder.writeScheduler;
backpressureStrategy = builder.backpressureStrategy;
captureBackpressureData = builder.captureBackpressureData;
}

/**
@@ -231,8 +258,10 @@ public static class Builder {
private int maxRetryTime = DEFAULT_MAX_RETRY_TIME;
private int exponentialBase = DEFAULT_EXPONENTIAL_BASE;
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private int concatMapPrefetch = DEFAULT_CONCAT_MAP_PREFETCH;
private Scheduler writeScheduler = Schedulers.newThread();
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA;

/**
* Set the number of data point to collect in batch.
@@ -339,7 +368,9 @@ public Builder maxRetryTime(final int maxRetryTime) {
*/
@Nonnull
public Builder exponentialBase(final int exponentialBase) {
Arguments.checkPositiveNumber(exponentialBase, "exponentialBase");
if (exponentialBase < 2) {
throw new IllegalArgumentException("Expecting a number >= 2 for exponentialBase");
}
this.exponentialBase = exponentialBase;
return this;
}
@@ -354,11 +385,27 @@ public Builder exponentialBase(final int exponentialBase) {
*/
@Nonnull
public Builder bufferLimit(final int bufferLimit) {
Arguments.checkNotNegativeNumber(bufferLimit, "bufferLimit");
Arguments.checkPositiveNumber(bufferLimit, "bufferLimit");
this.bufferLimit = bufferLimit;
return this;
}

/**
* Set the prefetch value for the concatMapMaybe operator that processes write batches.
*
* The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
* MaybeSource terminates.
*
* @param concatMapPrefetch the prefetch value for concatMapMaybe operator (must be positive)
* @return {@code this}
*/
@Nonnull
public Builder concatMapPrefetch(final int concatMapPrefetch) {
Arguments.checkPositiveNumber(concatMapPrefetch, "concatMapPrefetch");
this.concatMapPrefetch = concatMapPrefetch;
return this;
}

/**
* Set the scheduler which is used for write data points. It is useful for disabling batch writes or
* for tuning the performance. Default value is {@link Schedulers#newThread()}.
@@ -389,6 +436,25 @@ public Builder backpressureStrategy(@Nonnull final BackpressureOverflowStrategy
return this;
}

/**
* Set whether to capture affected data points in backpressure events.
*
* When enabled, BackpressureEvent will include the specific line protocol points
* that are affected by the backpressure condition:
* - For DROP_OLDEST strategy: points that will be dropped
* - For DROP_LATEST strategy: newest points being added
*
* Disabling this can improve performance when backpressure data capture is not needed.
*
* @param captureBackpressureData whether to capture affected data points. Default is false.
* @return {@code this}
*/
@Nonnull
public Builder captureBackpressureData(final boolean captureBackpressureData) {
this.captureBackpressureData = captureBackpressureData;
return this;
}

/**
* Build an instance of WriteOptions.
*
@@ -161,12 +161,15 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
//
.lift(new BackpressureBatchesBufferStrategy(
writeOptions.getBufferLimit(),
() -> publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES)),
writeOptions.getBackpressureStrategy()))
droppedPoints -> publish(new BackpressureEvent(
BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, droppedPoints)),
writeOptions.getBackpressureStrategy(),
writeOptions.getCaptureBackpressureData()))
//
// Use concat to process batches one by one
// Use concat to process batches with configurable prefetch
//
.concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions))
.concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions),
writeOptions.getConcatMapPrefetch())
.doFinally(() -> finished.set(true))
.subscribe(responseNotification -> {
