diff --git a/src/main/java/io/pravega/perf/PravegaBatchReaderWorker.java b/src/main/java/io/pravega/perf/PravegaBatchReaderWorker.java new file mode 100644 index 000000000..8a9084f7b --- /dev/null +++ b/src/main/java/io/pravega/perf/PravegaBatchReaderWorker.java @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.perf; + +import io.pravega.client.BatchClientFactory; +import io.pravega.client.batch.SegmentIterator; +import io.pravega.client.batch.SegmentRange; +import io.pravega.client.stream.impl.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; + +public class PravegaBatchReaderWorker extends ReaderWorker { + private static Logger log = LoggerFactory.getLogger(PravegaBatchReaderWorker.class); + + private final BatchClientFactory batchClientFactory; + private final Iterator assignedSegments; + + private SegmentIterator currentSegmentIterator; + private SegmentRange currentRange; + private boolean finished; + + PravegaBatchReaderWorker(int readerId, int events, int secondsToRun, long start, PerfStats stats, String readerGrp, int timeout, boolean writeAndRead, BatchClientFactory batchClientFactory, List assignedSegments) { + super(readerId, events, secondsToRun, start, stats, readerGrp, timeout, writeAndRead); + this.batchClientFactory = batchClientFactory; + + this.assignedSegments = assignedSegments.iterator(); + } + + @Override + public byte[] readData() throws WorkerCompleteException { + if (finished) { + throw new WorkerCompleteException(workerID); + } + + if (currentSegmentIterator == null || !currentSegmentIterator.hasNext()) { + if (currentRange != null) { + currentSegmentIterator.close(); + + log.info("id:{} Completed Segment {}, {}({}:{})", workerID, currentRange.getStreamName(), currentRange.getSegmentId(), currentRange.getStartOffset(), currentRange.getEndOffset()); + } + + if (assignedSegments.hasNext()) { + + currentRange = assignedSegments.next(); + currentSegmentIterator = batchClientFactory.readSegment(currentRange, new ByteArraySerializer()); + + log.info("id:{} Starting Segment {}, {}({}:{})", workerID, currentRange.getStreamName(), currentRange.getSegmentId(), currentRange.getStartOffset(), currentRange.getEndOffset()); + } else { + log.info("id:{} Completed all assigned assignedSegments", workerID); + currentSegmentIterator = null; + finished = true; + return null; + } + } + + return currentSegmentIterator.next(); + } + + @Override + public void close() { + if (currentSegmentIterator != null) { + currentSegmentIterator.close(); + } + } +} diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 1ed3da6bd..b9213ec18 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -10,8 +10,10 @@ package io.pravega.perf; +import io.pravega.client.BatchClientFactory; import io.pravega.client.ClientConfig; import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.batch.SegmentRange; import io.pravega.client.stream.ReaderGroup; import io.pravega.client.stream.impl.ClientFactoryImpl; import io.pravega.client.stream.impl.ControllerImpl; @@ -23,10 +25,13 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -43,6 +48,8 @@ * Data format is in comma separated format as following: {TimeStamp, Sensor Id, Location, TempValue }. */ public class PravegaPerfTest { + private static Logger log = LoggerFactory.getLogger(PravegaPerfTest.class); + final static String BENCHMARKNAME = "pravega-benchmark"; public static void main(String[] args) { @@ -88,6 +95,8 @@ public static void main(String[] args) { "If -1 (default), watermarks will not be read.\n" + "If >0, watermarks will be read with a period of this many milliseconds."); + options.addOption("batchreaders", false, "Use batch readers rather than Streaming readers for consumers"); + options.addOption("help", false, "Help message"); parser = new DefaultParser(); @@ -186,7 +195,7 @@ static private abstract class Test { final boolean recreate; final boolean writeAndRead; final int producerCount; - final int consumerCount; + int consumerCount; final int segmentCount; final int events; final int eventsPerSec; @@ -204,6 +213,7 @@ static private abstract class Test { final boolean enableConnectionPooling; final long writeWatermarkPeriodMillis; final long readWatermarkPeriodMillis; + final boolean batchReaders; Test(long startTime, CommandLine commandline) throws IllegalArgumentException { this.startTime = startTime; @@ -303,6 +313,8 @@ static private abstract class Test { readFile = null; } + batchReaders = commandline.hasOption("batchreaders"); + enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true")); writeWatermarkPeriodMillis = Long.parseLong(commandline.getOptionValue("writeWatermarkPeriodMillis", "-1")); @@ -403,6 +415,7 @@ static private class PravegaTest extends Test { final PravegaStreamHandler streamHandle; final EventStreamClientFactory factory; final ReaderGroup readerGroup; + final BatchClientFactory batchClientFactory; PravegaTest(long startTime, CommandLine commandline) throws IllegalArgumentException, URISyntaxException, InterruptedException, Exception { @@ -426,8 +439,15 @@ static private class PravegaTest extends Test { } } if (consumerCount > 0) { - readerGroup = streamHandle.createReaderGroup(!writeAndRead); + if (batchReaders) { + batchClientFactory = streamHandle.newBatchClientFactory(); + readerGroup = null; + } else { + batchClientFactory = null; + readerGroup = streamHandle.createReaderGroup(!writeAndRead); + } } else { + batchClientFactory = null; readerGroup = null; } @@ -468,11 +488,43 @@ public List getProducers() { } public List getConsumers() throws URISyntaxException { + return batchReaders ? getBatchConsumers() : getStreamingConsumers(); + } + + public List getBatchConsumers() { + final List readers; + if (consumerCount > 0) { + List segmentRanges = streamHandle.getBatchSegmentRanges(batchClientFactory); + + if (consumerCount > segmentRanges.size()) { + consumerCount = segmentRanges.size(); + + log.info("Limiting To {} consumers due to small number of segment ranges", consumerCount); + } + + List> assignedRanges = assignSegmentsToConsumers(segmentRanges, consumerCount); + + readers = IntStream.range(0, consumerCount) + .boxed() + .map(i -> + new PravegaBatchReaderWorker(i, eventsPerConsumer, + runtimeSec, startTime, consumeStats, + rdGrpName, TIMEOUT, writeAndRead, batchClientFactory, assignedRanges.get(i)) + ) + .collect(Collectors.toList()); + } else { + readers = null; + } + return readers; + + } + + public List getStreamingConsumers() throws URISyntaxException { final List readers; if (consumerCount > 0) { readers = IntStream.range(0, consumerCount) .boxed() - .map(i -> new PravegaReaderWorker(i, eventsPerConsumer, + .map(i -> new PravegaStreamingReaderWorker(i, eventsPerConsumer, runtimeSec, startTime, consumeStats, rdGrpName, TIMEOUT, writeAndRead, factory, io.pravega.client.stream.Stream.of(scopeName, streamName), @@ -491,5 +543,33 @@ public void closeReaderGroup() { } } + /** + * Chunks the list of segment ranges between the number consumers. If the number of segment ranges is + * divisible by the number of consumers then each consumer will recieve an equal number of segments, otherwise + * some consumers may receive more segments than others. + * + * @return A list of lists, each list representing the segments assigned to that consumer + */ + private List> assignSegmentsToConsumers(List segmentRanges, int consumers) { + List> results = new ArrayList<>(); + for (int f=0; f < consumers; f++) { + results.add(new ArrayList<>()); + } + + for (int f=0;f < segmentRanges.size(); f++) { + int consumerId = f % consumers; + + if (results.size() < f) { + results.add(new ArrayList<>()); + } + + SegmentRange segmentRange = segmentRanges.get(f); + results.get(consumerId).add(segmentRange); + + log.info("Segment Assignment {} -> {} {}({}:{})", consumerId, segmentRange.getStreamName(), segmentRange.getSegmentId(), segmentRange.getStartOffset(), segmentRange.getEndOffset()); + } + + return results; + } } } diff --git a/src/main/java/io/pravega/perf/PravegaStreamHandler.java b/src/main/java/io/pravega/perf/PravegaStreamHandler.java index daeca1c94..9c8975c9a 100644 --- a/src/main/java/io/pravega/perf/PravegaStreamHandler.java +++ b/src/main/java/io/pravega/perf/PravegaStreamHandler.java @@ -11,8 +11,10 @@ package io.pravega.perf; import java.net.URI; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -21,7 +23,12 @@ import java.net.URISyntaxException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.StreamSupport; +import com.google.common.collect.Streams; +import io.pravega.client.BatchClientFactory; +import io.pravega.client.batch.SegmentRange; +import io.pravega.client.stream.StreamCut; import io.pravega.client.stream.impl.ControllerImpl; import io.pravega.client.admin.StreamManager; import io.pravega.client.stream.StreamConfiguration; @@ -139,6 +146,8 @@ void recreate() throws InterruptedException, ExecutionException, TimeoutExceptio } } + + ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException { if (readerGroupManager == null) { readerGroupManager = ReaderGroupManager.withScope(scope, @@ -154,6 +163,18 @@ ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException { return rdGroup; } + public BatchClientFactory newBatchClientFactory() { + ClientConfig clientConfig = ClientConfig.builder().controllerURI(URI.create(controllerUri)).build(); + return BatchClientFactory.withScope(scope, clientConfig); + } + + public List getBatchSegmentRanges(BatchClientFactory batchFactory) { + Iterator segmentRangeIterator = batchFactory.getSegments(Stream.of(scope, stream), StreamCut.UNBOUNDED, StreamCut.UNBOUNDED) + .getIterator(); + + return Streams.stream(segmentRangeIterator).collect(Collectors.toList()); + } + void deleteReaderGroup() { try { readerGroupManager.deleteReaderGroup(rdGrpName); diff --git a/src/main/java/io/pravega/perf/PravegaReaderWorker.java b/src/main/java/io/pravega/perf/PravegaStreamingReaderWorker.java similarity index 83% rename from src/main/java/io/pravega/perf/PravegaReaderWorker.java rename to src/main/java/io/pravega/perf/PravegaStreamingReaderWorker.java index 7c787049f..5540975ff 100644 --- a/src/main/java/io/pravega/perf/PravegaReaderWorker.java +++ b/src/main/java/io/pravega/perf/PravegaStreamingReaderWorker.java @@ -27,8 +27,8 @@ /** * Class for Pravega reader/consumer. */ -public class PravegaReaderWorker extends ReaderWorker { - private static Logger log = LoggerFactory.getLogger(PravegaReaderWorker.class); +public class PravegaStreamingReaderWorker extends ReaderWorker { + private static Logger log = LoggerFactory.getLogger(PravegaStreamingReaderWorker.class); private final EventStreamReader reader; private final Stream stream; @@ -38,10 +38,11 @@ public class PravegaReaderWorker extends ReaderWorker { * * @param readWatermarkPeriodMillis If >0, watermarks will be read with a period of this many milliseconds. */ - PravegaReaderWorker(int readerId, int events, int secondsToRun, - long start, PerfStats stats, String readergrp, - int timeout, boolean writeAndRead, EventStreamClientFactory factory, - Stream stream, long readWatermarkPeriodMillis) { + + PravegaStreamingReaderWorker(int readerId, int events, int secondsToRun, + long start, PerfStats stats, String readergrp, + int timeout, boolean writeAndRead, EventStreamClientFactory factory, + Stream stream, long readWatermarkPeriodMillis) { super(readerId, events, secondsToRun, start, stats, readergrp, timeout, writeAndRead); final String readerSt = Integer.toString(readerId); diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java index f6d1d8b19..3778ba1a0 100644 --- a/src/main/java/io/pravega/perf/ReaderWorker.java +++ b/src/main/java/io/pravega/perf/ReaderWorker.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -47,7 +46,7 @@ private Performance createBenchmark() { /** * read the data. */ - public abstract byte[] readData(); + public abstract byte[] readData() throws WorkerCompleteException; /** * close the consumer/reader. @@ -78,6 +77,7 @@ public void EventsReader() throws IOException { i++; } } + } catch(WorkerCompleteException ignore) { } finally { close(); } @@ -100,6 +100,7 @@ public void EventsReaderRW() throws IOException { i++; } } + } catch(WorkerCompleteException ignore) { } finally { close(); } @@ -119,6 +120,7 @@ public void EventsTimeReader() throws IOException { stats.recordTime(time, System.currentTimeMillis(), ret.length); } } + } catch(WorkerCompleteException ignore) { } finally { close(); } @@ -141,6 +143,7 @@ public void EventsTimeReaderRW() throws IOException { stats.recordTime(start, time, ret.length); } } + } catch(WorkerCompleteException ignore) { } finally { close(); } diff --git a/src/main/java/io/pravega/perf/WorkerCompleteException.java b/src/main/java/io/pravega/perf/WorkerCompleteException.java new file mode 100644 index 000000000..abff25935 --- /dev/null +++ b/src/main/java/io/pravega/perf/WorkerCompleteException.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.perf; + +/** + * Signals that a worker has completed all it's assigned work + */ +public class WorkerCompleteException extends Exception { + private final int workerId; + + public WorkerCompleteException(int workerId) { + this.workerId = workerId; + } + + public int getWorkerId() { + return workerId; + } + + @Override + public String toString() { + return "Worker " + workerId + " Complete"; + } +}