Skip to content

Commit

Permalink
Review update
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jun 19, 2023
1 parent 0f481f6 commit b45280b
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 28 deletions.
2 changes: 2 additions & 0 deletions docker/test/integration/cluster/checkers/AwsChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def check_s3_server_object_hash(self, container_name: str, expected_file_hash: s
return False
s3_mock_dir = output.strip()
(code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
if code != 0:
return False
file_hash = md5_output.split(' ')[0].strip()
return code == 0 and file_hash == expected_file_hash

Expand Down
4 changes: 2 additions & 2 deletions docker/test/integration/features/s3.feature
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ Feature: Sending data from MiNiFi-C++ to an AWS server

Scenario: A MiNiFi instance transfers data in multiple parts to s3
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with 16MB content is present in "/tmp/input"
And a file of size 16MB is present in "/tmp/input"
And a PutS3Object processor set up to communicate with an s3 server
And the "Multipart Threshold" property of the PutS3Object processor is set to "5 MB"
And the "Multipart Part Size" property of the PutS3Object processor is set to "5 MB"
Expand All @@ -232,7 +232,7 @@ Feature: Sending data from MiNiFi-C++ to an AWS server

Scenario: A MiNiFi instance can use multipart upload through http proxy to s3
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with 6MB content is present in "/tmp/input"
And a file of size 6MB is present in "/tmp/input"
And a PutS3Object processor set up to communicate with an s3 server
And the "Multipart Threshold" property of the PutS3Object processor is set to "5 MB"
And the "Multipart Part Size" property of the PutS3Object processor is set to "5 MB"
Expand Down
2 changes: 1 addition & 1 deletion docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def step_impl(context, content, path):
context.test.add_test_data(path, content)


@given("a file with {size} content is present in \"{path}\"")
@given("a file of size {size} is present in \"{path}\"")
def step_impl(context, size: str, path: str):
    # Creates a file filled with random data at `path` inside the test container.
    # `size` is a human-readable size string (e.g. "16MB") parsed into bytes by
    # humanfriendly.parse_size; random content avoids accidental dedup/compression
    # effects in multipart-upload scenarios.
    context.test.add_random_test_data(path, humanfriendly.parse_size(size))

Expand Down
2 changes: 1 addition & 1 deletion extensions/aws/processors/PutS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);


s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());
s3_wrapper_.initializeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());
}

std::string PutS3Object::parseAccessControlList(const std::string &comma_separated_list) {
Expand Down
16 changes: 6 additions & 10 deletions extensions/aws/s3/MultipartUploadStateStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ namespace org::apache::nifi::minifi::aws::s3 {
MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
if (state_directory.empty()) {
char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format) / std::string(state_id + "-s3-multipart-upload-state.properties");
} else {
state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
if (!std::filesystem::exists(state_file_path_)) {
std::filesystem::create_directories(state_file_path_.parent_path());
std::ofstream ofs(state_file_path_);
// std::ofstream ofs(state_file_path_);
} else {
loadFile();
}
Expand Down Expand Up @@ -96,10 +96,6 @@ void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds mul
if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
continue;
}
if (!state_.contains(property_key)) {
logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
continue;
}
int64_t stored_upload_time{};
if (!core::Property::StringToInt(value, stored_upload_time)) {
logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
Expand All @@ -108,10 +104,6 @@ void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds mul
auto upload_time = Aws::Utils::DateTime(stored_upload_time);
if (upload_time < age_off_time) {
auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
if (state_key_and_property_name.size() < 2) {
logger_->log_error("Invalid property '%s'", property_key);
continue;
}
keys_to_remove.push_back(state_key_and_property_name[0]);
}
}
Expand Down Expand Up @@ -162,4 +154,8 @@ void MultipartUploadStateStorage::commitChanges() {
}
}

// Returns the path of the properties file this storage persists multipart
// upload state into (exposed so tests can clean the file up after a run).
std::filesystem::path MultipartUploadStateStorage::getStateFilePath() const {
  return state_file_path_;
}

} // namespace org::apache::nifi::minifi::aws::s3
1 change: 1 addition & 0 deletions extensions/aws/s3/MultipartUploadStateStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class MultipartUploadStateStorage {
std::optional<MultipartUploadState> getState(const std::string& bucket, const std::string& key) const;
void removeState(const std::string& bucket, const std::string& key);
void removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
std::filesystem::path getStateFilePath() const;

private:
void loadFile();
Expand Down
27 changes: 16 additions & 11 deletions extensions/aws/s3/S3Wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "utils/gsl.h"
#include "utils/RegexUtils.h"
#include "aws/core/utils/HashingUtils.h"
#include "range/v3/algorithm/find_if.hpp"
#include "range/v3/algorithm/any_of.hpp"

namespace org::apache::nifi::minifi::aws::s3 {

Expand Down Expand Up @@ -70,8 +70,7 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
}

std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
std::vector<std::byte> buffer;
buffer.resize(BUFFER_SIZE);
std::array<std::byte, BUFFER_SIZE> buffer{};
auto data_stream = std::make_shared<Aws::StringStream>();
uint64_t read_size = 0;
while (read_size < read_limit) {
Expand All @@ -81,7 +80,7 @@ std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shar
throw StreamReadException("Reading flow file inputstream failed!");
}
if (read_ret > 0) {
data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
read_size += read_ret;
} else {
break;
Expand Down Expand Up @@ -112,11 +111,17 @@ std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObje
result.upload_id = upload_state.upload_id;
result.part_etags = upload_state.uploaded_etags;
const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
const auto div_ceil = [](size_t n, size_t d) {
if (n % d == 0)
return n / d;
else
return n / d + 1;
};
const size_t part_count = div_ceil(flow_size, upload_state.part_size);
size_t total_read = 0;
const size_t start_part = upload_state.uploaded_parts + 1;
const size_t last_part = start_part + part_count - 1;
for (size_t i = start_part; i <= last_part; ++i) {
for (size_t part_number = start_part; part_number <= last_part; ++part_number) {
uint64_t read_size{};
const auto remaining = flow_size - total_read;
const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;
Expand All @@ -126,7 +131,7 @@ std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObje
Aws::S3::Model::UploadPartRequest upload_part_request;
upload_part_request.WithBucket(put_object_params.bucket)
.WithKey(put_object_params.object_key)
.WithPartNumber(i)
.WithPartNumber(part_number)
.WithUploadId(upload_state.upload_id);
upload_part_request.SetBody(stream_ptr);

Expand All @@ -135,15 +140,15 @@ std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObje

auto upload_part_result = request_sender_->sendUploadPartRequest(upload_part_request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
if (!upload_part_result) {
logger_->log_error("Failed to upload part %d of %d of S3 object with key '%s'", i, last_part, put_object_params.object_key);
logger_->log_error("Failed to upload part %d of %d of S3 object with key '%s'", part_number, last_part, put_object_params.object_key);
return std::nullopt;
}
result.part_etags.push_back(upload_part_result->GetETag());
upload_state.uploaded_etags.push_back(upload_part_result->GetETag());
upload_state.uploaded_parts += 1;
upload_state.uploaded_size += read_size;
multipart_upload_storage_->storeState(put_object_params.bucket, put_object_params.object_key, upload_state);
logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", i, last_part, put_object_params.object_key);
logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", part_number, last_part, put_object_params.object_key);
}

multipart_upload_storage_->removeState(put_object_params.bucket, put_object_params.object_key);
Expand Down Expand Up @@ -179,7 +184,7 @@ bool S3Wrapper::multipartUploadExistsInS3(const PutObjectRequestParameters& put_
return false;
}

return ranges::find_if(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; }) != pending_uploads->end();
return ranges::any_of(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; });
}

std::optional<MultipartUploadState> S3Wrapper::getMultipartUploadState(const PutObjectRequestParameters& put_object_params) {
Expand Down Expand Up @@ -449,7 +454,7 @@ bool S3Wrapper::abortMultipartUpload(const AbortMultipartUploadRequestParameters
return request_sender_->sendAbortMultipartUploadRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
}

void S3Wrapper::initailizeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id) {
void S3Wrapper::initializeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id) {
multipart_upload_storage_ = std::make_unique<MultipartUploadStateStorage>(multipart_temp_dir, state_id);
}

Expand Down
2 changes: 1 addition & 1 deletion extensions/aws/s3/S3Wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class S3Wrapper {
std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
void initailizeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id);
void initializeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id);
void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);

virtual ~S3Wrapper() = default;
Expand Down
29 changes: 29 additions & 0 deletions extensions/aws/tests/MultipartUploadStateStorageTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ class MultipartUploadStateStorageTestFixture {
minifi::aws::s3::MultipartUploadStateStorage upload_storage_;
};

// Fixture that exercises MultipartUploadStateStorage with an empty state
// directory argument, i.e. the storage generates its own temporary state
// directory. The generated state file is removed on teardown so repeated
// test runs start from a clean slate.
class MultipartUploadStateStorageGeneratedStatedirTestFixture {
 public:
  MultipartUploadStateStorageGeneratedStatedirTestFixture() : upload_storage_{"", "test_id"} {}

  ~MultipartUploadStateStorageGeneratedStatedirTestFixture() {
    // Clean up the auto-generated state file (and any parent temp dir content).
    std::filesystem::remove_all(upload_storage_.getStateFilePath());
  }

 protected:
  TestController test_controller;
  minifi::aws::s3::MultipartUploadStateStorage upload_storage_;
};

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current key state", "[s3StateStorage]") {
REQUIRE(upload_storage_.getState("test_bucket", "key") == std::nullopt);
minifi::aws::s3::MultipartUploadState state;
Expand All @@ -46,6 +61,20 @@ TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current
REQUIRE(*upload_storage_.getState("test_bucket", "key") == state);
}

TEST_CASE_METHOD(MultipartUploadStateStorageGeneratedStatedirTestFixture, "Store and get current key state in generated state dir", "[s3StateStorage]") {
  // Nothing should be stored for the key before the first storeState() call.
  REQUIRE(upload_storage_.getState("test_bucket", "key") == std::nullopt);

  minifi::aws::s3::MultipartUploadState upload_state;
  upload_state.upload_id = "id1";
  upload_state.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
  upload_state.part_size = 50_MiB;
  upload_state.full_size = 200_MiB;
  upload_state.uploaded_size = 100_MiB;
  upload_state.uploaded_parts = 2;
  upload_state.uploaded_etags = {"etag1", "etag2"};

  upload_storage_.storeState("test_bucket", "key", upload_state);
  // The stored state must round-trip unchanged through the generated state dir.
  REQUIRE(*upload_storage_.getState("test_bucket", "key") == upload_state);
}

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state from multiple keys and buckets", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1;
state1.upload_id = "id1";
Expand Down
4 changes: 2 additions & 2 deletions libminifi/include/utils/TimeUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {

// Matches both abbreviated ("mth"/"mths") and spelled-out ("month"/"months")
// unit names for std::chrono::months. The scraped diff retained both the old
// and the new return statement (the second was unreachable); only the
// combined, corrected return is kept.
template<>
inline bool unit_matches<std::chrono::months>(const std::string& unit) {
  return unit == "mth" || unit == "mths" || unit == "month" || unit == "months";
}

// Matches abbreviated ("y"/"yr"/"yrs") and spelled-out ("year"/"years") unit
// names for std::chrono::years. The scraped diff retained both the old and
// the new return statement (the second was unreachable); only the combined,
// corrected return is kept.
template<>
inline bool unit_matches<std::chrono::years>(const std::string& unit) {
  return unit == "y" || unit == "yr" || unit == "yrs" || unit == "year" || unit == "years";
}

template<class TargetDuration, class SourceDuration>
Expand Down

0 comments on commit b45280b

Please sign in to comment.