Skip to content

Commit

Permalink
Issue 2006: Move StreamCut to client/stream. (pravega#2342)
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids authored and fpj committed Mar 1, 2018
1 parent e2e3165 commit 20774c5
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import io.pravega.client.admin.impl.StreamManagerImpl;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.StreamCut;

import java.net.URI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.client.stream.impl.ControllerImplConfig;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.StreamCut;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.NameUtils;
Expand Down
8 changes: 4 additions & 4 deletions client/src/main/java/io/pravega/client/batch/BatchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface BatchClient {

/**
* Provides a list of segments and their metadata for a given stream.
*
*
* @param stream the stream
* @return The segments in the requested stream.
*/
Expand All @@ -53,19 +53,19 @@ public interface BatchClient {
/**
* Provides a SegmentIterator to read the events after the startingOffset in the requested
* segment ending at the current end of the segment.
*
*
* Offsets can be obtained by calling {@link SegmentIterator#getOffset()} or
* {@link SegmentInfo#getWriteOffset()}. There is no validation that the provided offset actually
* aligns to an event. If it does not, the deserializer will be passed corrupt data. This means
* that it is invalid to, for example, attempt to divide a segment by simply passing a starting
* offset that is half of the segment length.
*
*
* @param <T> The type of events written to the segment.
* @param segment The segment to read from
* @param deserializer A deserializer to be used to parse events
* @param startingOffset The offset to start iterating from.
* @return A SegmentIterator over the requested segment at startingOffset
*/
<T> SegmentIterator<T> readSegment(Segment segment, Serializer<T> deserializer, long startingOffset);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.client.stream.impl.StreamImpl;
import java.util.Date;
import java.util.Iterator;
Expand All @@ -49,7 +49,7 @@ public BatchClientImpl(Controller controller, ConnectionFactory connectionFactor
inputStreamFactory = new SegmentInputStreamFactoryImpl(controller, connectionFactory);
segmentMetadataClientFactory = new SegmentMetadataClientFactoryImpl(controller, connectionFactory);
}

private StreamInfo getStreamInfo(Stream stream) {
// TODO: Implement this method and make it public
// Name from stream
Expand All @@ -72,7 +72,7 @@ private Iterator<SegmentInfo> listSegments(Stream stream, Date from) {
RuntimeException::new);
SortedSet<Segment> result = new TreeSet<>();
result.addAll(segments.keySet());
result.addAll(getAndHandleExceptions(controller.getSuccessors(new StreamCut(stream, segments)),
result.addAll(getAndHandleExceptions(controller.getSuccessors(new StreamCutImpl(stream, segments)),
RuntimeException::new));
return Iterators.transform(result.iterator(), s -> segmentToInfo(s));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package io.pravega.client.stream;

import io.pravega.client.ClientFactory;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.notifications.ReaderGroupNotificationListener;

import java.util.Map;
Expand Down
29 changes: 29 additions & 0 deletions client/src/main/java/io/pravega/client/stream/StreamCut.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.client.stream;

import io.pravega.client.stream.impl.StreamCutInternal;
import java.io.Serializable;

/**
* A set of segment/offset pairs for a single stream that represent a consistent position in the
* stream. (IE: Segment 1 and 2 will not both appear in the set if 2 succeeds 1, and if 0 appears
* and is responsible for keyspace 0-0.5 then other segments covering the range 0.5-1.0 will also be
* included.)
*/
public interface StreamCut extends Serializable {

/**
* Used internally. Do not call.
*
* @return Implementation of EventPointer interface
*/
StreamCutInternal asImpl();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -41,7 +42,7 @@ public class CheckpointImpl implements Checkpoint {
ImmutableMap.Builder<Stream, StreamCut> positionBuilder = ImmutableMap.builder();
for (Entry<Stream, Builder<Segment, Long>> streamPosition : streamPositions.entrySet()) {
positionBuilder.put(streamPosition.getKey(),
new StreamCut(streamPosition.getKey(), streamPosition.getValue().build()));
new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue().build()));
}
this.positions = positionBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.pravega.client.stream.PingFailedException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.common.Exceptions;
Expand Down Expand Up @@ -302,7 +303,7 @@ public CompletableFuture<Boolean> updateStream(final StreamConfiguration streamC

@Override
public CompletableFuture<Boolean> truncateStream(final String scope, final String stream, final StreamCut streamCut) {
return truncateStream(scope, stream, streamCut.getPositions().entrySet()
return truncateStream(scope, stream, streamCut.asImpl().getPositions().entrySet()
.stream().collect(Collectors.toMap(x -> x.getKey().getSegmentNumber(), Map.Entry::getValue)));
}

Expand Down Expand Up @@ -602,14 +603,14 @@ public CompletableFuture<StreamSegmentsWithPredecessors> getSuccessors(Segment s
@Override
public CompletableFuture<Set<Segment>> getSuccessors(StreamCut from) {
Exceptions.checkNotClosed(closed.get(), this);
Stream stream = from.getStream();
Stream stream = from.asImpl().getStream();
long traceId = LoggerHelpers.traceEnter(log, "getSuccessorsFromCut", stream);
HashSet<Segment> unread = new HashSet<>(from.getPositions().keySet());
HashSet<Segment> unread = new HashSet<>(from.asImpl().getPositions().keySet());
val currentSegments = getAndHandleExceptions(getCurrentSegments(stream.getScope(), stream.getStreamName()),
RuntimeException::new);
unread.addAll(computeKnownUnreadSegments(currentSegments, from));
unread.addAll(computeKnownUnreadSegments(currentSegments, from));
ArrayDeque<Segment> toFetchSuccessors = new ArrayDeque<>();
for (Segment toFetch : from.getPositions().keySet()) {
for (Segment toFetch : from.asImpl().getPositions().keySet()) {
if (!unread.contains(toFetch)) {
toFetchSuccessors.add(toFetch);
}
Expand All @@ -629,16 +630,16 @@ public CompletableFuture<Set<Segment>> getSuccessors(StreamCut from) {
LoggerHelpers.traceLeave(log, "getSuccessorsFromCut", traceId);
return CompletableFuture.completedFuture(unread);
}

private List<Segment> computeKnownUnreadSegments(StreamSegments currentSegments, StreamCut from) {
int highestCut = from.getPositions().keySet().stream().mapToInt(s -> s.getSegmentNumber()).max().getAsInt();
int highestCut = from.asImpl().getPositions().keySet().stream().mapToInt(s -> s.getSegmentNumber()).max().getAsInt();
int lowestCurrent = currentSegments.getSegments().stream().mapToInt(s -> s.getSegmentNumber()).min().getAsInt();
if (highestCut >= lowestCurrent) {
return Collections.emptyList();
}
List<Segment> result = new ArrayList<>(lowestCurrent - highestCut);
for (int num = highestCut + 1; num < lowestCurrent; num++) {
result.add(new Segment(from.getStream().getScope(), from.getStream().getStreamName(), num));
result.add(new Segment(from.asImpl().getStream().getScope(), from.asImpl().getStream().getStreamName(), num));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.pravega.client.stream.ReaderGroupMetrics;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.ReaderGroupState.ClearCheckpoints;
import io.pravega.client.stream.impl.ReaderGroupState.CreateCheckpoint;
import io.pravega.client.stream.impl.ReaderGroupState.ReaderGroupStateInit;
Expand Down Expand Up @@ -167,7 +168,7 @@ public void resetReadersToCheckpoint(Checkpoint checkpoint) {
ReaderGroupConfig config = state.getConfig();
Map<Segment, Long> positions = new HashMap<>();
for (StreamCut cut : checkpoint.asImpl().getPositions().values()) {
positions.putAll(cut.getPositions());
positions.putAll(cut.asImpl().getPositions());
}
return Collections.singletonList(new ReaderGroupStateInit(config, positions));
});
Expand Down Expand Up @@ -208,7 +209,7 @@ private long getUnreadBytes(Map<Stream, Map<Segment, Long>> positions, SegmentMe
log.debug("Compute unread bytes from position {}", positions);
long totalLength = 0;
for (Entry<Stream, Map<Segment, Long>> streamPosition : positions.entrySet()) {
StreamCut position = new StreamCut(streamPosition.getKey(), streamPosition.getValue());
StreamCut position = new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue());
totalLength += getRemainingBytes(metaFactory, position);
}
return totalLength;
Expand All @@ -222,7 +223,7 @@ private long getRemainingBytes(SegmentMetadataClientFactory metaFactory, StreamC
SegmentMetadataClient metadataClient = metaFactory.createSegmentMetadataClient(s);
totalLength += metadataClient.fetchCurrentSegmentLength();
}
for (long bytesRead : position.getPositions().values()) {
for (long bytesRead : position.asImpl().getPositions().values()) {
totalLength -= bytesRead;
}
log.debug("Remaining bytes after position: {} is {}", position, totalLength);
Expand Down Expand Up @@ -251,7 +252,7 @@ public Map<Stream, StreamCut> getStreamCuts() {
HashMap<Stream, StreamCut> cuts = new HashMap<>();

for (Entry<Stream, Map<Segment, Long>> streamPosition : positions.entrySet()) {
StreamCut position = new StreamCut(streamPosition.getKey(), streamPosition.getValue());
StreamCut position = new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue());
cuts.put(streamPosition.getKey(), position);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,46 @@
import com.google.common.annotations.VisibleForTesting;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.Stream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* A set of segment/offset pairs for a single stream that represent a consistent position in the
* stream. (IE: Segment 1 and 2 will not both appear in the set if 2 succeeds 1, and if 0 appears
* and is responsible for keyspace 0-0.5 then other segments covering the range 0.5-1.0 will also be
* included.)
* Implementation of {@link io.pravega.client.stream.StreamCut} interface. {@link StreamCutInternal} abstract class is
* used as in intermediate class to make StreamCut instances opaque.
*/
@Data
public class StreamCut implements Serializable {
@EqualsAndHashCode(callSuper = false)
@ToString
public class StreamCutImpl extends StreamCutInternal {
private static final long serialVersionUID = 1L;

private final Stream stream;
@Getter(value = AccessLevel.PACKAGE)

private final Map<Segment, Long> positions;

public StreamCutImpl(Stream stream, Map<Segment, Long> positions) {
this.stream = stream;
this.positions = positions;
}

@Override
public Map<Segment, Long> getPositions() {
return Collections.unmodifiableMap(positions);
}

@Override
public Stream getStream() {
return stream;
}

@Override
public StreamCutInternal asImpl() {
return this;
}

@VisibleForTesting
public boolean validate(Set<String> segmentNames) {
for (Segment s: positions.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.client.stream.impl;

import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import java.util.Map;

/**
* This is an abstract class which acts an intermediate class to make the actual StreamCut implementation opaque.
*/
public abstract class StreamCutInternal implements StreamCut {

/**
* Get {@link Stream} for the StreamCut.
* @return The stream.
*/
public abstract Stream getStream();

/**
* Get a mapping of Segment and its offset.
* @return Map of Segment to its offset.
*/
public abstract Map<Segment, Long> getPositions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
Expand Down Expand Up @@ -1121,7 +1122,7 @@ public void testCutpointSuccessors() throws Exception {
Map<Segment, Long> segments = new HashMap<>();
segments.put(new Segment(scope, stream, 0), 4L);
segments.put(new Segment(scope, stream, 1), 6L);
StreamCut cut = new StreamCut(s, segments);
StreamCut cut = new StreamCutImpl(s, segments);
Set<Segment> successors = controllerClient.getSuccessors(cut).get();
assertEquals(ImmutableSet.of(new Segment(scope, stream, 0), new Segment(scope, stream, 1),
new Segment(scope, stream, 2), new Segment(scope, stream, 3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.pravega.client.stream.impl.CancellableRequest;
import io.pravega.client.stream.impl.ConnectionClosedException;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
Expand Down Expand Up @@ -407,7 +407,7 @@ public CompletableFuture<StreamSegmentsWithPredecessors> getSuccessors(Segment s

@Override
public CompletableFuture<Set<Segment>> getSuccessors(StreamCut from) {
StreamConfiguration configuration = createdStreams.get(from.getStream());
StreamConfiguration configuration = createdStreams.get(from.asImpl().getStream());
if (configuration.getScalingPolicy().getScaleType() != ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS) {
throw new IllegalArgumentException("getSuccessors not supported with dynamic scaling on mock controller");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.StreamCut;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.NameUtils;
import io.pravega.client.state.SynchronizerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
import io.pravega.client.stream.PingFailedException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.impl.CancellableRequest;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.ControllerFailureException;
import io.pravega.client.stream.impl.ModelHelper;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamCut;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
import io.pravega.client.stream.impl.TxnSegments;
Expand Down
Loading

0 comments on commit 20774c5

Please sign in to comment.