diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp index b9a2ce259c3..04ce0fd5652 100644 --- a/extensions/aws/processors/PutS3Object.cpp +++ b/extensions/aws/processors/PutS3Object.cpp @@ -28,6 +28,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "range/v3/algorithm/contains.hpp" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::aws::processors { @@ -117,22 +118,20 @@ void PutS3Object::onSchedule(const std::shared_ptr &contex use_virtual_addressing_ = !*use_path_style_access; } - context->getProperty(MultipartThreshold.getName(), multipart_threshold_); - if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) { + if (!context->getProperty(MultipartThreshold, multipart_threshold_) || multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!"); } logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_); - context->getProperty(MultipartPartSize.getName(), multipart_size_); - if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) { + if (!context->getProperty(MultipartPartSize, multipart_size_) || multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!"); } logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_); - multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds(); + multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow(*context, MultipartUploadAgeOffInterval.name).getMilliseconds(); logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()}); - multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds(); + multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow(*context, MultipartUploadMaxAgeThreshold.name).getMilliseconds(); logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()}); fillUserMetadata(context); diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h index ac16beddd73..802bd1bd8b9 100644 --- a/extensions/aws/processors/PutS3Object.h +++ b/extensions/aws/processors/PutS3Object.h @@ -113,29 +113,33 @@ class PutS3Object : public S3Processor { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto MultipartThreshold = core::PropertyDefinitionBuilder<>::createProperty("Multipart Threshold") - .withDescription("Specifies the file size threshold for switch from the PutS3Object API to the PutS3MultipartUpload API. " - "Flow files bigger than this limit will be sent using the multipart process. The valid range is 5MB to 5GB.") - .withDefaultValue("5 GB") - .isRequired(true) - .build(); + .withDescription("Specifies the file size threshold for switch from the PutS3Object API to the PutS3MultipartUpload API. " + "Flow files bigger than this limit will be sent using the multipart process. The valid range is 5MB to 5GB.") + .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE) + .withDefaultValue("5 GB") + .isRequired(true) + .build(); EXTENSIONAPI static constexpr auto MultipartPartSize = core::PropertyDefinitionBuilder<>::createProperty("Multipart Part Size") - .withDescription("Specifies the part size for use when the PutS3Multipart Upload API is used. " - "Flow files will be broken into chunks of this size for the upload process, but the last part sent can be smaller since it is not padded. The valid range is 5MB to 5GB.") - .withDefaultValue("5 GB") - .isRequired(true) - .build(); + .withDescription("Specifies the part size for use when the PutS3Multipart Upload API is used. " + "Flow files will be broken into chunks of this size for the upload process, but the last part sent can be smaller since it is not padded. The valid range is 5MB to 5GB.") + .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE) + .withDefaultValue("5 GB") + .isRequired(true) + .build(); EXTENSIONAPI static constexpr auto MultipartUploadAgeOffInterval = core::PropertyDefinitionBuilder<>::createProperty("Multipart Upload AgeOff Interval") - .withDescription("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated for ageoff. " - "When processor is triggered it will initiate the ageoff evaluation if this interval has been exceeded.") - .withDefaultValue("60 min") - .isRequired(true) - .build(); + .withDescription("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated for ageoff. " + "When processor is triggered it will initiate the ageoff evaluation if this interval has been exceeded.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("60 min") + .isRequired(true) + .build(); EXTENSIONAPI static constexpr auto MultipartUploadMaxAgeThreshold = core::PropertyDefinitionBuilder<>::createProperty("Multipart Upload Max Age Threshold") - .withDescription("Specifies the maximum age for existing multipart uploads in AWS S3. When the ageoff process occurs, any upload older than this threshold will be aborted.") - .withDefaultValue("7 days") - .isRequired(true) - .build(); - EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(S3Processor::Properties, std::array{ + .withDescription("Specifies the maximum age for existing multipart uploads in AWS S3. When the ageoff process occurs, any upload older than this threshold will be aborted.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("7 days") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(S3Processor::Properties, std::array{ ObjectKey, ContentType, StorageClass, diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp index 78d1f4ad98e..a07ff30d691 100644 --- a/extensions/aws/s3/S3Wrapper.cpp +++ b/extensions/aws/s3/S3Wrapper.cpp @@ -25,7 +25,6 @@ #include #include "S3ClientRequestSender.h" -#include "range/v3/algorithm/find.hpp" #include "utils/ArrayUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h index e9617c9ca47..3c19caff26b 100644 --- a/extensions/aws/s3/S3Wrapper.h +++ b/extensions/aws/s3/S3Wrapper.h @@ -43,6 +43,7 @@ #include "S3RequestSender.h" #include "Exception.h" #include "MultipartUploadStateStorage.h" +#include "range/v3/algorithm/find.hpp" namespace org::apache::nifi::minifi::aws::s3 { diff --git a/extensions/aws/tests/PutS3ObjectTests.cpp b/extensions/aws/tests/PutS3ObjectTests.cpp index 5afe27c55e4..13185ffcdd7 100644 --- a/extensions/aws/tests/PutS3ObjectTests.cpp +++ b/extensions/aws/tests/PutS3ObjectTests.cpp @@ -271,24 +271,24 @@ TEST_CASE_METHOD(PutS3ObjectUploadLimitChangedTestsFixture, "Test multipart uplo auto temp_dir = test_controller.createTempDirectory(); plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string()); - plan->setProperty(update_attribute, "s3.permissions.full.users", "myuserid123, myuser@example.com", true); + plan->setDynamicProperty(update_attribute, "s3.permissions.full.users", "myuserid123, myuser@example.com"); plan->setProperty(s3_processor, "FullControl User List", "${s3.permissions.full.users}"); - plan->setProperty(update_attribute, "s3.permissions.read.users", "myuserid456,myuser2@example.com", true); + plan->setDynamicProperty(update_attribute, "s3.permissions.read.users", "myuserid456,myuser2@example.com"); plan->setProperty(s3_processor, "Read Permission User List", "${s3.permissions.read.users}"); - plan->setProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser", true); + plan->setDynamicProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser"); plan->setProperty(s3_processor, "Read ACL User List", "${s3.permissions.readacl.users}"); - plan->setProperty(update_attribute, "s3.permissions.writeacl.users", "myuser3@example.com", true); + plan->setDynamicProperty(update_attribute, "s3.permissions.writeacl.users", "myuser3@example.com"); plan->setProperty(s3_processor, "Write ACL User List", "${s3.permissions.writeacl.users}"); - plan->setProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite", true); + plan->setDynamicProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite"); plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}"); - plan->setProperty(s3_processor, "meta_key1", "meta_value1", true); - plan->setProperty(s3_processor, "meta_key2", "meta_value2", true); - plan->setProperty(update_attribute, "test.contentType", "application/tar", true); + plan->setDynamicProperty(s3_processor, "meta_key1", "meta_value1"); + plan->setDynamicProperty(s3_processor, "meta_key2", "meta_value2"); + plan->setDynamicProperty(update_attribute, "test.contentType", "application/tar"); plan->setProperty(s3_processor, "Content Type", "${test.contentType}"); plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy"); plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::AP_SOUTHEAST_3); plan->setProperty(s3_processor, "Communications Timeout", "10 Sec"); - plan->setProperty(update_attribute, "test.endpoint", "http://localhost:1234", true); + plan->setDynamicProperty(update_attribute, "test.endpoint", "http://localhost:1234"); plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}"); plan->setProperty(s3_processor, "Server Side Encryption", "AES256");