Skip to content

Commit e92640e

Browse files
author
Fred [C] Park
committed
feat: new WriteOption config for capturing backpressure data
1 parent dd37956 commit e92640e

File tree

12 files changed

+1115
-21
lines changed

12 files changed

+1115
-21
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 7.4.0 [unreleased]
22

3+
### Features
4+
5+
- [#848](https://github.com/influxdata/influxdb-client-java/pull/848): new WriteOption config for capturing backpressure data
6+
37
## 7.3.0 [2025-05-22]
48

59
### Features

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ If you have Docker running, but it is not available over localhost (e.g. you are
418418
- `INFLUXDB_PORT_API`
419419
- `INFLUXDB_2_IP`
420420
- `INFLUXDB_2_PORT_API`
421+
- `INFLUXDB_2_ONBOARDING_IP`
422+
- `INFLUXDB_2_ONBOARDING_PORT`
421423

422424
```bash
423425
$ export INFLUXDB_IP=192.168.99.100

client/README.md

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,10 @@ The writes are processed in batches which are configurable by `WriteOptions`:
611611
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
612612
| **bufferLimit** | the maximum number of unwritten stored points | 10000 |
613613
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |
614+
| **captureBackpressureData** | whether to capture affected data points in backpressure events | false |
615+
| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 |
616+
617+
There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
614618

615619
#### Backpressure
616620
The backpressure presents the problem of what to do with a growing backlog of unconsumed data points.
@@ -640,7 +644,41 @@ writeApi.listenEvents(BackpressureEvent.class, value -> {
640644
});
641645
```
642646

643-
There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
647+
##### Backpressure Event Data Snapshots
648+
649+
When backpressure occurs, enable `captureBackpressureData` to capture a snapshot of the affected data points from the `BackpressureEvent`. The content of this snapshot depends on the configured backpressure strategy:
650+
651+
- **`DROP_OLDEST`**: The snapshot contains only the data points that will be dropped (the oldest points in the buffer). This allows you to log, persist, or handle the specific data that is being lost due to backpressure.
652+
653+
- **`DROP_LATEST`**: The snapshot contains only the newest data points that are being added to the buffer. This represents the most recent data that triggered the backpressure condition.
654+
655+
Logging dropped data points:
656+
```java
657+
WriteOptions writeOptions = WriteOptions.builder()
658+
.backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
659+
.bufferLimit(1000)
660+
.captureBackpressureData(true)
661+
.build();
662+
663+
WriteApi writeApi = influxDBClient.getWriteApi(writeOptions);
664+
665+
writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> {
666+
List<String> affectedPoints = backpressureEvent.getDroppedLineProtocol();
667+
668+
if (backpressureEvent.getReason() == BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES) {
669+
logger.warn("Backpressure occurred. Affected {} data points:", affectedPoints.size());
670+
671+
// For DROP_OLDEST: these are the points that were dropped from the buffer
672+
// For DROP_LATEST: these are the newest points that triggered the condition
673+
affectedPoints.forEach(point -> logger.debug("Affected point: {}", point));
674+
675+
// Do something with affected points ie. requeue for retry
676+
requeue(affectedPoints);
677+
}
678+
});
679+
```
680+
681+
Note: Disabling `captureBackpressureData` can improve performance when backpressure data capture is not needed.
644682

645683
#### Writing data
646684

client/src/main/java/com/influxdb/client/WriteOptions.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
* <li>retryInterval = 5000 ms</li>
4444
* <li>jitterInterval = 0</li>
4545
* <li>bufferLimit = 10_000</li>
46+
* <li>concatMapPrefetch = 2</li>
47+
* <li>captureBackpressureData = false</li>
4648
* </ul>
4749
* <p>
4850
* The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}.
@@ -62,6 +64,8 @@ public final class WriteOptions implements WriteApi.RetryOptions {
6264
public static final int DEFAULT_MAX_RETRY_TIME = 180_000;
6365
public static final int DEFAULT_EXPONENTIAL_BASE = 2;
6466
public static final int DEFAULT_BUFFER_LIMIT = 10000;
67+
public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2;
68+
public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false;
6569

6670
/**
6771
* Default configuration with values that are consistent with Telegraf.
@@ -77,8 +81,10 @@ public final class WriteOptions implements WriteApi.RetryOptions {
7781
private final int maxRetryTime;
7882
private final int exponentialBase;
7983
private final int bufferLimit;
84+
private final int concatMapPrefetch;
8085
private final Scheduler writeScheduler;
8186
private final BackpressureOverflowStrategy backpressureStrategy;
87+
private final boolean captureBackpressureData;
8288

8389
/**
8490
* @return the number of data point to collect in batch
@@ -171,6 +177,17 @@ public int getBufferLimit() {
171177
return bufferLimit;
172178
}
173179

180+
/**
181+
* The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
182+
* MaybeSource terminates.
183+
*
184+
* @return the prefetch value for concatMapMaybe operator
185+
* @see WriteOptions.Builder#concatMapPrefetch(int)
186+
*/
187+
public int getConcatMapPrefetch() {
188+
return concatMapPrefetch;
189+
}
190+
174191
/**
175192
* @return The scheduler which is used for write data points.
176193
* @see WriteOptions.Builder#writeScheduler(Scheduler)
@@ -189,6 +206,14 @@ public BackpressureOverflowStrategy getBackpressureStrategy() {
189206
return backpressureStrategy;
190207
}
191208

209+
/**
210+
* @return whether to capture affected data points in backpressure events
211+
* @see WriteOptions.Builder#captureBackpressureData(boolean)
212+
*/
213+
public boolean getCaptureBackpressureData() {
214+
return captureBackpressureData;
215+
}
216+
192217
private WriteOptions(@Nonnull final Builder builder) {
193218

194219
Arguments.checkNotNull(builder, "WriteOptions.Builder");
@@ -202,8 +227,10 @@ private WriteOptions(@Nonnull final Builder builder) {
202227
maxRetryTime = builder.maxRetryTime;
203228
exponentialBase = builder.exponentialBase;
204229
bufferLimit = builder.bufferLimit;
230+
concatMapPrefetch = builder.concatMapPrefetch;
205231
writeScheduler = builder.writeScheduler;
206232
backpressureStrategy = builder.backpressureStrategy;
233+
captureBackpressureData = builder.captureBackpressureData;
207234
}
208235

209236
/**
@@ -231,8 +258,10 @@ public static class Builder {
231258
private int maxRetryTime = DEFAULT_MAX_RETRY_TIME;
232259
private int exponentialBase = DEFAULT_EXPONENTIAL_BASE;
233260
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
261+
private int concatMapPrefetch = DEFAULT_CONCAT_MAP_PREFETCH;
234262
private Scheduler writeScheduler = Schedulers.newThread();
235263
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
264+
private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA;
236265

237266
/**
238267
* Set the number of data point to collect in batch.
@@ -339,7 +368,9 @@ public Builder maxRetryTime(final int maxRetryTime) {
339368
*/
340369
@Nonnull
341370
public Builder exponentialBase(final int exponentialBase) {
342-
Arguments.checkPositiveNumber(exponentialBase, "exponentialBase");
371+
if (exponentialBase < 2) {
372+
throw new IllegalArgumentException("Expecting a number >= 2 for exponentialBase");
373+
}
343374
this.exponentialBase = exponentialBase;
344375
return this;
345376
}
@@ -354,11 +385,27 @@ public Builder exponentialBase(final int exponentialBase) {
354385
*/
355386
@Nonnull
356387
public Builder bufferLimit(final int bufferLimit) {
357-
Arguments.checkNotNegativeNumber(bufferLimit, "bufferLimit");
388+
Arguments.checkPositiveNumber(bufferLimit, "bufferLimit");
358389
this.bufferLimit = bufferLimit;
359390
return this;
360391
}
361392

393+
/**
394+
* Set the prefetch value for the concatMapMaybe operator that processes write batches.
395+
*
396+
* The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
397+
* MaybeSource terminates.
398+
*
399+
* @param concatMapPrefetch the prefetch value for concatMapMaybe operator (must be positive)
400+
* @return {@code this}
401+
*/
402+
@Nonnull
403+
public Builder concatMapPrefetch(final int concatMapPrefetch) {
404+
Arguments.checkPositiveNumber(concatMapPrefetch, "concatMapPrefetch");
405+
this.concatMapPrefetch = concatMapPrefetch;
406+
return this;
407+
}
408+
362409
/**
363410
* Set the scheduler which is used for write data points. It is useful for disabling batch writes or
364411
* for tuning the performance. Default value is {@link Schedulers#newThread()}.
@@ -389,6 +436,25 @@ public Builder backpressureStrategy(@Nonnull final BackpressureOverflowStrategy
389436
return this;
390437
}
391438

439+
/**
440+
* Set whether to capture affected data points in backpressure events.
441+
*
442+
* When enabled, BackpressureEvent will include the specific line protocol points
443+
* that are affected by the backpressure condition:
444+
* - For DROP_OLDEST strategy: points that will be dropped
445+
* - For DROP_LATEST strategy: newest points being added
446+
*
447+
* Disabling this can improve performance when backpressure data capture is not needed.
448+
*
449+
* @param captureBackpressureData whether to capture affected data points. Default is false.
450+
* @return {@code this}
451+
*/
452+
@Nonnull
453+
public Builder captureBackpressureData(final boolean captureBackpressureData) {
454+
this.captureBackpressureData = captureBackpressureData;
455+
return this;
456+
}
457+
392458
/**
393459
* Build an instance of WriteOptions.
394460
*

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,15 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
161161
//
162162
.lift(new BackpressureBatchesBufferStrategy(
163163
writeOptions.getBufferLimit(),
164-
() -> publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES)),
165-
writeOptions.getBackpressureStrategy()))
164+
droppedPoints -> publish(new BackpressureEvent(
165+
BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, droppedPoints)),
166+
writeOptions.getBackpressureStrategy(),
167+
writeOptions.getCaptureBackpressureData()))
166168
//
167-
// Use concat to process batches one by one
169+
// Use concat to process batches with configurable prefetch
168170
//
169-
.concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions))
171+
.concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions),
172+
writeOptions.getConcatMapPrefetch())
170173
.doFinally(() -> finished.set(true))
171174
.subscribe(responseNotification -> {
172175

0 commit comments

Comments
 (0)