Skip to content

Commit

Permalink
Issue 3274: (SegmentStore) Table Segment Compaction (pravega#3567)
Browse files Browse the repository at this point in the history
* Implements Table Segment Compaction

* Defines two more Table Segment Attributes that aid
in compaction: one for tracking utilization and one for
tracking compaction progress.

* Compaction is not enabled for any segment yet (all
have MIN_UTILIZATION set to 0, which means no
compaction). This will change in a future PR.

Signed-off-by: Andrei Paduroiu <[email protected]>
  • Loading branch information
andreipaduroiu authored and fpj committed Apr 12, 2019
1 parent 63f63aa commit 23be70a
Show file tree
Hide file tree
Showing 30 changed files with 2,401 additions and 292 deletions.
11 changes: 8 additions & 3 deletions common/src/main/java/io/pravega/common/io/StreamHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -54,13 +55,17 @@ public static int readAll(InputStream stream, byte[] target, int startOffset, in
* @param source The InputStream to read.
* @param length The number of bytes to read.
* @return A byte array containing the contents of the Stream.
* @throws IOException If unable to read from the given InputStream.
* @throws IOException If unable to read from the given InputStream. Throws {@link EOFException} if the number of bytes
* remaining in the InputStream is less than length.
*/
public static byte[] readAll(InputStream source, int length) throws IOException {
    byte[] ret = new byte[length];
    int readBytes = readAll(source, ret, 0, ret.length);
    if (readBytes < ret.length) {
        // The stream was exhausted before we could fetch the requested number of bytes. Per the method contract,
        // this is reported as an EOFException (a subclass of IOException), not an IllegalArgumentException.
        throw new EOFException(String.format(
                "Was only able to read %d bytes, which is less than the requested length of %d.", readBytes, ret.length));
    }

    return ret;
}
}
30 changes: 30 additions & 0 deletions common/src/main/java/io/pravega/common/util/BitConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,36 @@ public static long readLong(byte[] source, int position) {
| (source[position + 7] & 0xFF);
}

/**
 * Reads a 64-bit long from the given InputStream that was encoded using BitConverter.writeLong.
 *
 * @param source The InputStream to read from.
 * @return The read number.
 * @throws IOException If unable to read from the InputStream. Throws {@link EOFException} if fewer than 8 bytes
 *                     remain in the stream.
 */
public static long readLong(InputStream source) throws IOException {
    // Fetch all 8 bytes first (matching the encoding of writeLong: most significant byte first),
    // then validate and assemble. InputStream.read() returns -1 to signal end-of-stream.
    int[] bytes = new int[8];
    for (int i = 0; i < bytes.length; i++) {
        bytes[i] = source.read();
    }

    long result = 0;
    for (int b : bytes) {
        if (b < 0) {
            throw new EOFException();
        }

        result = (result << 8) + (b & 0xFF);
    }

    return result;
}

/**
* Writes the given 64-bit Unsigned Long to the given byte array at the given offset. This value can then be
* deserialized using {@link #readUnsignedLong}. This method is not interoperable with {@link #readLong}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.common.io;

import io.pravega.test.common.AssertExtensions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.junit.Assert;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void testReadAllNewArray() throws IOException {
AssertExtensions.assertThrows(
"readAll accepted a length higher than the given input stream length.",
() -> StreamHelpers.readAll(new TestInputStream(buffer), buffer.length + 1),
ex -> ex instanceof IllegalArgumentException);
ex -> ex instanceof EOFException);
}

private static class TestInputStream extends InputStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public class TableAttributes extends Attributes {
*/
public static final UUID TOTAL_ENTRY_COUNT = new UUID(CORE_ATTRIBUTE_ID_PREFIX, TABLE_ATTRIBUTES_START_OFFSET + 3);

/**
* Defines an attribute that is used to store the offset within a (Table) Segment at which compaction last ran.
*/
public static final UUID COMPACTION_OFFSET = new UUID(CORE_ATTRIBUTE_ID_PREFIX, TABLE_ATTRIBUTES_START_OFFSET + 4);

/**
* Defines an attribute that is used to set the minimum utilization (as a percentage of {@link #ENTRY_COUNT} out of
* {@link #TOTAL_ENTRY_COUNT}) of a Table Segment below which a Table Compaction is triggered.
*/
public static final UUID MIN_UTILIZATION = new UUID(CORE_ATTRIBUTE_ID_PREFIX, TABLE_ATTRIBUTES_START_OFFSET + 5);

/**
* Defines a Map that contains all Table Attributes along with their default values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@
*/
package io.pravega.segmentstore.server.reading;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.segmentstore.contracts.ReadResult;
import io.pravega.segmentstore.contracts.ReadResultEntry;
import io.pravega.segmentstore.contracts.ReadResultEntryContents;
import io.pravega.segmentstore.contracts.ReadResultEntryType;
import com.google.common.base.Preconditions;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* An Asynchronous processor for ReadResult objects. Attaches to a ReadResult and executes a callback using an Executor
Expand Down Expand Up @@ -80,6 +88,21 @@ public static AsyncReadResultProcessor process(ReadResult readResult, AsyncReadR
return processor;
}

/**
 * Reads the entire contents of the given {@link ReadResult} and exposes them as a single {@link InputStream}.
 *
 * @param readResult The {@link ReadResult} to process.
 * @param executor An Executor to run asynchronous tasks on.
 * @param requestContentTimeout Timeout for each call to {@link ReadResultEntry#requestContent(Duration)}, for those
 *                              {@link ReadResultEntry} instances that are not already cached in memory.
 * @return A CompletableFuture that, when completed, will contain an {@link InputStream} with the requested data.
 */
public static CompletableFuture<InputStream> processAll(ReadResult readResult, Executor executor, Duration requestContentTimeout) {
    ProcessAllHandler collector = new ProcessAllHandler(requestContentTimeout);
    process(readResult, collector, executor);
    return collector.result;
}

//endregion

//region AutoCloseable Implementation
Expand Down Expand Up @@ -148,5 +171,34 @@ private CompletableFuture<ReadResultEntry> fetchNextEntry() {
}

//endregion

/**
 * {@link AsyncReadResultHandler} that accumulates every fetched {@link ReadResultEntry}'s contents and, once the
 * read completes, stitches the collected pieces together into a single {@link InputStream}.
 */
private static class ProcessAllHandler implements AsyncReadResultHandler {
    private final Duration requestContentTimeout;
    private final List<InputStream> parts = Collections.synchronizedList(new ArrayList<>());
    private final CompletableFuture<InputStream> result = new CompletableFuture<>();

    ProcessAllHandler(Duration requestContentTimeout) {
        this.requestContentTimeout = requestContentTimeout;
    }

    @Override
    public Duration getRequestContentTimeout() {
        return this.requestContentTimeout;
    }

    @Override
    public boolean shouldRequestContents(ReadResultEntryType entryType, long streamSegmentOffset) {
        // We want every entry, regardless of where its contents currently reside.
        return true;
    }

    @Override
    public boolean processEntry(ReadResultEntry entry) {
        // NOTE(review): assumes the entry's content future is already complete (or completes promptly) by the time
        // processEntry is invoked, so this join() should not block for long — confirm against the processor's contract.
        this.parts.add(entry.getContent().join().getData());
        return true;
    }

    @Override
    public void processError(Throwable cause) {
        this.result.completeExceptionally(cause);
    }

    @Override
    public void processResultComplete() {
        // Concatenate all collected fragments, in order, into one stream.
        this.result.complete(new SequenceInputStream(Collections.enumeration(this.parts)));
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import io.pravega.segmentstore.contracts.tables.TableKey;
import io.pravega.segmentstore.server.reading.AsyncReadResultHandler;
import io.pravega.segmentstore.server.reading.AsyncReadResultProcessor;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.val;

/**
Expand All @@ -51,17 +54,21 @@ abstract class AsyncTableEntryReader<ResultT> implements AsyncReadResultHandler
private final EntrySerializer serializer;
@Getter(AccessLevel.PROTECTED)
private EntrySerializer.Header header;
private final long keyVersion;

//endregion

//region Constructor
//region Constructor and Static Methods

/**
* Creates a new instance of the AsyncTableEntryReader class.
*
* @param keyVersion The version of the item that is located at this position.
* @param serializer The {@link EntrySerializer} to use.
* @param timer Timer for the whole operation.
*/
private AsyncTableEntryReader(@NonNull EntrySerializer serializer, @NonNull TimeoutTimer timer) {
private AsyncTableEntryReader(long keyVersion, @NonNull EntrySerializer serializer, @NonNull TimeoutTimer timer) {
this.keyVersion = keyVersion;
this.serializer = serializer;
this.timer = timer;
this.readData = new EnhancedByteArrayOutputStream();
Expand Down Expand Up @@ -99,6 +106,25 @@ static AsyncTableEntryReader<TableKey> readKey(long keyVersion, EntrySerializer
return new KeyReader(keyVersion, serializer, timer);
}

/**
 * Reads a single {@link TableEntry} from the given InputStream. The {@link TableEntry} itself is not constructed,
 * rather all of its components are returned individually.
 *
 * @param input An InputStream to read from.
 * @param segmentOffset The Segment Offset that the first byte of the InputStream maps to. This will be used as a
 *                      Version, unless the deserialized Entry's Header contains an explicit version.
 * @param serializer The {@link EntrySerializer} to use for deserializing entries.
 * @return A {@link DeserializedEntry} that contains all the components of the {@link TableEntry}.
 * @throws IOException If an Exception occurred while reading from the given InputStream.
 */
static DeserializedEntry readEntryComponents(InputStream input, long segmentOffset, EntrySerializer serializer) throws IOException {
    EntrySerializer.Header header = serializer.readHeader(input);
    long version = getKeyVersion(header, segmentOffset);
    byte[] key = StreamHelpers.readAll(input, header.getKeyLength());

    // A deletion carries no value; otherwise read exactly as many bytes as the Header declares (possibly zero).
    byte[] value;
    if (header.isDeletion()) {
        value = null;
    } else if (header.getValueLength() == 0) {
        value = new byte[0];
    } else {
        value = StreamHelpers.readAll(input, header.getValueLength());
    }

    return new DeserializedEntry(header, version, key, value);
}

//endregion

//region Internal Operations
Expand All @@ -119,6 +145,14 @@ protected void complete(ResultT result) {
this.result.complete(result);
}

private static long getKeyVersion(EntrySerializer.Header header, long segmentOffset) {
    // An explicit version recorded in the Header wins; otherwise the entry's Segment offset serves as its version.
    long explicitVersion = header.getEntryVersion();
    if (explicitVersion == TableKey.NO_VERSION) {
        return segmentOffset;
    }

    return explicitVersion;
}

/**
 * Gets the effective Key Version for the item being read: the explicit version from the deserialized Header, if
 * one was recorded there, otherwise the keyVersion value supplied at construction time.
 * NOTE(review): assumes the Header has already been deserialized (this.header is set) — confirm callers only
 * invoke this after header parsing.
 */
protected long getKeyVersion() {
    return getKeyVersion(this.header, this.keyVersion);
}

//endregion

//region AsyncReadResultHandler implementation
Expand Down Expand Up @@ -187,11 +221,8 @@ public Duration getRequestContentTimeout() {
* AsyncTableEntryReader implementation that reads a Key from a ReadResult.
*/
private static class KeyReader extends AsyncTableEntryReader<TableKey> {
private final long keyVersion;

KeyReader(long keyVersion, EntrySerializer serializer, TimeoutTimer timer) {
super(serializer, timer);
this.keyVersion = keyVersion;
super(keyVersion, serializer, timer);
}

@Override
Expand All @@ -205,7 +236,7 @@ protected boolean processReadData(ByteArraySegment readData) {
if (header.isDeletion()) {
complete(TableKey.notExists(keyData));
} else {
complete(TableKey.versioned(keyData, this.keyVersion));
complete(TableKey.versioned(keyData, getKeyVersion()));
}

return true; // We are done.
Expand All @@ -224,13 +255,11 @@ protected boolean processReadData(ByteArraySegment readData) {
*/
private static class EntryReader extends AsyncTableEntryReader<TableEntry> {
private final ArrayView soughtKey;
private final long keyVersion;
private boolean keyValidated;

private EntryReader(ArrayView soughtKey, long keyVersion, EntrySerializer serializer, TimeoutTimer timer) {
super(serializer, timer);
super(keyVersion, serializer, timer);
this.soughtKey = soughtKey;
this.keyVersion = keyVersion;
this.keyValidated = soughtKey == null;
}

Expand Down Expand Up @@ -285,7 +314,7 @@ protected boolean processReadData(ByteArraySegment readData) {
valueData = readData.subSegment(header.getValueOffset(), header.getValueLength());
}

complete(TableEntry.versioned(getKeyData(this.soughtKey, readData, header), valueData, this.keyVersion));
complete(TableEntry.versioned(getKeyData(this.soughtKey, readData, header), valueData, getKeyVersion()));
return true; // Now we are truly done.
}

Expand All @@ -299,4 +328,29 @@ private ArrayView getKeyData(ArrayView soughtKey, ByteArraySegment readData, Ent
}

//endregion

/**
 * Holds the individual components of a single deserialized {@link TableEntry}.
 */
static class DeserializedEntry {
    private final EntrySerializer.Header header;
    private final long version;
    private final byte[] key;
    private final byte[] value;

    /**
     * Creates a new instance of the DeserializedEntry class.
     *
     * @param header The Entry's Header.
     * @param version The computed Entry's Version. If explicitly defined in the Header, this mirrors it; otherwise
     *                this is the offset at which the Entry resides in the Segment.
     * @param key Key data.
     * @param value Value data; null if this Entry represents a deletion.
     */
    private DeserializedEntry(EntrySerializer.Header header, long version, byte[] key, byte[] value) {
        this.header = header;
        this.version = version;
        this.key = key;
        this.value = value;
    }

    public EntrySerializer.Header getHeader() {
        return this.header;
    }

    public long getVersion() {
        return this.version;
    }

    public byte[] getKey() {
        return this.key;
    }

    public byte[] getValue() {
        return this.value;
    }
}
}
Loading

0 comments on commit 23be70a

Please sign in to comment.