Skip to content

Commit

Permalink
[Preview] Implementing footer caching with introducing single cache o…
Browse files Browse the repository at this point in the history
…ption. (#35)

Implements initial footer prefetching and caching.

---------

Co-authored-by: Ilya Isaev <[email protected]>
  • Loading branch information
IsaevIlya and IsaevIlya authored Jun 5, 2024
1 parent b4f4bcf commit fa883c4
Show file tree
Hide file tree
Showing 22 changed files with 842 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.gradle.api.tasks.testing.logging.TestExceptionFormat

val codeCoverageToolVersion = "0.8.11"
val codeCoverageReports = "reports/codeCoverage"
val codeCoverageThreshold = "0.95".toBigDecimal()
val codeCoverageThreshold = "0.92".toBigDecimal()

plugins {
// Apply the java Plugin to add support for Java.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.amazon.connector.s3.io.logical.LogicalIO;
import com.amazon.connector.s3.io.logical.impl.ParquetLogicalIOImpl;
import com.amazon.connector.s3.io.physical.blockmanager.BlockManager;
import com.amazon.connector.s3.io.physical.blockmanager.BlockManagerInterface;
import com.amazon.connector.s3.io.physical.impl.PhysicalIOImpl;
import com.amazon.connector.s3.util.S3URI;
import java.io.EOFException;
Expand Down Expand Up @@ -39,7 +40,23 @@ protected S3SeekableInputStream(
new ParquetLogicalIOImpl(
new PhysicalIOImpl(
new BlockManager(
objectClient, s3URI, configuration.getBlockManagerConfiguration()))));
objectClient, s3URI, configuration.getBlockManagerConfiguration())),
configuration.getLogicalIOConfiguration()));
}

/**
* Creates a new instance of {@link S3SeekableInputStream} on top of a caller-supplied block
* manager. The block manager is wrapped in a {@link PhysicalIOImpl}, which is in turn wrapped in
* a {@link ParquetLogicalIOImpl} configured with the logical IO configuration taken from
* {@code configuration}.
*
* @param blockManager provides instance of {@link BlockManagerInterface}
* @param configuration provides instance of {@link S3SeekableInputStreamConfiguration}; only its
*     logical IO configuration is read by this constructor
*/
protected S3SeekableInputStream(
@NonNull BlockManagerInterface blockManager,
@NonNull S3SeekableInputStreamConfiguration configuration) {
this(
new ParquetLogicalIOImpl(
new PhysicalIOImpl(blockManager), configuration.getLogicalIOConfiguration()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.amazon.connector.s3;

import com.amazon.connector.s3.io.logical.LogicalIOConfiguration;
import com.amazon.connector.s3.io.physical.blockmanager.BlockManagerConfiguration;
import lombok.Builder;
import lombok.EqualsAndHashCode;
Expand All @@ -12,8 +13,10 @@
@EqualsAndHashCode
public class S3SeekableInputStreamConfiguration {
@Builder.Default
private final BlockManagerConfiguration blockManagerConfiguration =
BlockManagerConfiguration.DEFAULT;
private BlockManagerConfiguration blockManagerConfiguration = BlockManagerConfiguration.DEFAULT;

@Builder.Default
private LogicalIOConfiguration logicalIOConfiguration = LogicalIOConfiguration.DEFAULT;

/** Default set of settings for {@link S3SeekableInputStream} */
public static final S3SeekableInputStreamConfiguration DEFAULT =
Expand All @@ -22,10 +25,13 @@ public class S3SeekableInputStreamConfiguration {
/**
* Creates a new instance of {@link S3SeekableInputStreamConfiguration}.
*
* @param blockManagerConfiguration - {@link BlockManagerConfiguration}
* @param blockManagerConfiguration - {@link BlockManagerConfiguration} configuration
* @param logicalIOConfiguration - {@link LogicalIOConfiguration} configuration
*/
private S3SeekableInputStreamConfiguration(
@NonNull BlockManagerConfiguration blockManagerConfiguration) {
@NonNull BlockManagerConfiguration blockManagerConfiguration,
@NonNull LogicalIOConfiguration logicalIOConfiguration) {
this.blockManagerConfiguration = blockManagerConfiguration;
this.logicalIOConfiguration = logicalIOConfiguration;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.amazon.connector.s3;

import com.amazon.connector.s3.io.physical.blockmanager.BlockManager;
import com.amazon.connector.s3.io.physical.blockmanager.MultiObjectsBlockManager;
import com.amazon.connector.s3.util.S3URI;
import java.io.IOException;
import lombok.Getter;
import lombok.NonNull;

Expand All @@ -15,9 +18,10 @@
* SeekableInputStream}.
*/
@Getter
public class S3SeekableInputStreamFactory {
public class S3SeekableInputStreamFactory implements AutoCloseable {
private final ObjectClient objectClient;
private final S3SeekableInputStreamConfiguration configuration;
private final MultiObjectsBlockManager multiObjectsBlockManager;

/**
* Creates a new instance of {@link S3SeekableInputStreamFactory}. This factory should be used to
Expand All @@ -32,6 +36,8 @@ public S3SeekableInputStreamFactory(
@NonNull S3SeekableInputStreamConfiguration configuration) {
this.objectClient = objectClient;
this.configuration = configuration;
this.multiObjectsBlockManager =
new MultiObjectsBlockManager(objectClient, configuration.getBlockManagerConfiguration());
}

/**
Expand All @@ -41,6 +47,21 @@ public S3SeekableInputStreamFactory(
* @return An instance of the input stream.
*/
public S3SeekableInputStream createStream(@NonNull S3URI s3URI) {
  final boolean useSingleCache = configuration.getBlockManagerConfiguration().isUseSingleCache();

  // Without the single-cache option the stream builds its own block manager from the object client.
  if (!useSingleCache) {
    return new S3SeekableInputStream(objectClient, s3URI, configuration);
  }

  // With the single-cache option every stream is backed by the factory-wide
  // MultiObjectsBlockManager, reached through a per-object BlockManager.
  return new S3SeekableInputStream(
      new BlockManager(multiObjectsBlockManager, s3URI), configuration);
}

/**
* Closes the factory by closing the {@link MultiObjectsBlockManager} that was created in the
* factory's constructor.
*
* @throws IOException propagated from closing the underlying block manager
*/
@Override
public void close() throws IOException {
multiObjectsBlockManager.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.amazon.connector.s3.io.logical;

import static com.amazon.connector.s3.util.Constants.DEFAULT_FOOTER_PRECACHING_SIZE;
import static com.amazon.connector.s3.util.Constants.DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;

/**
 * Configuration for {@link LogicalIO}.
 *
 * <p>Instances are created through the Lombok-generated builder; any option that is not set
 * explicitly takes the default declared on its field.
 */
@Getter
@Builder
@EqualsAndHashCode
public class LogicalIOConfiguration {
  // NOTE(review): the field names below start with an upper-case letter, which is unconventional
  // for Java. They are left unchanged because the generated builder method names are derived
  // from them, so renaming would break existing callers.

  /** Whether the footer of an object is prefetched when the logical IO is constructed. */
  @Builder.Default private boolean FooterPrecachingEnabled = true;

  /** Size of the trailing region of the object that is prefetched as the footer. */
  @Builder.Default private long FooterPrecachingSize = DEFAULT_FOOTER_PRECACHING_SIZE;

  /**
   * Whether objects no larger than {@code SmallObjectSizeThreshold} are prefetched in full
   * instead of only their footer.
   */
  @Builder.Default private boolean SmallObjectsPrefetchingEnabled = true;

  /** Largest object size that is still treated as a small object. */
  @Builder.Default private long SmallObjectSizeThreshold = DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD;

  /**
   * Configuration with every option left at its default. Declared {@code final} so the shared
   * instance cannot be swapped out by another caller.
   */
  public static final LogicalIOConfiguration DEFAULT = LogicalIOConfiguration.builder().build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.amazon.connector.s3.io.logical;

import com.amazon.connector.s3.object.ObjectMetadata;
import com.amazon.connector.s3.util.S3URI;
import java.util.concurrent.CompletableFuture;
import lombok.Data;

/**
 * ObjectStatus pairs the metadata of an object, held as a future that may not have completed
 * yet, with the S3URI of that object.
 */
@Data
public class ObjectStatus {
/** Future that yields the metadata of the object. */
private final CompletableFuture<ObjectMetadata> objectMetadata;
/** S3 URI of the object the metadata belongs to. */
private final S3URI s3URI;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.amazon.connector.s3.io.logical.impl;

import com.amazon.connector.s3.io.logical.LogicalIO;
import com.amazon.connector.s3.io.logical.LogicalIOConfiguration;
import com.amazon.connector.s3.io.physical.PhysicalIO;
import com.amazon.connector.s3.io.physical.plan.IOPlan;
import com.amazon.connector.s3.io.physical.plan.Range;
import com.amazon.connector.s3.object.ObjectMetadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A basic proxying implementation of a LogicalIO layer. To be extended later with logical
Expand All @@ -13,14 +20,27 @@
public class ParquetLogicalIOImpl implements LogicalIO {

private final PhysicalIO physicalIO;
private final LogicalIOConfiguration logicalIOConfiguration;

private static final Logger LOG = LogManager.getLogger(ParquetLogicalIOImpl.class);
/**
* Constructs an instance of LogicalIOImpl.
*
* @param physicalIO underlying physical IO that knows how to fetch bytes
* @param logicalIOConfiguration configuration for this logical IO implementation
*/
public ParquetLogicalIOImpl(PhysicalIO physicalIO) {
public ParquetLogicalIOImpl(
    PhysicalIO physicalIO, LogicalIOConfiguration logicalIOConfiguration) {
  this.physicalIO = physicalIO;
  this.logicalIOConfiguration = logicalIOConfiguration;

  // Metadata is requested unconditionally, exactly as before, even when footer precaching is off.
  CompletableFuture<ObjectMetadata> metadata = this.physicalIO.metadata();

  if (logicalIOConfiguration.isFooterPrecachingEnabled()) {
    try {
      this.createFooterCachingPlan(metadata);
    } catch (IOException e) {
      // Footer prefetching is best-effort: an IO failure here must not prevent construction.
      // The exception is passed as the throwable argument so its stack trace is logged.
      LOG.info("Exception during footer prefetching", e);
    }
  }
}

@Override
Expand All @@ -47,4 +67,25 @@ public CompletableFuture<ObjectMetadata> metadata() {
public void close() throws IOException {
physicalIO.close();
}

/**
 * Builds and executes an IO plan that prefetches the tail of the object.
 *
 * <p>The whole object is prefetched when it is no larger than the configured footer size, or
 * when small-object prefetching is enabled and the object is no larger than the small-object
 * threshold. Otherwise only the last {@code FooterPrecachingSize} bytes are prefetched. Nothing
 * is prefetched for an empty object.
 *
 * @param metadata future yielding the object's metadata; this method blocks on it
 * @throws IOException if executing the prefetch plan fails
 */
private void createFooterCachingPlan(final CompletableFuture<ObjectMetadata> metadata)
    throws IOException {
  // join() blocks until the metadata is available; a failed future surfaces as an unchecked
  // CompletionException rather than an IOException.
  final long contentLength = metadata.join().getContentLength();
  if (contentLength <= 0) {
    // An empty object has no footer; without this guard the range would be [0, -1].
    return;
  }

  final long footerSize = logicalIOConfiguration.getFooterPrecachingSize();
  final boolean fitsInFooter = contentLength <= footerSize;
  final boolean isPrefetchableSmallObject =
      logicalIOConfiguration.isSmallObjectsPrefetchingEnabled()
          && contentLength <= logicalIOConfiguration.getSmallObjectSizeThreshold();

  // Start at 0 to fetch the whole object, otherwise only the trailing footerSize bytes.
  final long startRange =
      (fitsInFooter || isPrefetchableSmallObject) ? 0 : contentLength - footerSize;

  List<Range> prefetchRanges = new ArrayList<>();
  prefetchRanges.add(new Range(startRange, contentLength - 1));
  IOPlan ioPlan = IOPlan.builder().prefetchRanges(prefetchRanges).build();
  physicalIO.execute(ioPlan);
}
}
Loading

0 comments on commit fa883c4

Please sign in to comment.