@@ -218,6 +218,11 @@ public Iterator<Record> iterator() {
return Collections.<Record>singletonList(this).iterator();
}

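// Delegates to iterator(); the buffer supplier is not used by this implementation.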
@Override
public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
return iterator();
}

static void writeHeader(ByteBuffer buffer, long offset, int size) {
buffer.putLong(offset);
buffer.putInt(size);
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.record;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
* Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
* a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
* batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
* iterating over the records in the batch.
*/
public abstract class BufferSupplier implements AutoCloseable {

public static final BufferSupplier NO_CACHING = new BufferSupplier() {
@Override
public ByteBuffer get(int capacity) {
return ByteBuffer.allocate(capacity);
}

@Override
public void release(ByteBuffer buffer) {}

@Override
public void close() {}
};

public static BufferSupplier create() {
return new DefaultSupplier();
}

/**
* Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
*/
public abstract ByteBuffer get(int capacity);

/**
* Return the provided buffer to be reused by a subsequent call to `get`.
*/
public abstract void release(ByteBuffer buffer);

/**
* Release all resources associated with this supplier.
*/
public abstract void close();

private static class DefaultSupplier extends BufferSupplier {
// We currently use a single block size, so optimise for that case
private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);

@Override
public ByteBuffer get(int size) {
Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
if (bufferQueue == null || bufferQueue.isEmpty())
return ByteBuffer.allocate(size);
else
return bufferQueue.pollFirst();
}

@Override
public void release(ByteBuffer buffer) {
buffer.clear();
Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
if (bufferQueue == null) {
// We currently keep a single buffer in flight, so optimise for that case
bufferQueue = new ArrayDeque<>(1);
bufferMap.put(buffer.capacity(), bufferQueue);
}
bufferQueue.addLast(buffer);
}

@Override
public void close() {
bufferMap.clear();
}
}
}
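A minimal usage sketch of the supplier added above: get() hands out a buffer (a cached one when a buffer of that size is available), release() returns it to the cache, and close() drops everything.

object BufferSupplierUsageSketch {
  def main(args: Array[String]): Unit = {
    val supplier = org.apache.kafka.common.record.BufferSupplier.create()
    val buffer = supplier.get(64 * 1024)   // e.g. the LZ4 decompression block size
    // ... decompress a batch into `buffer` ...
    supplier.release(buffer)               // cached and handed back by the next get(64 * 1024)
    supplier.close()                       // drops all cached buffers
  }
}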
@@ -202,6 +202,11 @@ public int partitionLeaderEpoch() {
return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
}

private Iterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
// Pooled decompression buffers are not wired in yet; fall back to the existing
// allocating iterator so that streamingIterator() also works for compressed batches.
return compressedIterator();
}

private Iterator<Record> compressedIterator() {
ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
@@ -272,6 +277,14 @@ public Iterator<Record> iterator() {
return uncompressedIterator();
}

@Override
public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
if (isCompressed())
return compressedIterator(bufferSupplier);
else
return uncompressedIterator();
}

@Override
public void setLastOffset(long offset) {
buffer.putLong(BASE_OFFSET_OFFSET, offset - lastOffsetDelta());
@@ -223,6 +223,12 @@ public Iterator<Record> iterator() {
return underlying.iterator();
}

@Override
public Iterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
loadUnderlyingRecordBatch();
return underlying.streamingIterator(bufferSupplier);
}

@Override
public boolean isValid() {
loadUnderlyingRecordBatch();
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;

import java.nio.ByteBuffer;
import java.util.Iterator;

/**
* A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@@ -200,4 +201,6 @@ public interface RecordBatch extends Iterable<Record> {
*/
int partitionLeaderEpoch();

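/**
* Return an iterator over the records in this batch. Implementations may use the supplied
* {@link BufferSupplier} to reuse decompression buffers rather than allocating a new buffer
* for every batch.
*/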
Iterator<Record> streamingIterator(BufferSupplier decompressionBufferSupplier);

}
2 changes: 1 addition & 1 deletion config/server.properties
@@ -60,7 +60,7 @@ socket.request.max.bytes=104857600
############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
log.dirs=C:/kafka_2.11-0.9.0.0/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
@@ -442,6 +442,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val versionId = request.header.apiVersion
val clientId = request.header.clientId

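// Hack (see review discussion below): derive the event pattern from the client id suffix,
// assuming a client id of the form "<name>_<pattern>", e.g. "SimpleConsumerDemoClient_1".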
var result = clientId.split('_');
var eventPattern = if (result.length > 1) result(1) else null;

Reviewer:
In Kafka 0.11 you can use a record header to store the eventPattern info.

Author:
Yes, that was my original plan, but it needs additional work to parse the pattern out of the header. This is not the highest-priority issue to resolve right now, so we can leave the hack here and make the code work first.
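A short sketch of the header-based alternative suggested above, assuming the Kafka 0.11+ Headers API on ProducerRecord; the header key "event-pattern" is only an illustrative name, not something defined by this patch.

import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import scala.collection.JavaConverters._

object EventPatternHeaderSketch {
  def main(args: Array[String]): Unit = {
    // Producer side: carry the pattern in a record header instead of the client id.
    val record = new ProducerRecord[String, String]("topic2", "key", "Message_1")
    record.headers().add("event-pattern", "1".getBytes(StandardCharsets.UTF_8))

    // Reading side: recover the pattern from the header.
    for (header <- record.headers().asScala if header.key() == "event-pattern")
      println("eventPattern = " + new String(header.value(), StandardCharsets.UTF_8))
  }
}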


val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
case (tp, _) => authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
}
@@ -540,7 +543,8 @@ class KafkaApis(val requestChannel: RequestChannel,
versionId <= 2,
authorizedRequestInfo,
replicationQuota(fetchRequest),
sendResponseCallback)
sendResponseCallback,
eventPattern)
}
}

56 changes: 53 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -17,6 +17,7 @@
package kafka.server

import java.io.{File, IOException}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

@@ -39,6 +40,8 @@ import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, St
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.record.BufferSupplier

import scala.collection._
import scala.collection.JavaConverters._
@@ -140,6 +143,8 @@ class ReplicaManager(val config: KafkaConfig,
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())

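// Note: BufferSupplier is documented as non-threadsafe; sharing one instance across
// request handler threads may need revisiting.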
private var decompressionBufferSupplier = BufferSupplier.create();

val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
@@ -558,7 +563,8 @@ class ReplicaManager(val config: KafkaConfig,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota = UnboundedQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
eventPattern: String = null) {
val isFromFollower = replicaId >= 0
val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -571,7 +577,8 @@
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota)
quota = quota,
eventPattern)

// if the fetch comes from the follower,
// update its corresponding log end offset
@@ -624,7 +631,8 @@
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
quota: ReplicaQuota,
eventPattern: String = null): Seq[(TopicPartition, LogReadResult)] = {

def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
@@ -682,6 +690,48 @@
FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
}

println("***************************Go inside read*******************************");

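// Debug pass: walk every batch and print the raw key bytes of each fetched record.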
var batchs = logReadInfo.records.batches().iterator()
while(batchs.hasNext()){
var currentBatch = batchs.next();
var records = currentBatch.streamingIterator(decompressionBufferSupplier);
while(records.hasNext())
{
var record = records.next();
var valueByteArray = Utils.toArray(record.key());
for( a <- 0 to (valueByteArray.length - 1)) {
println("********************************** record.key() = %d ************************".format(valueByteArray(a)));
}
}
}

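// Note: `batchs` was already exhausted by the debug pass above, so this loop will not run
// unless the iterator is re-obtained from logReadInfo.records.batches() (see the review comment below).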
while (batchs.hasNext()) {

Reviewer:
Reset the batchs iterator first?

Author:
Yes, it should be. But I tested these two loops separately and then added them together, so this is not the issue that causes reading out the key to fail.

println("********************************** batchs.hasNext() ************************");
var currentBatch = batchs.next();
var records = currentBatch.streamingIterator(decompressionBufferSupplier);
while (records.hasNext()) {
println("********************************** record.hasNext() ************************");
var record = records.next();
var valueByteArray = Utils.toArray(record.key());
for (a <- 0 to (valueByteArray.length - 1)) {
println("********************************** record.key() = %d ************************".format(valueByteArray(a)));
}

var key = new String(valueByteArray, StandardCharsets.UTF_8);
println("********************************** record.key() = %s ************************".format(key));

if (eventPattern != null) {
println("***************************event pattern not equal to null*******************************");
var eventPatternRegex = eventPattern.r;
var records = logReadInfo.records.batches().iterator();
if (!eventPatternRegex.findFirstIn(key.toString).isDefined) {
records.remove();

Reviewer:
Do we need to store the final records into the result?

Author:
I need to double check and confirm this after the failure to read out the key is resolved.

}
}
}
}

LogReadResult(info = logReadInfo,
hw = initialHighWatermark,
leaderLogStartOffset = initialLogStartOffset,
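On the question above about storing the final records into the result, one possible shape is sketched below: collect the matching records and rebuild a filtered MemoryRecords for the response, instead of calling remove() on a freshly created batches iterator. This assumes SimpleRecord and MemoryRecords.withRecords(CompressionType, SimpleRecord...) are available on this trunk.

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.record.{BufferSupplier, CompressionType, MemoryRecords, Records, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.util.matching.Regex

// Helper that could live in ReplicaManager: keep only records whose key matches the pattern.
def filterByKeyPattern(records: Records, pattern: Regex, bufferSupplier: BufferSupplier): MemoryRecords = {
  val matching = scala.collection.mutable.ArrayBuffer.empty[SimpleRecord]
  for (batch <- records.batches.asScala) {
    val it = batch.streamingIterator(bufferSupplier)
    while (it.hasNext) {
      val record = it.next()
      if (record.key != null) {
        val key = new String(Utils.toArray(record.key), StandardCharsets.UTF_8)
        if (pattern.findFirstIn(key).isDefined)
          matching += new SimpleRecord(record.timestamp, record.key, record.value)
      }
    }
  }
  MemoryRecords.withRecords(CompressionType.NONE, matching: _*)
}

// read() could then hand the filtered records back in its result, e.g.
// logReadInfo.copy(records = filterByKeyPattern(logReadInfo.records, eventPattern.r, decompressionBufferSupplier))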
20 changes: 20 additions & 0 deletions examples/bin/Windows-producer-consumer-demo.bat
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
"%~dp0..\..\bin\windows\kafka-run-class.bat" kafka.examples.KafkaConsumerProducerDemo %*
EndLocal
20 changes: 20 additions & 0 deletions examples/bin/Windows-simple-consumer-demo.bat
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
"%~dp0..\..\bin\windows\kafka-run-class.bat" kafka.examples.SimpleConsumerDemo %*
EndLocal
2 changes: 1 addition & 1 deletion examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -24,7 +24,7 @@ public class KafkaProperties {
public static final int CONNECTION_TIMEOUT = 100000;
public static final String TOPIC2 = "topic2";
public static final String TOPIC3 = "topic3";
public static final String CLIENT_ID = "SimpleConsumerDemoClient";
public static final String CLIENT_ID = "SimpleConsumerDemoClient_1";

private KafkaProperties() {}
}
2 changes: 1 addition & 1 deletion examples/src/main/java/kafka/examples/Producer.java
@@ -42,7 +42,7 @@ public Producer(String topic, Boolean isAsync) {

public void run() {
int messageNo = 1;
while (true) {
while (messageNo < 5) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
4 changes: 4 additions & 0 deletions examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -14,6 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Modified for testing.
*/
package kafka.examples;

import kafka.api.FetchRequest;