diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 42771a1b148..995d07347d3 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -753,16 +753,22 @@ public void ack(final long firstAckSeqNum, final int ackCount) throws IOExceptio // as a first implementation we assume that all batches are created from the same page lock.lock(); try { + boolean wasFull = isFull(); + if (containsSeq(headPage, firstAckSeqNum)) { this.headPage.ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks); } else { final int resultIndex = binaryFindPageForSeqnum(firstAckSeqNum); if (tailPages.get(resultIndex).ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks)) { this.tailPages.remove(resultIndex); - notFull.signalAll(); } this.headPage.checkpoint(); } + + // If we were full before the ack and are not full anymore, signal that we might not be full + if (wasFull && !isFull()) { + notFull.signalAll(); + } } finally { lock.unlock(); }