From 114ac44e8068a7cccc5cf3bd942f565c3660b918 Mon Sep 17 00:00:00 2001
From: yezhu
Date: Wed, 18 Oct 2017 14:46:42 -0700
Subject: [PATCH 1/4] Test branch

---
 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index 28bdb1638c5b4..1128522d28e34 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -14,6 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+/*
+ * Modified for testing.
+ */
 package kafka.examples;
 
 import kafka.api.FetchRequest;

From 83e1779168e671a4f02369b39cf31de110a4db51 Mon Sep 17 00:00:00 2001
From: yezhu
Date: Wed, 18 Oct 2017 18:15:56 -0700
Subject: [PATCH 2/4] Broker Interceptor

---
 .../record/AbstractLegacyRecordBatch.java      |  5 +
 .../kafka/common/record/BufferSupplier.java    | 96 +++++++++++++++++++
 .../common/record/DefaultRecordBatch.java      | 13 +++
 .../common/record/FileLogInputStream.java      |  6 ++
 .../kafka/common/record/RecordBatch.java       |  3 +
 config/server.properties                       |  2 +-
 .../main/scala/kafka/server/KafkaApis.scala    |  6 +-
 .../scala/kafka/server/ReplicaManager.scala    | 29 +++++-
 .../bin/Windows-producer-consumer-demo.bat     | 20 ++++
 examples/bin/Windows-simple-consumer-demo.bat  | 20 ++++
 .../java/kafka/examples/KafkaProperties.java   |  2 +-
 .../main/java/kafka/examples/Producer.java     |  2 +-
 12 files changed, 197 insertions(+), 7 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
 create mode 100644 examples/bin/Windows-producer-consumer-demo.bat
 create mode 100644 examples/bin/Windows-simple-consumer-demo.bat

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6deeb52407cae..e53a9a504a7ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -218,6 +218,11 @@ public Iterator<Record> iterator() {
         return Collections.singletonList(this).iterator();
     }
 
+    @Override
+    public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
+        return iterator(); // legacy formats (v0/v1) have no streaming decompression path yet
+    }
+
     static void writeHeader(ByteBuffer buffer, long offset, int size) {
         buffer.putLong(offset);
         buffer.putInt(size);
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
new file mode 100644
index 0000000000000..2e09f7d1a2cc3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
+ * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
+ * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
+ * iterating over the records in the batch.
+ */
+public abstract class BufferSupplier implements AutoCloseable {
+
+    public static final BufferSupplier NO_CACHING = new BufferSupplier() {
+        @Override
+        public ByteBuffer get(int capacity) {
+            return ByteBuffer.allocate(capacity);
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {}
+
+        @Override
+        public void close() {}
+    };
+
+    public static BufferSupplier create() {
+        return new DefaultSupplier();
+    }
+
+    /**
+     * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
+     */
+    public abstract ByteBuffer get(int capacity);
+
+    /**
+     * Return the provided buffer to be reused by a subsequent call to `get`.
+     */
+    public abstract void release(ByteBuffer buffer);
+
+    /**
+     * Release all resources associated with this supplier.
+     */
+    public abstract void close();
+
+    private static class DefaultSupplier extends BufferSupplier {
+        // We currently use a single block size, so optimise for that case
+        private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
+
+        @Override
+        public ByteBuffer get(int size) {
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+            if (bufferQueue == null || bufferQueue.isEmpty())
+                return ByteBuffer.allocate(size);
+            else
+                return bufferQueue.pollFirst();
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {
+            buffer.clear();
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
+            if (bufferQueue == null) {
+                // We currently keep a single buffer in flight, so optimise for that case
+                bufferQueue = new ArrayDeque<>(1);
+                bufferMap.put(buffer.capacity(), bufferQueue);
+            }
+            bufferQueue.addLast(buffer);
+        }
+
+        @Override
+        public void close() {
+            bufferMap.clear();
+        }
+    }
+}
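BufferSupplier is the piece the rest of the series builds on, so a short usage sketch may help orient review. This is illustrative code only, not part of the patch; the class name BufferSupplierExample is invented here, and the behaviour shown follows directly from DefaultSupplier above.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.BufferSupplier;

    public class BufferSupplierExample {
        public static void main(String[] args) {
            BufferSupplier supplier = BufferSupplier.create(); // one per consumer; not thread-safe

            ByteBuffer first = supplier.get(64 * 1024);  // freshly allocated (e.g. an LZ4 block)
            supplier.release(first);                     // cleared and cached for reuse

            ByteBuffer second = supplier.get(64 * 1024); // served from the cache, no new allocation
            System.out.println(first == second);         // true with DefaultSupplier

            supplier.close();
        }
    }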
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 1de568e222e03..ac0e59063b02b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -202,6 +202,11 @@ public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
+    private Iterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
+        // Not needed right now: the prototype only streams uncompressed batches.
+        return null;
+    }
+
     private Iterator<Record> compressedIterator() {
         ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
@@ -272,6 +277,14 @@ public Iterator<Record> iterator() {
         return uncompressedIterator();
     }
 
+    @Override
+    public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
+        if (isCompressed())
+            return compressedIterator(bufferSupplier);
+        else
+            return uncompressedIterator();
+    }
+
     @Override
     public void setLastOffset(long offset) {
         buffer.putLong(BASE_OFFSET_OFFSET, offset - lastOffsetDelta());
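Note that compressedIterator(BufferSupplier) above is a stub returning null, so streamingIterator only really works for uncompressed batches. For orientation, the shape such an iterator usually takes is sketched below: records are parsed one at a time from a decompression stream instead of being materialized eagerly. This is my illustration of the idea, not the patch's code; StreamingRecordIterator and readOneRecord are invented names, and the actual record-format parsing is elided.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public abstract class StreamingRecordIterator<T> implements Iterator<T> {
        private final DataInputStream in;   // decompression stream over the batch payload
        private final int totalRecords;     // taken from the batch header
        private int readRecords = 0;

        protected StreamingRecordIterator(DataInputStream in, int totalRecords) {
            this.in = in;
            this.totalRecords = totalRecords;
        }

        /** Parses exactly one record from the stream; the wire format is elided here. */
        protected abstract T readOneRecord(DataInputStream in) throws IOException;

        @Override
        public boolean hasNext() {
            return readRecords < totalRecords;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            readRecords++;
            try {
                return readOneRecord(in);
            } catch (IOException e) {
                throw new IllegalStateException("Failed to read record from compressed batch", e);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }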
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 59055ed2754c5..f9f4268c5fde8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -223,6 +223,12 @@ public Iterator<Record> iterator() {
         return underlying.iterator();
     }
 
+    @Override
+    public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
+        loadUnderlyingRecordBatch();
+        return underlying.streamingIterator(bufferSupplier);
+    }
+
     @Override
     public boolean isValid() {
         loadUnderlyingRecordBatch();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 1cfb7f8302790..d8ab5320b5428 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 /**
  * A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@@ -200,4 +201,6 @@ public interface RecordBatch extends Iterable<Record> {
      */
    int partitionLeaderEpoch();
 
+    Iterator<Record> streamingIterator(BufferSupplier decompressionBufferSupplier);
+
 }
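Callers are expected to drive the new interface method like this. A minimal sketch, assuming only the RecordBatch and BufferSupplier API added in this patch; StreamingIteratorUsage is an invented name.

    import java.util.Iterator;

    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;

    public class StreamingIteratorUsage {
        /** Dumps every record offset, reusing one supplier so decompression buffers are pooled. */
        public static void dump(Records records, BufferSupplier supplier) {
            for (RecordBatch batch : records.batches()) {
                Iterator<Record> it = batch.streamingIterator(supplier);
                while (it.hasNext())
                    System.out.println("offset = " + it.next().offset());
            }
        }
    }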
diff --git a/config/server.properties b/config/server.properties
index 37b5bb3436758..d6360c8263d8f 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -60,7 +60,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
+log.dirs=C:/kafka_2.11-0.9.0.0/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index defbf34365658..ad18c3a6f6cb2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -442,6 +442,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
 
+    val result = clientId.split('_') // client id convention: "<name>_<eventPattern>"
+    val eventPattern = if (result.length > 1) result(1) else null
+
     val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition { case (tp, _) =>
       authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
     }
@@ -540,7 +543,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       versionId <= 2,
       authorizedRequestInfo,
       replicationQuota(fetchRequest),
-      sendResponseCallback)
+      sendResponseCallback,
+      eventPattern)
   }
 }
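The split('_') above implies a convention: the fetching client's id carries the event pattern as its second underscore-separated segment. A hedged sketch of how a consumer would opt in; ClientIdConvention is an invented name, and the only real API used is the standard consumer config key.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ClientIdConvention {
        /** Builds consumer properties whose client id carries the broker-side event pattern. */
        public static Properties withEventPattern(String baseName, String eventPattern) {
            Properties props = new Properties();
            // KafkaApis takes split('_')(1), i.e. the segment between the first and
            // second underscore, so neither baseName nor the pattern may contain '_'.
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, baseName + "_" + eventPattern);
            return props;
        }
    }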
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8f67425208c0a..123f73d1c2541 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, St
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.BufferSupplier
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -140,6 +142,8 @@ class ReplicaManager(val config: KafkaConfig,
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
 
+  private val decompressionBufferSupplier = BufferSupplier.create()
+
   val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
     purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
   val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
@@ -558,7 +562,8 @@ class ReplicaManager(val config: KafkaConfig,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota = UnboundedQuota,
-                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
+                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                    eventPattern: String = null) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)
@@ -571,7 +576,8 @@ class ReplicaManager(val config: KafkaConfig,
       fetchMaxBytes = fetchMaxBytes,
       hardMaxBytesLimit = hardMaxBytesLimit,
       readPartitionInfo = fetchInfos,
-      quota = quota)
+      quota = quota,
+      eventPattern = eventPattern)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -624,7 +630,8 @@ class ReplicaManager(val config: KafkaConfig,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
-                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
+                       quota: ReplicaQuota,
+                       eventPattern: String = null): Seq[(TopicPartition, LogReadResult)] = {
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
@@ -682,6 +689,22 @@ class ReplicaManager(val config: KafkaConfig,
           FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
       }
 
+      // Debug tracing for the interceptor prototype: walk every fetched batch
+      // with the new streaming iterator and dump the raw key bytes.
+      println("*** readFromLocalLog: dumping record keys via streamingIterator ***")
+      val batches = logReadInfo.records.batches().iterator()
+      while (batches.hasNext) {
+        val currentBatch = batches.next()
+        val records = currentBatch.streamingIterator(decompressionBufferSupplier)
+        while (records.hasNext) {
+          val record = records.next()
+          val keyBytes = if (record.key() == null) Array.emptyByteArray else Utils.toArray(record.key())
+          for (a <- keyBytes.indices) {
+            println("record.key() byte = %d".format(keyBytes(a)))
+          }
+        }
+      }
+
       LogReadResult(info = logReadInfo,
                     hw = initialHighWatermark,
                     leaderLogStartOffset = initialLogStartOffset,
diff --git a/examples/bin/Windows-producer-consumer-demo.bat b/examples/bin/Windows-producer-consumer-demo.bat
new file mode 100644
index 0000000000000..7e857689edb99
--- /dev/null
+++ b/examples/bin/Windows-producer-consumer-demo.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M
+"%~dp0..\..\bin\windows\kafka-run-class.bat" kafka.examples.KafkaConsumerProducerDemo %*
+EndLocal
diff --git a/examples/bin/Windows-simple-consumer-demo.bat b/examples/bin/Windows-simple-consumer-demo.bat
new file mode 100644
index 0000000000000..43b5ce8364e73
--- /dev/null
+++ b/examples/bin/Windows-simple-consumer-demo.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M
+"%~dp0..\..\bin\windows\kafka-run-class.bat" kafka.examples.SimpleConsumerDemo %*
+EndLocal
diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java
index cd737cf900e29..3694103929d68 100644
--- a/examples/src/main/java/kafka/examples/KafkaProperties.java
+++ b/examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -24,7 +24,7 @@ public class KafkaProperties {
     public static final int CONNECTION_TIMEOUT = 100000;
     public static final String TOPIC2 = "topic2";
     public static final String TOPIC3 = "topic3";
-    public static final String CLIENT_ID = "SimpleConsumerDemoClient";
+    public static final String CLIENT_ID = "SimpleConsumerDemoClient_1"; // the "_1" suffix is parsed by the broker as the event pattern
 
     private KafkaProperties() {}
 }
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index e7be1a078df7f..622c6c491f03f 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -42,7 +42,7 @@ public Producer(String topic, Boolean isAsync) {
 
     public void run() {
         int messageNo = 1;
-        while (true) {
+        while (messageNo < 5) { // send only four messages for this test
             String messageStr = "Message_" + messageNo;
             long startTime = System.currentTimeMillis();
             if (isAsync) { // Send asynchronously
From 0a58318e451f9f8493b6b152f5db06c138f168b5 Mon Sep 17 00:00:00 2001
From: yezhu
Date: Wed, 18 Oct 2017 18:32:08 -0700
Subject: [PATCH 3/4] Add missing part

---
 .../scala/kafka/server/ReplicaManager.scala   | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 123f73d1c2541..391c06d0fe754 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -17,6 +17,7 @@
 package kafka.server
 
 import java.io.{File, IOException}
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
@@ -705,6 +706,32 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
 
+      // Second pass over the fetched batches: decode each key as UTF-8 and,
+      // when the fetch carries an event pattern, drop records whose key does
+      // not match. The iterator from the first pass is exhausted, so create a fresh one.
+      val filterBatches = logReadInfo.records.batches().iterator()
+      while (filterBatches.hasNext) {
+        val currentBatch = filterBatches.next()
+        val records = currentBatch.streamingIterator(decompressionBufferSupplier)
+        while (records.hasNext) {
+          val record = records.next()
+          val keyBytes = if (record.key() == null) Array.emptyByteArray else Utils.toArray(record.key())
+          for (a <- keyBytes.indices) {
+            println("record.key() byte = %d".format(keyBytes(a)))
+          }
+
+          val key = new String(keyBytes, StandardCharsets.UTF_8)
+          println("record key = %s".format(key))
+
+          if (eventPattern != null) {
+            val eventPatternRegex = eventPattern.r
+            // Filter against the record iterator itself, not a new batch iterator.
+            if (eventPatternRegex.findFirstIn(key).isEmpty) {
+              records.remove()
+            }
+          }
+        }
+      }
 
      LogReadResult(info = logReadInfo,
                    hw = initialHighWatermark,
                    leaderLogStartOffset = initialLogStartOffset,
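The filtering decision in the loop above, isolated for clarity: a record is kept only if its UTF-8 key matches the event pattern somewhere. Scala's Regex.findFirstIn performs an unanchored search, which corresponds to Matcher.find() in Java, not matches(). A sketch, not broker code; EventPatternFilter is an invented name.

    import java.nio.charset.StandardCharsets;
    import java.util.regex.Pattern;

    public class EventPatternFilter {
        private final Pattern pattern;

        public EventPatternFilter(String eventPattern) {
            this.pattern = Pattern.compile(eventPattern);
        }

        /** True if the record should be kept in the fetch response. */
        public boolean matches(byte[] keyBytes) {
            if (keyBytes == null)
                return false; // keyless records never match a pattern
            String key = new String(keyBytes, StandardCharsets.UTF_8);
            // Unanchored search, mirroring Scala's Regex.findFirstIn.
            return pattern.matcher(key).find();
        }
    }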
From 2dfcdb211e2822b9065b9e62f0e5cc4c3a2c0947 Mon Sep 17 00:00:00 2001
From: yezhu
Date: Thu, 26 Oct 2017 04:05:09 -0700
Subject: [PATCH 4/4] Add CloseableIterator and implement the removes method

---
 .../kafka/common/record/DefaultRecordBatch.java    | 28 ++++++++++++++++----
 .../kafka/common/utils/CloseableIterator.java      | 31 ++++++++++++++++++++
 core/src/main/scala/kafka/server/ReplicaManager.scala |  4 +++-
 3 files changed, 58 insertions(+), 5 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ac0e59063b02b..89d6f2e8c4136 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
 
@@ -202,7 +203,7 @@ public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
-    private Iterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
+    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
         // Not needed right now: the prototype only streams uncompressed batches.
         return null;
     }
@@ -238,7 +239,7 @@ private Iterator<Record> compressedIterator() {
         return records.iterator();
     }
 
-    private Iterator<Record> uncompressedIterator() {
+    private CloseableIterator<Record> uncompressedIterator() {
         final ByteBuffer buffer = this.buffer.duplicate();
         final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
         final long baseOffset = baseOffset();
@@ -248,7 +249,7 @@ private Iterator<Record> uncompressedIterator() {
         buffer.position(RECORDS_OFFSET);
         final int totalRecords = count();
 
-        return new Iterator<Record>() {
+        return new CloseableIterator<Record>() {
             int readRecords = 0;
 
             @Override
@@ -266,6 +267,25 @@ public Record next() {
             public void remove() {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public void removes(int length) {
+                // Compact the batch in place: slide the bytes after position()
+                // left over the `length` bytes just read, zero-filling the tail.
+                // Note the batch header (record count, length) is not updated here.
+                int startPoint = buffer.position() - length;
+                int index = startPoint;
+                for (int i = buffer.position(); i < buffer.limit(); i++) {
+                    buffer.put(index++, buffer.get(i));
+                    buffer.put(i, (byte) 0);
+                }
+                buffer.position(startPoint);
+            }
+
+            @Override
+            public void close() {
+                // To be implemented in the future.
+            }
         };
     }
 
@@ -278,7 +298,7 @@ public Iterator<Record> iterator() {
     }
 
     @Override
-    public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
+    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
         if (isCompressed())
             return compressedIterator(bufferSupplier);
         else
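What removes(length) does to the underlying buffer, demonstrated on a plain ByteBuffer. Illustrative only; RemovesDemo is an invented name and the byte values are arbitrary.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class RemovesDemo {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6});
            buffer.position(4); // we have just "read" bytes 3 and 4, so length = 2

            int length = 2;
            int startPoint = buffer.position() - length;
            int index = startPoint;
            for (int i = buffer.position(); i < buffer.limit(); i++) {
                buffer.put(index++, buffer.get(i)); // slide the tail left
                buffer.put(i, (byte) 0);            // zero-fill behind it
            }
            buffer.position(startPoint);

            // Prints [1, 2, 5, 6, 0, 0]: bytes 3 and 4 are gone. The limit is
            // unchanged, so downstream code must tolerate (or trim) the padding.
            System.out.println(Arrays.toString(buffer.array()));
        }
    }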
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
new file mode 100644
index 0000000000000..6037593db8837
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+/**
+ * Iterators that need to be closed in order to release resources should implement this interface.
+ *
+ * Warning: before implementing this interface, consider if there are better options. The chance of misuse is
+ * a bit high since people are used to iterating without closing.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, Closeable {
+    void close();
+    void removes(int length);
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 391c06d0fe754..fe158051d3aa4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -726,7 +726,9 @@ class ReplicaManager(val config: KafkaConfig,
             val eventPatternRegex = eventPattern.r
             // Filter against the record iterator itself, not a new batch iterator.
             if (eventPatternRegex.findFirstIn(key).isEmpty) {
-              records.remove()
+              // Assumes `records` is statically typed as CloseableIterator[Record];
+              // RecordBatch#streamingIterator should be declared accordingly.
+              records.removes(record.sizeInBytes())
             }
           }
         }
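Putting patch 4 together, a caller-side sketch of the intended flow: stream records, drop non-matching ones in place via removes, then close the iterator. Illustrative only — FilteringSketch is an invented name — and, as noted in the ReplicaManager hunk above, it assumes the iterator is statically typed as CloseableIterator<Record>.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.regex.Pattern;

    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.utils.CloseableIterator;
    import org.apache.kafka.common.utils.Utils;

    public class FilteringSketch {
        /** Drops every record whose UTF-8 key does not match the pattern. */
        public static void filter(CloseableIterator<Record> records, Pattern eventPattern) {
            try {
                while (records.hasNext()) {
                    Record record = records.next();
                    ByteBuffer keyBuffer = record.key();
                    if (keyBuffer == null)
                        continue; // keyless records are left untouched
                    String key = new String(Utils.toArray(keyBuffer), StandardCharsets.UTF_8);
                    if (!eventPattern.matcher(key).find())
                        records.removes(record.sizeInBytes());
                }
            } finally {
                records.close(); // CloseableIterator.close() does not throw
            }
        }
    }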