StreamCutsExample.java

/*
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.streamcuts;

import com.google.common.collect.Lists;
import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.batch.BatchClient;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.batch.StreamSegmentsIterator;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.JavaSerializer;
import java.io.Closeable;
import java.net.URI;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StreamCutsExample implements Closeable {
public static final int maxEventsPerDay = 3;
private static final String eventSeparator = ":";
private static final String streamSeparator = "-";
private int numStreams;
private int numEvents;
private URI controllerURI;
private String scope;
private ScheduledExecutorService executor;
private StreamManager streamManager;
private List<String> myStreamNames = new ArrayList<>();
private Map<String, SimpleEntry<Integer, Integer>> perDayEventIndex = new LinkedHashMap<>();
public StreamCutsExample(int numStreams, int numEvents, String scope, URI controllerURI) {
this.numStreams = numStreams;
this.numEvents = numEvents;
this.controllerURI = controllerURI;
this.scope = scope;
streamManager = StreamManager.create(controllerURI);
executor = new ScheduledThreadPoolExecutor(1);
}

/**
 * A {@link StreamCut} is a collection of offsets, one for each open segment of a {@link Stream}, which indicates
 * an event boundary. With a {@link StreamCut}, users can instruct readers to read from and/or up to a particular
 * event boundary (e.g., read events from 100 to 200, events created since Tuesday) on multiple {@link Stream}s. To
 * this end, Pravega allows us to create {@link StreamCut}s while readers are processing a {@link Stream} (e.g., via
 * a {@link Checkpoint}) that can be used in the future to bound the processing of a set of {@link Stream}s. In this
 * method, we create two {@link StreamCut}s for a {@link Stream} according to the initial and final event indexes
 * passed as parameters.
 *
 * @param streamName    Name of the {@link Stream} from which {@link StreamCut}s will be created.
 * @param iniEventIndex Index of the initial boundary for the {@link Stream} slice to process.
 * @param endEventIndex Index of the final boundary for the {@link Stream} slice to process.
 * @return Initial and final {@link Stream} boundaries represented as {@link StreamCut}s.
 */
public List<StreamCut> createStreamCutsByIndexFor(String streamName, int iniEventIndex, int endEventIndex) {
// Create the StreamCuts for the streams.
final List<StreamCut> streamCuts = new ArrayList<>();
final String randomId = String.valueOf(new Random(System.nanoTime()).nextInt());
// Free resources after execution.
try (ReaderGroupManager manager = ReaderGroupManager.withScope(scope, controllerURI);
ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI)) {
// Create a reader group and a reader to read from the stream.
final String readerGroupName = streamName + randomId;
ReaderGroupConfig config = ReaderGroupConfig.builder().stream(Stream.of(scope, streamName)).build();
manager.createReaderGroup(readerGroupName, config);
@Cleanup
ReaderGroup readerGroup = manager.getReaderGroup(readerGroupName);
@Cleanup
EventStreamReader<String> reader = clientFactory.createReader(randomId, readerGroup.getGroupName(),
new JavaSerializer<>(), ReaderConfig.builder().build());
// Read streams and create the StreamCuts during the read process.
Checkpoint checkpoint;
int eventIndex = 0;
EventRead<String> event;
do {
// Here is where we create a StreamCut that points to the event indicated by the user.
if (eventIndex == iniEventIndex || eventIndex == endEventIndex) {
reader.close();
checkpoint = readerGroup.initiateCheckpoint(randomId + eventIndex, executor).join();
streamCuts.add(checkpoint.asImpl().getPositions().values().iterator().next());
reader = clientFactory.createReader(randomId, readerGroup.getGroupName(),
new JavaSerializer<>(), ReaderConfig.builder().build());
}
event = reader.readNextEvent(1000);
eventIndex++;
} while (event.isCheckpoint() || event.getEvent() != null);
// If there is only the initial StreamCut, this means that the final one is the tail of the stream.
if (streamCuts.size() == 1) {
streamCuts.add(StreamCut.UNBOUNDED);
}
} catch (ReinitializationRequiredException e) {
// We do not expect this exception from the reader in this situation, so we just log it and return.
log.error("Unexpected reader re-initialization.", e);
}
return streamCuts;
}
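
// Usage sketch (hypothetical stream name and indexes): the two StreamCuts returned by the method above can
// bound a ReaderGroupConfig so that readers only see the chosen slice of the stream. The
// stream(Stream, StreamCut, StreamCut) builder overload takes the start and end boundaries.
//
// List<StreamCut> cuts = example.createStreamCutsByIndexFor("a-123", 2, 5);
// ReaderGroupConfig bounded = ReaderGroupConfig.builder()
//         .stream(Stream.of(scope, "a-123"), cuts.get(0), cuts.get(1))
//         .build();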

/**
 * This method is an example of bounded processing in Pravega with {@link StreamCut}s. {@link ReaderGroupConfig}
 * contains the information related to the {@link Stream}s to be read as well as the (optional) user-defined
 * boundaries in the form of {@link StreamCut}s that will limit the events to be read by reader processes. Note that
 * event readers (i.e., {@link EventStreamReader}) are agnostic to any notion of boundaries and do not interact
 * with {@link StreamCut}s; they only consume events, which will be bounded within the specific {@link Stream}
 * slices configured in {@link ReaderGroupConfig}. The method creates a string representation of the events read
 * from {@link Stream}s within the bounds defined in the configuration parameter.
 *
 * @param config Configuration for the {@link ReaderGroup}, possibly containing {@link StreamCut} boundaries for
 *               limiting the number of events to read.
 * @return String representation of the events read by the reader.
 */
public String printBoundedStreams(ReaderGroupConfig config) {
StringBuilder result = new StringBuilder();
final String randomId = String.valueOf(new Random(System.nanoTime()).nextInt());
try (ReaderGroupManager manager = ReaderGroupManager.withScope(scope, controllerURI);
ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI)) {
final String readerGroupName = "RG" + randomId;
manager.createReaderGroup(readerGroupName, config);
@Cleanup
EventStreamReader<String> reader = clientFactory.createReader(randomId, readerGroupName,
new JavaSerializer<>(), ReaderConfig.builder().build());
// Read the events within the configured bounds and build a string representation of them.
EventRead<String> event;
do {
event = reader.readNextEvent(1000);
if (event.getEvent() != null) {
result.append(event.getEvent()).append('|');
}
} while (event.isCheckpoint() || event.getEvent() != null);
result.append('\n');
} catch (ReinitializationRequiredException e) {
// We do not expect this exception from the reader in this situation, so we just log it and return.
log.error("Unexpected reader re-initialization.", e);
}
return result.toString();
}
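
// Usage sketch (startCut/endCut are assumed to come from createStreamCutsByIndexFor): print only the slice
// of a stream delimited by two StreamCuts. The unbounded variant is what printStreams() below does.
//
// ReaderGroupConfig slice = ReaderGroupConfig.builder()
//         .stream(Stream.of(scope, "a-123"), startCut, endCut)
//         .build();
// System.out.println(example.printBoundedStreams(slice));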

/**
 * A good use-case for {@link StreamCut}s is to allow efficient batch processing of data events within specific
 * boundaries (e.g., perform a mean on the temperature values in 1986). Instead of ingesting all the data and
 * forcing the reader to discard irrelevant events, {@link StreamCut}s help readers to only read the events that
 * are important for a particular task. In this sense, this method enables the Pravega {@link BatchClient} to read
 * from various {@link Stream}s within the specific ranges passed as input, and then sum up all the values
 * contained in the events read.
 *
 * @param streamCuts Map that defines the slices to read of a set of {@link Stream}s.
 * @return Sum of all the values of time series data belonging to {@link Stream}s and bounded by {@link StreamCut}s.
 */
public int sumBoundedStreams(Map<Stream, List<StreamCut>> streamCuts) {
int totalSumValuesInDay = 0;
try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI)) {
final BatchClient batchClient = clientFactory.createBatchClient();
for (Stream myStream: streamCuts.keySet()) {
// Get the cuts for this stream that will bound the number of events to read.
final StreamCut startStreamCut = streamCuts.get(myStream).get(0);
final StreamCut endStreamCut = streamCuts.get(myStream).get(1);
// Then, we get the segment ranges according to the StreamCuts.
StreamSegmentsIterator segments = batchClient.getSegments(myStream, startStreamCut, endStreamCut);
List<SegmentRange> ranges = Lists.newArrayList(segments.getIterator());
// We basically sum up all the values of events within the ranges.
for (SegmentRange range: ranges) {
List<String> eventData = Lists.newArrayList(batchClient.readSegment(range, new JavaSerializer<>()));
totalSumValuesInDay += eventData.stream().map(s -> s.split(eventSeparator)[2]).mapToInt(Integer::valueOf).sum();
}
}
}
return totalSumValuesInDay;
}
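
// Usage sketch (hypothetical stream name; startCut/endCut are assumed boundaries for that stream): build the
// per-stream slices and aggregate them. Each event is expected to have the form <id>:day<i>:<value>, so the
// call below adds up the <value> fields.
//
// Map<Stream, List<StreamCut>> cutsPerStream = new LinkedHashMap<>();
// cutsPerStream.put(Stream.of(scope, "a-123"), Arrays.asList(startCut, endCut));
// int sum = example.sumBoundedStreams(cutsPerStream);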
// Region stream utils
public void createAndPopulateStreamsWithNumbers() {
Consumer<SimpleEntry> consumer = this::numericDataEvents;
createAndPopulateStreams(consumer);
}
public void createAndPopulateStreamsWithDataSeries() {
Consumer<SimpleEntry> consumer = this::dataSeriesEvents;
createAndPopulateStreams(consumer);
}
public SimpleEntry<Integer, Integer> getStreamEventIndexesForDay(String streamName, int day) {
return perDayEventIndex.get(getStreamDayKey(streamName, day));
}

/**
 * This method first creates the scope that will contain the streams, and then creates and populates the
 * {@link Stream}s with the events generated by the consumer passed as a parameter.
 *
 * @param createDataEvents Consumer that writes events to each newly created {@link Stream}.
 */
public void createAndPopulateStreams(Consumer<SimpleEntry> createDataEvents) {
// Create the scope first, before creating the Streams.
streamManager.createScope(scope);
// Create Streams and write dummy events in them.
for (char streamId = 'a'; streamId < 'a' + numStreams; streamId++) {
String streamName = String.valueOf(streamId) + streamSeparator + System.nanoTime();
myStreamNames.add(streamName);
StreamConfiguration streamConfig = StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build();
streamManager.createStream(scope, streamName, streamConfig);
// Note that we use the try-with-resources statement for those classes that should be closed after usage.
try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(), EventWriterConfig.builder().build())) {
// Write data to the streams according to our preferences
final SimpleEntry<EventStreamWriter<String>, String> writerAndStreamName = new SimpleEntry<>(writer, streamName);
createDataEvents.accept(writerAndStreamName);
}
}
}
public void numericDataEvents(SimpleEntry<EventStreamWriter<String>, String> writerAndStreamName) {
// Write dummy events that identify each Stream.
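// For instance, with numEvents = 3, a stream whose name starts with 'a' gets the events "a0", "a1", "a2".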
StringBuilder sb = new StringBuilder();
char streamBaseId = writerAndStreamName.getValue().charAt(0);
for (int j = 0; j < numEvents; j++) {
writerAndStreamName.getKey().writeEvent(sb.append(streamBaseId).append(j).toString()).join();
sb.setLength(0);
}
}
public void dataSeriesEvents(SimpleEntry<EventStreamWriter<String>, String> writerAndStreamName) {
StringBuilder sb = new StringBuilder();
Random random = new Random();
int totalEventsSoFar = 0;
char streamBaseId = writerAndStreamName.getValue().charAt(0);
for (int i = 0; i < numEvents; i++) {
final String daySuffix = eventSeparator + "day" + i;
int eventsPerDay = random.nextInt(maxEventsPerDay);
int lastDayEventIndex;
// Write events specifying the day they belong to and the value in their content.
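// Each event has the form <streamBaseId>:day<i>:<value>, e.g., "a:day0:15" (the value is random; shown for illustration).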
for (lastDayEventIndex = 0; lastDayEventIndex < eventsPerDay; lastDayEventIndex++) {
writerAndStreamName.getKey().writeEvent(sb.append(streamBaseId)
.append(daySuffix)
.append(eventSeparator)
.append(random.nextInt(20)).toString()).join();
sb.setLength(0);
}
// Record the range of event indexes belonging to day i (start inclusive, end exclusive).
if (lastDayEventIndex > 0) {
perDayEventIndex.put(writerAndStreamName.getValue() + daySuffix,
new SimpleEntry<>(totalEventsSoFar, totalEventsSoFar + lastDayEventIndex));
totalEventsSoFar += lastDayEventIndex;
}
}
}

/**
 * This method provides a print facility on the contents of all the {@link Stream}s.
 *
 * @return String containing the content of the events in all the {@link Stream}s.
 */
public String printStreams() {
StringBuilder result = new StringBuilder();
for (String streamName: myStreamNames) {
ReaderGroupConfig config = ReaderGroupConfig.builder().stream(Stream.of(scope, streamName)).build();
result.append(printBoundedStreams(config));
}
return result.toString();
}

/**
 * We delete all the {@link Stream}s created in each example execution.
 */
public void deleteStreams() {
// Delete the streams for next execution.
for (String streamName: myStreamNames) {
try {
streamManager.sealStream(scope, streamName);
Thread.sleep(500);
streamManager.deleteStream(scope, streamName);
Thread.sleep(500);
} catch (InterruptedException e) {
log.error("Interrupted while sleeping in deleteStreams.", e);
Thread.currentThread().interrupt();
}
}
myStreamNames.clear();
perDayEventIndex.clear();
}
// End region stream utils
/**
* Close resources.
*/
public void close() {
streamManager.close();
executor.shutdown();
}
public List<String> getMyStreamNames() {
return myStreamNames;
}
private String getStreamDayKey(String streamName, int day) {
return streamName + eventSeparator + "day" + day;
}
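
// Overall usage sketch (hypothetical controller endpoint; the actual driver for this sample lives outside
// this class):
//
// try (StreamCutsExample example = new StreamCutsExample(2, 10, "myScope", URI.create("tcp://localhost:9090"))) {
//     example.createAndPopulateStreamsWithDataSeries();
//     System.out.println(example.printStreams());
//     example.deleteStreams();
// }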
}