[Backport 2.x] Added BufferedInputStream to allow mark and reset ops during IO errors #10690 (#10741)

* Added BufferedInputStream to allow mark and reset ops during IO errors (#10690)

Signed-off-by: vikasvb90 <[email protected]>
(cherry picked from commit 75bd9f2)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Added close on buffered stream in s3 async upload for additional cleanup (#10710)

Signed-off-by: vikasvb90 <[email protected]>

---------

Signed-off-by: vikasvb90 <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Vikas Bansal <[email protected]>
3 people authored Oct 20, 2023
1 parent 7911429 commit 8896f58
Showing 4 changed files with 88 additions and 35 deletions.
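
The commit's rationale, in brief: when a transient IO error interrupts an upload, the caller can retry only if the request stream can be rewound via mark/reset; wrapping each part stream in a BufferedInputStream supplies those operations, so a retry replays at most the buffered window (1 MB + 1 byte here) instead of forcing the application to re-read the whole file. A minimal, self-contained sketch of that mark/reset behavior — not from the commit, with ByteArrayInputStream standing in for the real part stream:

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class MarkResetSketch {
        public static void main(String[] args) throws IOException {
            InputStream raw = new ByteArrayInputStream(new byte[4096]); // stand-in for the part stream
            // Buffer slightly larger than the mark window, mirroring the 1 MB + 1 size used in the commit.
            InputStream in = new BufferedInputStream(raw, 1024 * 1024 + 1);

            in.mark(1024 * 1024);        // remember the current position before an attempt
            byte[] chunk = new byte[1024];
            int first = in.read(chunk);  // first attempt reads some bytes
            // ... a retryable IO error would occur here ...
            in.reset();                  // rewind to the mark; only buffered bytes are replayed
            int second = in.read(chunk); // the retry re-reads the same bytes
            System.out.println(first + " bytes read, " + second + " re-read after reset");
        }
    }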
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

@@ -38,6 +38,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -55,6 +56,7 @@
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
+import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

@@ -93,17 +95,21 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
+int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
);
-executorBuilders.add(
-new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, priorityPoolCount(settings), 10_000, PRIORITY_STREAM_READER)
-);
+executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
-executorBuilders.add(new FixedExecutorBuilder(settings, STREAM_READER, normalPoolCount(settings), 10_000, STREAM_READER));
+executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
return executorBuilders;
}

+static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
+return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
+}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
this.service = Objects.requireNonNull(service, "S3 service must not be null");
this.configPath = configPath;
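For reference, the sizing arithmetic behind the new scaling pools: halfAllocatedProcessorsMaxFive clamps half the allocated processors (rounded up) into the range [1, 5]. A hypothetical standalone rendering, with boundedBy written out as a plain clamp (in the plugin it comes from a shared helper, presumably in OpenSearchExecutors):

    public class PoolSizingSketch {
        // Hypothetical stand-in for the clamp helper behind halfAllocatedProcessorsMaxFive.
        static int boundedBy(int value, int min, int max) {
            return Math.min(max, Math.max(min, value));
        }

        static int halfAllocatedProcessorsMaxFive(int allocatedProcessors) {
            return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
        }

        public static void main(String[] args) {
            // 1 core -> 1 thread, 4 cores -> 2, 8 cores -> 4, 16+ cores -> 5.
            for (int cores : new int[] { 1, 4, 8, 16 }) {
                System.out.println(cores + " -> " + halfAllocatedProcessorsMaxFive(cores));
            }
        }
    }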
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java

@@ -23,10 +23,13 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
+import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;

+import java.io.BufferedInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -138,26 +141,39 @@ private static void uploadPart(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
+// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
+// data can be retried instead of retrying whole file by the application.
+InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
-AsyncRequestBody.fromInputStream(
-inputStreamContainer.getInputStream(),
-inputStreamContainer.getContentLength(),
-streamReadExecutor
-)
+AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
)
);

-CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.thenApply(
-uploadPartResponse -> convertUploadPartResponse(
-completedParts,
-inputStreamContainers,
-uploadPartResponse,
-partNumber,
-uploadRequest.doRemoteDataIntegrityCheck()
-)
-);
+CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
+try {
+inputStream.close();
+} catch (IOException ex) {
+log.error(
+() -> new ParameterizedMessage(
+"Failed to close stream while uploading a part of idx {} and file {}.",
+uploadPartRequest.partNumber(),
+uploadPartRequest.key()
+),
+ex
+);
+}
+})
+.thenApply(
+uploadPartResponse -> convertUploadPartResponse(
+completedParts,
+inputStreamContainers,
+uploadPartResponse,
+partNumber,
+uploadRequest.doRemoteDataIntegrityCheck()
+)
+);
futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
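The whenComplete(...).thenApply(...) chain above closes the buffered stream on both success and failure while the original result or exception still propagates to the conversion stage. A small sketch of that CompletableFuture pattern, with illustrative names:

    import java.util.concurrent.CompletableFuture;

    public class WhenCompleteSketch {
        public static void main(String[] args) {
            CompletableFuture<String> upload = CompletableFuture.supplyAsync(() -> "etag-123");
            CompletableFuture<String> converted = upload.whenComplete((resp, throwable) -> {
                // Cleanup runs whether the upload succeeded or failed; the original
                // result (or exception) still flows into the next stage.
                System.out.println("closing stream, failed=" + (throwable != null));
            }).thenApply(String::toUpperCase);
            System.out.println(converted.join()); // ETAG-123
        }
    }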
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

@@ -38,7 +38,9 @@
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

+import java.io.BufferedInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
@@ -309,15 +311,22 @@ private void uploadInOneChunk(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
+// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
+// data can be retried instead of retrying whole file by the application.
+InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
-AsyncRequestBody.fromInputStream(
-inputStreamContainer.getInputStream(),
-inputStreamContainer.getContentLength(),
-streamReadExecutor
-)
+AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
).handle((resp, throwable) -> {
+try {
+inputStream.close();
+} catch (IOException e) {
+log.error(
+() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()),
+e
+);
+}
if (throwable != null) {
Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class);
if (unwrappedThrowable != null) {
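In the single-chunk path the close happens inside handle, which also receives any failure so the S3Exception can be unwrapped from the completion chain. A simplified, hypothetical version of the cause-chain walk an unwrap helper of this kind performs (the real ExceptionsHelper lives in OpenSearch core and may add extra guards):

    public class UnwrapSketch {
        // Walk the cause chain looking for the first throwable of the given type.
        static Throwable unwrap(Throwable t, Class<? extends Throwable> type) {
            while (t != null) {
                if (type.isInstance(t)) {
                    return t;
                }
                t = t.getCause();
            }
            return null;
        }

        public static void main(String[] args) {
            Throwable boom = new RuntimeException(new IllegalStateException("inner"));
            System.out.println(unwrap(boom, IllegalStateException.class)); // finds the nested cause
        }
    }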
plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java

@@ -37,9 +37,14 @@
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -71,17 +76,16 @@ public void testOneChunkUpload() {
putObjectResponseCompletableFuture
);

+AtomicReference<InputStream> streamRef = new AtomicReference<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, false, null),
-new StreamContext(
-(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
-ByteSizeUnit.MB.toBytes(1),
-ByteSizeUnit.MB.toBytes(1),
-1
-),
+new StreamContext((partIdx, partSize, position) -> {
+streamRef.set(new ZeroInputStream(partSize));
+return new InputStreamContainer(streamRef.get(), partSize, position);
+}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1),
new StatsMetricPublisher()
);

@@ -92,6 +96,14 @@
}

verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));

+boolean closeError = false;
+try {
+streamRef.get().available();
+} catch (IOException e) {
+closeError = e.getMessage().equals("Stream closed");
+}
+assertTrue("InputStream was still open after upload", closeError);
}

public void testOneChunkUploadCorruption() {
@@ -162,17 +174,17 @@ public void testMultipartUpload() {
abortMultipartUploadResponseCompletableFuture
);

+List<InputStream> streams = new ArrayList<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, true, 3376132981L),
-new StreamContext(
-(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
-ByteSizeUnit.MB.toBytes(1),
-ByteSizeUnit.MB.toBytes(1),
-5
-),
+new StreamContext((partIdx, partSize, position) -> {
+InputStream stream = new ZeroInputStream(partSize);
+streams.add(stream);
+return new InputStreamContainer(stream, partSize, position);
+}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5),
new StatsMetricPublisher()
);

@@ -182,6 +194,16 @@
fail("did not expect resultFuture to fail");
}

+streams.forEach(stream -> {
+boolean closeError = false;
+try {
+stream.available();
+} catch (IOException e) {
+closeError = e.getMessage().equals("Stream closed");
+}
+assertTrue("InputStream was still open after upload", closeError);
+});

verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class));
verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
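The tests assert closure indirectly: once the upload path closes the BufferedInputStream, the close propagates to the wrapped stream, and a subsequent available() throws IOException("Stream closed"). The JDK's BufferedInputStream reports the same message, as this standalone sketch shows:

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class ClosedStreamCheckSketch {
        public static void main(String[] args) throws IOException {
            BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(new byte[8]));
            in.close();
            try {
                in.available(); // throws once the stream is closed
            } catch (IOException e) {
                System.out.println(e.getMessage()); // prints "Stream closed"
            }
        }
    }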
