Skip to content

Commit

Permalink
Fix after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jul 20, 2023
1 parent 9dcca29 commit 4a74f69
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 36 deletions.
11 changes: 5 additions & 6 deletions extensions/aws/processors/PutS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "range/v3/algorithm/contains.hpp"
#include "utils/ProcessorConfigUtils.h"

namespace org::apache::nifi::minifi::aws::processors {

Expand Down Expand Up @@ -117,22 +118,20 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
use_virtual_addressing_ = !*use_path_style_access;
}

context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
if (!context->getProperty(MultipartThreshold, multipart_threshold_) || multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
context->getProperty(MultipartPartSize.getName(), multipart_size_);
if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
if (!context->getProperty(MultipartPartSize, multipart_size_) || multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);


multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()});

multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()});

fillUserMetadata(context);
Expand Down
44 changes: 24 additions & 20 deletions extensions/aws/processors/PutS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,33 @@ class PutS3Object : public S3Processor {
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto MultipartThreshold = core::PropertyDefinitionBuilder<>::createProperty("Multipart Threshold")
.withDescription("Specifies the file size threshold for switch from the PutS3Object API to the PutS3MultipartUpload API. "
"Flow files bigger than this limit will be sent using the multipart process. The valid range is 5MB to 5GB.")
.withDefaultValue<core::DataSizeValue>("5 GB")
.isRequired(true)
.build();
.withDescription("Specifies the file size threshold for switch from the PutS3Object API to the PutS3MultipartUpload API. "
"Flow files bigger than this limit will be sent using the multipart process. The valid range is 5MB to 5GB.")
.withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
.withDefaultValue("5 GB")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto MultipartPartSize = core::PropertyDefinitionBuilder<>::createProperty("Multipart Part Size")
.withDescription("Specifies the part size for use when the PutS3Multipart Upload API is used. "
"Flow files will be broken into chunks of this size for the upload process, but the last part sent can be smaller since it is not padded. The valid range is 5MB to 5GB.")
.withDefaultValue<core::DataSizeValue>("5 GB")
.isRequired(true)
.build();
.withDescription("Specifies the part size for use when the PutS3Multipart Upload API is used. "
"Flow files will be broken into chunks of this size for the upload process, but the last part sent can be smaller since it is not padded. The valid range is 5MB to 5GB.")
.withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
.withDefaultValue("5 GB")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto MultipartUploadAgeOffInterval = core::PropertyDefinitionBuilder<>::createProperty("Multipart Upload AgeOff Interval")
.withDescription("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated for ageoff. "
"When processor is triggered it will initiate the ageoff evaluation if this interval has been exceeded.")
.withDefaultValue<core::TimePeriodValue>("60 min")
.isRequired(true)
.build();
.withDescription("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated for ageoff. "
"When processor is triggered it will initiate the ageoff evaluation if this interval has been exceeded.")
.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
.withDefaultValue("60 min")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto MultipartUploadMaxAgeThreshold = core::PropertyDefinitionBuilder<>::createProperty("Multipart Upload Max Age Threshold")
.withDescription("Specifies the maximum age for existing multipart uploads in AWS S3. When the ageoff process occurs, any upload older than this threshold will be aborted.")
.withDefaultValue<core::TimePeriodValue>("7 days")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(S3Processor::Properties, std::array<core::PropertyReference, 10>{
.withDescription("Specifies the maximum age for existing multipart uploads in AWS S3. When the ageoff process occurs, any upload older than this threshold will be aborted.")
.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
.withDefaultValue("7 days")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(S3Processor::Properties, std::array<core::PropertyReference, 14>{
ObjectKey,
ContentType,
StorageClass,
Expand Down
1 change: 0 additions & 1 deletion extensions/aws/s3/S3Wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <algorithm>

#include "S3ClientRequestSender.h"
#include "range/v3/algorithm/find.hpp"
#include "utils/ArrayUtils.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
Expand Down
1 change: 1 addition & 0 deletions extensions/aws/s3/S3Wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "S3RequestSender.h"
#include "Exception.h"
#include "MultipartUploadStateStorage.h"
#include "range/v3/algorithm/find.hpp"

namespace org::apache::nifi::minifi::aws::s3 {

Expand Down
18 changes: 9 additions & 9 deletions extensions/aws/tests/PutS3ObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,24 +271,24 @@ TEST_CASE_METHOD(PutS3ObjectUploadLimitChangedTestsFixture, "Test multipart uplo
auto temp_dir = test_controller.createTempDirectory();
plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string());

plan->setProperty(update_attribute, "s3.permissions.full.users", "myuserid123, [email protected]", true);
plan->setDynamicProperty(update_attribute, "s3.permissions.full.users", "myuserid123, [email protected]");
plan->setProperty(s3_processor, "FullControl User List", "${s3.permissions.full.users}");
plan->setProperty(update_attribute, "s3.permissions.read.users", "myuserid456,[email protected]", true);
plan->setDynamicProperty(update_attribute, "s3.permissions.read.users", "myuserid456,[email protected]");
plan->setProperty(s3_processor, "Read Permission User List", "${s3.permissions.read.users}");
plan->setProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser", true);
plan->setDynamicProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser");
plan->setProperty(s3_processor, "Read ACL User List", "${s3.permissions.readacl.users}");
plan->setProperty(update_attribute, "s3.permissions.writeacl.users", "[email protected]", true);
plan->setDynamicProperty(update_attribute, "s3.permissions.writeacl.users", "[email protected]");
plan->setProperty(s3_processor, "Write ACL User List", "${s3.permissions.writeacl.users}");
plan->setProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite", true);
plan->setDynamicProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite");
plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
plan->setProperty(s3_processor, "meta_key1", "meta_value1", true);
plan->setProperty(s3_processor, "meta_key2", "meta_value2", true);
plan->setProperty(update_attribute, "test.contentType", "application/tar", true);
plan->setDynamicProperty(s3_processor, "meta_key1", "meta_value1");
plan->setDynamicProperty(s3_processor, "meta_key2", "meta_value2");
plan->setDynamicProperty(update_attribute, "test.contentType", "application/tar");
plan->setProperty(s3_processor, "Content Type", "${test.contentType}");
plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy");
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::AP_SOUTHEAST_3);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setProperty(update_attribute, "test.endpoint", "http://localhost:1234", true);
plan->setDynamicProperty(update_attribute, "test.endpoint", "http://localhost:1234");
plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
plan->setProperty(s3_processor, "Server Side Encryption", "AES256");

Expand Down

0 comments on commit 4a74f69

Please sign in to comment.