MINIFICPP-2127 Add multipart upload support for PutS3Object processor #1586

Closed · wants to merge 32 commits
Commits
e9ef136 MINIFICPP-2127 Add multipart upload support for PutS3Object processor (lordgamez, May 24, 2023)
7ca7a32 Abort aged off multipart uploads (lordgamez, May 31, 2023)
6351163 Refactor S3 changes (lordgamez, Jun 1, 2023)
6e823a7 Test multipart upload with valid file sizes (lordgamez, Jun 2, 2023)
1ecc862 Fix logging (lordgamez, Jun 2, 2023)
1781d16 Fix last age off time (lordgamez, Jun 5, 2023)
c0c5468 Implement resume multipart upload feature (lordgamez, Jun 5, 2023)
cac6ad7 Use properties file for multipart state (lordgamez, Jun 6, 2023)
7fd4afd Check if multipart upload exists in S3 (lordgamez, Jun 7, 2023)
db5620a Removed aged off local multipart upload cache (lordgamez, Jun 7, 2023)
8938327 Fix clang build (lordgamez, Jun 7, 2023)
5b5a396 Refactoring (lordgamez, Jun 7, 2023)
cc42080 Use own property file for cache (lordgamez, Jun 7, 2023)
9e4314b Additional time unit string support (lordgamez, Jun 8, 2023)
7aa1ef3 Fix after rebase (lordgamez, Jun 14, 2023)
5e52e63 Review update (lordgamez, Jun 16, 2023)
29a075c Wait for S3 transfer (lordgamez, Jun 16, 2023)
d4d649b Review update (lordgamez, Jun 19, 2023)
9379775 Review update (lordgamez, Jun 19, 2023)
b57b304 Configure multipart upload state in minifi.properties (lordgamez, Jun 19, 2023)
0c9673e Make last_ageoff_time_ atomic (lordgamez, Jul 7, 2023)
07effd0 MINIFICPP-2140 Use state manager for multipart states (lordgamez, Jul 6, 2023)
ffe6928 Review update (lordgamez, Jul 10, 2023)
4790381 Synchronize MultipartUploadStateStorage (lordgamez, Jul 10, 2023)
1fe3fd3 Review update (lordgamez, Jul 10, 2023)
14ff2cb Review update (lordgamez, Jul 11, 2023)
e59311c Fix linter issue (lordgamez, Jul 11, 2023)
cdc0da5 Fix after rebase (lordgamez, Jul 20, 2023)
c87e3a2 Review update (lordgamez, Jul 25, 2023)
3f4696d Fix linter issue (lordgamez, Jul 25, 2023)
a20f0f2 Review update (lordgamez, Jul 31, 2023)
b6d1016 Fix after rebase (lordgamez, Aug 3, 2023)
66 changes: 35 additions & 31 deletions PROCESSORS.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
@@ -158,6 +158,10 @@ def check_s3_server_object_data(self, container_name, test_data):
container_name = self.container_store.get_container_name_with_postfix(container_name)
return self.aws_checker.check_s3_server_object_data(container_name, test_data)

def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
container_name = self.container_store.get_container_name_with_postfix(container_name)
return self.aws_checker.check_s3_server_object_hash(container_name, expected_file_hash)

def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
container_name = self.container_store.get_container_name_with_postfix(container_name)
return self.aws_checker.check_s3_server_object_metadata(container_name, content_type, metadata)
17 changes: 17 additions & 0 deletions docker/test/integration/cluster/DockerTestDirectoryBindings.py
@@ -17,6 +17,7 @@
import logging
import os
import shutil
import hashlib


class DockerTestDirectoryBindings:
@@ -134,6 +135,22 @@ def put_file_to_docker_path(self, feature_id, path, file_name, contents):
file_abs_path = os.path.join(self.docker_path_to_local_path(feature_id, path), file_name)
self.put_file_contents(file_abs_path, contents)

@staticmethod
def generate_md5_hash(file_path):
with open(file_path, 'rb') as file:
md5_hash = hashlib.md5()
for chunk in iter(lambda: file.read(4096), b''):
md5_hash.update(chunk)

return md5_hash.hexdigest()

def put_random_file_to_docker_path(self, test_id: str, path: str, file_name: str, file_size: int):
file_abs_path = os.path.join(self.docker_path_to_local_path(test_id, path), file_name)
with open(file_abs_path, 'wb') as test_input_file:
test_input_file.write(os.urandom(file_size))
os.chmod(file_abs_path, 0o0777)
return self.generate_md5_hash(file_abs_path)

def rm_out_child(self, feature_id, dir):
child = os.path.join(self.data_directories[feature_id]["output_dir"], dir)
logging.info('Removing %s from output folder', child)
12 changes: 12 additions & 0 deletions docker/test/integration/cluster/checkers/AwsChecker.py
@@ -29,6 +29,18 @@ def check_s3_server_object_data(self, container_name, test_data):
(code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
return code == 0 and file_data == test_data

@retry_check()
def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
(code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
if code != 0:
return False
s3_mock_dir = output.strip()
(code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
if code != 0:
return False
file_hash = md5_output.split(' ')[0].strip()
return file_hash == expected_file_hash

@retry_check()
def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
(code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
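The new hash check above works by hashing the random input file locally in 4 KB chunks and comparing the digest against the md5sum of the object stored by the S3 mock. A minimal standalone sketch of the same idea; the file paths and the direct subprocess call are illustrative stand-ins for the framework's container plumbing:

import hashlib
import subprocess

def md5_of_file(path: str, chunk_size: int = 4096) -> str:
    # Hash in fixed-size chunks so large test files never have to fit in memory.
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()

def object_matches_local_file(local_path: str, stored_object_path: str) -> bool:
    # 'md5sum' prints "<hash>  <path>"; the first whitespace-separated field is the hash.
    result = subprocess.run(['md5sum', stored_object_path], capture_output=True, text=True, check=True)
    return result.stdout.split()[0] == md5_of_file(local_path)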
@@ -48,6 +48,7 @@ def __init__(self, context, feature_id: str):
# Remote process groups are not connectables
self.remote_process_groups = []
self.file_system_observer = None
self.test_file_hash = None

self.docker_directory_bindings = context.directory_bindings
self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.feature_id), self.docker_directory_bindings.get_data_directories(self.feature_id))
@@ -178,6 +179,11 @@ def add_test_data(self, path, test_data, file_name=None):
test_data = decode_escaped_str(test_data)
self.docker_directory_bindings.put_file_to_docker_path(self.feature_id, path, file_name, test_data.encode('utf-8'))

def add_random_test_data(self, path: str, size: int, file_name: str = None):
if file_name is None:
file_name = str(uuid.uuid4())
self.test_file_hash = self.docker_directory_bindings.put_random_file_to_docker_path(self.feature_id, path, file_name, size)

def put_test_resource(self, file_name, contents):
self.docker_directory_bindings.put_test_resource(self.feature_id, file_name, contents)

@@ -248,6 +254,9 @@ def __validate(self, validator):
def check_s3_server_object_data(self, s3_container_name, object_data):
assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()

def check_s3_server_large_object_data(self, s3_container_name: str):
assert self.cluster.check_s3_server_object_hash(s3_container_name, self.test_file_hash) or self.cluster.log_app_output()

def check_s3_server_object_metadata(self, s3_container_name, content_type):
assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type) or self.cluster.log_app_output()

41 changes: 41 additions & 0 deletions docker/test/integration/features/s3.feature
@@ -37,6 +37,7 @@ Feature: Sending data from MiNiFi-C++ to an AWS server
Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
And the object on the s3 server is "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{"
And the object content type on the s3 server is "application/octet-stream" and the object metadata matches use metadata
And the Minifi logs contain the following message: "in a single upload" in less than 10 seconds

Scenario: A MiNiFi instance transfers encoded data through a http proxy to s3
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
@@ -213,3 +214,43 @@ Feature: Sending data from MiNiFi-C++ to an AWS server

Then 1 flowfile is placed in the monitored directory in 120 seconds
And no errors were generated on the http-proxy regarding "http://s3-server-${feature_id}:9090/test_bucket"

Scenario: A MiNiFi instance transfers data in multiple parts to s3
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file of size 16MB is present in "/tmp/input"
And a PutS3Object processor set up to communicate with an s3 server
And the "Multipart Threshold" property of the PutS3Object processor is set to "5 MB"
And the "Multipart Part Size" property of the PutS3Object processor is set to "5 MB"
And the "success" relationship of the GetFile processor is connected to the PutS3Object
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the PutS3Object processor is connected to the PutFile
And a s3 server is set up in correspondence with the PutS3Object
When all instances start up
Then 1 flowfile is placed in the monitored directory in 120 seconds
And the object on the s3 server is present and matches the original hash
And the Minifi logs contain the following message: "passes the multipart threshold, uploading it in multiple parts" in less than 10 seconds

Scenario: A MiNiFi instance can use multipart upload through http proxy to s3
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file of size 6MB is present in "/tmp/input"
And a PutS3Object processor set up to communicate with an s3 server
And the "Multipart Threshold" property of the PutS3Object processor is set to "5 MB"
And the "Multipart Part Size" property of the PutS3Object processor is set to "5 MB"
And these processor properties are set to match the http proxy:
| processor name | property name | property value |
| PutS3Object | Proxy Host | http-proxy-${feature_id} |
| PutS3Object | Proxy Port | 3128 |
| PutS3Object | Proxy Username | admin |
| PutS3Object | Proxy Password | test101 |
And the "success" relationship of the GetFile processor is connected to the PutS3Object
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the PutS3Object processor is connected to the PutFile

And a s3 server is set up in correspondence with the PutS3Object
And the http proxy server is set up
When all instances start up

Then 1 flowfile is placed in the monitored directory in 120 seconds
And the object on the s3 server is present and matches the original hash
And the Minifi logs contain the following message: "passes the multipart threshold, uploading it in multiple parts" in less than 10 seconds
And no errors were generated on the http-proxy regarding "http://s3-server-${feature_id}:9090/test_bucket/test_object_key"
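
As a quick sanity check on the sizes used in the two scenarios above (illustrative arithmetic only; the actual part boundaries are chosen by the processor): with a 5 MB part size, a 16 MB file is uploaded in four parts and a 6 MB file in two, and both exceed the 5 MB multipart threshold. The counts come out the same whether "MB" is read as 10^6 or 2^20 bytes.

import math

def expected_part_count(file_size: int, part_size: int) -> int:
    # Number of multipart chunks: full parts plus one smaller trailing part if needed.
    return math.ceil(file_size / part_size)

MB = 1024 * 1024
assert expected_part_count(16 * MB, 5 * MB) == 4  # three full 5 MB parts plus a remainder
assert expected_part_count(6 * MB, 5 * MB) == 2   # one full 5 MB part plus a remainder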
10 changes: 10 additions & 0 deletions docker/test/integration/features/steps/steps.py
@@ -275,6 +275,11 @@ def step_impl(context, content, path):
context.test.add_test_data(path, content)


@given("a file of size {size} is present in \"{path}\"")
def step_impl(context, size: str, path: str):
context.test.add_random_test_data(path, humanfriendly.parse_size(size))


@given("{number_of_files:d} files with the content \"{content}\" are present in \"{path}\"")
def step_impl(context, number_of_files, content, path):
for i in range(0, number_of_files):
@@ -859,6 +864,11 @@ def step_impl(context, object_data):
context.test.check_s3_server_object_data("s3-server", object_data)


@then("the object on the s3 server is present and matches the original hash")
def step_impl(context):
context.test.check_s3_server_large_object_data("s3-server")


@then("the object content type on the s3 server is \"{content_type}\" and the object metadata matches use metadata")
def step_impl(context, content_type):
context.test.check_s3_server_object_metadata("s3-server", content_type)
@@ -36,17 +36,13 @@ def get_num_files_created(self):
def on_created(self, event):
if os.path.isfile(event.src_path) and not is_temporary_output_file(event.src_path):
logging.info("Output file created: %s", event.src_path)
with open(os.path.abspath(event.src_path), "r") as out_file:
logging.info("Contents: %s", out_file.read())
with self.files_created_lock:
self.files_created.add(event.src_path)
self.done_event.set()

def on_modified(self, event):
if os.path.isfile(event.src_path) and not is_temporary_output_file(event.src_path):
logging.info("Output file modified: %s", event.src_path)
with open(os.path.abspath(event.src_path), "r") as out_file:
logging.info("Contents: %s", out_file.read())
with self.files_created_lock:
self.files_created.add(event.src_path)
self.done_event.set()
@@ -60,8 +56,6 @@ def on_moved(self, event):
file_count_modified = True

if not is_temporary_output_file(event.dest_path):
with open(os.path.abspath(event.dest_path), "r") as out_file:
logging.info("Contents: %s", out_file.read())
with self.files_created_lock:
self.files_created.add(event.dest_path)
file_count_modified = True
94 changes: 90 additions & 4 deletions extensions/aws/processors/PutS3Object.cpp
@@ -28,6 +28,7 @@
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "range/v3/algorithm/contains.hpp"
#include "utils/ProcessorConfigUtils.h"

namespace org::apache::nifi::minifi::aws::processors {

@@ -76,7 +77,29 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &context
use_virtual_addressing_ = !*use_path_style_access;
}

if (!context->getProperty(MultipartThreshold, multipart_threshold_) || multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
if (!context->getProperty(MultipartPartSize, multipart_size_) || multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);


multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()});

multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()});

fillUserMetadata(context);

auto state_manager = context->getStateManager();
if (state_manager == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
s3_wrapper_.initializeMultipartUploadStateStorage(gsl::make_not_null(state_manager));
}

std::string PutS3Object::parseAccessControlList(const std::string &comma_separated_list) {
@@ -186,6 +209,52 @@ void PutS3Object::setAttributes(
}
}

void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
{
std::lock_guard<std::mutex> lock(last_ageoff_mutex_);
const auto now = std::chrono::system_clock::now();
if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
return;
}
last_ageoff_time_ = now;
}

logger_->log_trace("Listing aged off multipart uploads still in progress.");
aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
list_params.bucket = common_properties.bucket;
list_params.age_off_limit = multipart_upload_max_age_threshold_;
list_params.use_virtual_addressing = use_virtual_addressing_;
auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
if (!aged_off_uploads_in_progress) {
logger_->log_error("Listing aged off multipart uploads failed!");
return;
}

logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
size_t aborted = 0;
for (const auto& upload : *aged_off_uploads_in_progress) {
logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s' due to reaching maximum upload age threshold.",
upload.key, upload.upload_id, common_properties.bucket);
aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
abort_params.bucket = common_properties.bucket;
abort_params.key = upload.key;
abort_params.upload_id = upload.upload_id;
abort_params.use_virtual_addressing = use_virtual_addressing_;
if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
continue;
}
++aborted;
}
if (aborted > 0) {
logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
}
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
}

void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("PutS3Object onTrigger");
std::shared_ptr<core::FlowFile> flow_file = session->get();
@@ -200,19 +269,36 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
return;
}

ageOffMultipartUploads(*common_properties);

auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
if (!put_s3_request_params) {
session->transfer(flow_file, Failure);
return;
}

PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
session->read(flow_file, std::ref(callback));
if (!callback.result_.has_value()) {
std::optional<minifi::aws::s3::PutObjectResult> result;
session->read(flow_file, [this, &flow_file, &put_s3_request_params, &result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
try {
if (flow_file->getSize() <= multipart_threshold_) {
logger_->log_info("Uploading S3 Object '%s' in a single upload", put_s3_request_params->object_key);
result = s3_wrapper_.putObject(*put_s3_request_params, stream, flow_file->getSize());
return gsl::narrow<int64_t>(flow_file->getSize());
} else {
logger_->log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", put_s3_request_params->object_key);
result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, stream, flow_file->getSize(), multipart_size_);
return gsl::narrow<int64_t>(flow_file->getSize());
}
} catch(const aws::s3::StreamReadException& ex) {
logger_->log_error("Error occurred while uploading to S3: %s", ex.what());
return -1;
}
});
if (!result.has_value()) {
logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
session->transfer(flow_file, Failure);
} else {
setAttributes(session, flow_file, *put_s3_request_params, *callback.result_);
setAttributes(session, flow_file, *put_s3_request_params, *result);
logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", put_s3_request_params->object_key, put_s3_request_params->bucket);
session->transfer(flow_file, Success);
}
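In plain terms, the new onTrigger path compares the flow file size against the Multipart Threshold: at or below it, the object is sent with a single PutObject call; above it, the content is streamed in Multipart Part Size chunks through the multipart upload API, while aged-off incomplete uploads are periodically aborted by ageOffMultipartUploads. A rough Python sketch of that decision and chunking logic; the s3 client object and its method names here are placeholders, not the MiNiFi C++ or AWS SDK API:

def upload(s3, bucket: str, key: str, stream, size: int,
           multipart_threshold: int, part_size: int):
    # Small payloads go up in one request, mirroring the single-upload branch.
    if size <= multipart_threshold:
        return s3.put_object(bucket, key, stream.read(size))

    # Larger payloads are split into fixed-size parts; the last part may be smaller.
    upload_id = s3.create_multipart_upload(bucket, key)
    etags = []
    part_number = 1
    sent = 0
    while sent < size:
        chunk = stream.read(min(part_size, size - sent))
        etags.append(s3.upload_part(bucket, key, upload_id, part_number, chunk))
        sent += len(chunk)
        part_number += 1
    return s3.complete_multipart_upload(bucket, key, upload_id, etags)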