2525import java .util .List ;
2626import java .util .concurrent .TimeUnit ;
2727import java .util .concurrent .atomic .AtomicInteger ;
28+ import java .util .function .Consumer ;
2829
2930import com .influxdb .client .domain .WritePrecision ;
3031import com .influxdb .client .internal .AbstractWriteClient ;
@@ -57,7 +58,7 @@ public void testBackwardCompatibility() {
5758 BackpressureOverflowStrategy .DROP_OLDEST )))
5859 .subscribe (testSubscriber );
5960
60- testSubscriber .request (10 );
61+ testSubscriber .request (1 );
6162 testSubscriber .awaitDone (5 , TimeUnit .SECONDS ).assertNoErrors ();
6263
6364 Assertions .assertThat (backpressureCount .get ()).isGreaterThan (0 );
@@ -106,7 +107,7 @@ public void testDropOldestVsDropLatest_CompareStrategies() {
106107 true )))
107108 .subscribe (oldestSubscriber );
108109
109- oldestSubscriber .request (1 ); // Only process 1 batch, causing backpressure
110+ oldestSubscriber .request (1 );
110111 oldestSubscriber .awaitDone (5 , TimeUnit .SECONDS ).assertNoErrors ();
111112
112113 // Test DROP_LATEST
@@ -124,7 +125,7 @@ public void testDropOldestVsDropLatest_CompareStrategies() {
124125 true )))
125126 .subscribe (latestSubscriber );
126127
127- latestSubscriber .request (1 ); // Only process 1 batch, causing backpressure
128+ latestSubscriber .request (1 );
128129 latestSubscriber .awaitDone (5 , TimeUnit .SECONDS ).assertNoErrors ();
129130
130131 // Extract and verify the exact batch numbers captured
@@ -172,6 +173,68 @@ public void testDropOldestVsDropLatest_CompareStrategies() {
172173 }
173174 }
174175
176+ @ Test
177+ public void testCaptureBatchReturnsEmptyList () {
178+ int bufferSize = 1 ;
179+ List <String > capturedData = new ArrayList <>();
180+
181+ TestSubscriber <AbstractWriteClient .BatchWriteItem > testSubscriber = createTestSubscriber ();
182+
183+ Flowable <AbstractWriteClient .BatchWriteItem > emptyBatchFlow = Flowable .unsafeCreate (s -> {
184+ BooleanSubscription bs = new BooleanSubscription ();
185+ s .onSubscribe (bs );
186+
187+ WriteParameters writeParameters = new WriteParameters ("test-bucket" , "test-org" , WritePrecision .NS );
188+ AbstractWriteClient .BatchWriteDataGrouped emptyData = new AbstractWriteClient .BatchWriteDataGrouped (writeParameters );
189+ AbstractWriteClient .BatchWriteItem emptyBatch = new AbstractWriteClient .BatchWriteItem (writeParameters , emptyData );
190+
191+ // Send multiple batches to trigger overflow
192+ s .onNext (emptyBatch );
193+ s .onNext (emptyBatch );
194+
195+ if (!bs .isCancelled ()) {
196+ s .onComplete ();
197+ }
198+ });
199+
200+ Flowable .fromPublisher (
201+ emptyBatchFlow
202+ .lift (new BackpressureBatchesBufferStrategy (
203+ bufferSize ,
204+ capturedData ::addAll ,
205+ BackpressureOverflowStrategy .DROP_OLDEST ,
206+ true ))) // capture enabled
207+ .subscribe (testSubscriber );
208+
209+ testSubscriber .request (1 );
210+ testSubscriber .awaitDone (5 , TimeUnit .SECONDS ).assertNoErrors ();
211+
212+ Assertions .assertThat (capturedData ).isEmpty ();
213+ }
214+
215+ @ Test
216+ public void testOnOverflowExceptionHandling () {
217+ int bufferSize = 1 ;
218+ TestSubscriber <AbstractWriteClient .BatchWriteItem > testSubscriber = createTestSubscriber ();
219+
220+ Consumer <List <String >> throwingConsumer = capturedData -> {
221+ throw new RuntimeException ("Test exception in onOverflow" );
222+ };
223+ Flowable .fromPublisher (
224+ createSequentialBatches (3 )
225+ .lift (new BackpressureBatchesBufferStrategy (
226+ bufferSize ,
227+ throwingConsumer ,
228+ BackpressureOverflowStrategy .DROP_OLDEST ,
229+ true )))
230+ .subscribe (testSubscriber );
231+
232+ testSubscriber .request (1 );
233+ testSubscriber .awaitDone (5 , TimeUnit .SECONDS );
234+
235+ testSubscriber .assertError (RuntimeException .class );
236+ }
237+
175238 /**
176239 * Extract batch numbers from line protocol strings for verification
177240 */
@@ -180,11 +243,11 @@ private List<Integer> extractBatchNumbers(List<String> lineProtocolPoints) {
180243 .map (line -> {
181244 try {
182245 int batchStart = line .indexOf ("batch=" ) + 6 ;
183- if (batchStart == 5 ) return -1 ; // "batch=" not found
184-
246+ if (batchStart == 5 ) return -1 ;
247+
185248 int batchEnd = line .indexOf (" " , batchStart );
186249 if (batchEnd == -1 ) batchEnd = line .length ();
187-
250+
188251 String batchNumStr = line .substring (batchStart , batchEnd );
189252 return Integer .parseInt (batchNumStr );
190253 } catch (Exception e ) {
@@ -217,16 +280,14 @@ private Flowable<AbstractWriteClient.BatchWriteItem> createSequentialBatches(int
217280 return Flowable .unsafeCreate (s -> {
218281 BooleanSubscription bs = new BooleanSubscription ();
219282 s .onSubscribe (bs );
220-
283+
221284 for (int batchNum = 1 ; batchNum <= batchCount && !bs .isCancelled (); batchNum ++) {
222285 WriteParameters writeParameters = new WriteParameters ("test-bucket" , "test-org" , WritePrecision .NS );
223286 AbstractWriteClient .BatchWriteDataGrouped data = new AbstractWriteClient .BatchWriteDataGrouped (writeParameters );
224-
287+
225288 // Single point per batch with clear batch identification
226289 data .append (String .format ("sequential,batch=%d value=%d %d" , batchNum , batchNum * 100 , batchNum * 1000000L ));
227-
228290 s .onNext (new AbstractWriteClient .BatchWriteItem (writeParameters , data ));
229-
230291 // Small delay to ensure predictable ordering
231292 try {
232293 Thread .sleep (1 );
0 commit comments