Skip to content

Commit

Permalink
Review update
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jul 31, 2023
1 parent eb8ba4c commit c777a50
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 85 deletions.
79 changes: 28 additions & 51 deletions extensions/aws/processors/PutS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,6 @@

namespace org::apache::nifi::minifi::aws::processors {

namespace {
/**
 * Callable handed to the session's read operation: it receives the flow file's
 * content stream and pushes it to S3 through the supplied S3Wrapper.
 *
 * The upload strategy is chosen from the flow file size: content no larger than
 * the multipart threshold is sent with a single putObject call, anything larger
 * goes through putObjectMultipart using the configured part size.
 *
 * The request options, the S3 wrapper and the logger are held by reference, so
 * the referenced objects must outlive this callback.
 */
class ReadCallback {
 public:
  /**
   * @param flow_size            size of the flow file content to upload
   * @param options              put-object request parameters (kept by reference)
   * @param s3_wrapper           wrapper used to perform the upload (kept by reference)
   * @param multipart_threshold  largest size that is still uploaded in a single request
   * @param multipart_size       part size passed to the multipart upload
   * @param logger               logger used for progress and error messages (kept by reference)
   */
  ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
               uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
      : flow_size_(flow_size),
        options_(options),
        s3_wrapper_(s3_wrapper),
        multipart_threshold_(multipart_threshold),
        multipart_size_(multipart_size),
        logger_(logger) {
  }

  /**
   * Uploads the content available on @p stream and stores the wrapper's answer in result_.
   *
   * @param stream input stream of the flow file content
   * @return the flow file size converted with gsl::narrow once the wrapper call has returned,
   *         or -1 if a StreamReadException was caught (result_ is not assigned in that case)
   */
  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
    // Strictly greater than the threshold means multipart; equal still uses a single upload.
    const bool exceeds_threshold = flow_size_ > multipart_threshold_;
    try {
      if (exceeds_threshold) {
        logger_.log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", options_.object_key);
        result_ = s3_wrapper_.putObjectMultipart(options_, stream, flow_size_, multipart_size_);
      } else {
        logger_.log_info("Uploading S3 Object '%s' in a single upload", options_.object_key);
        result_ = s3_wrapper_.putObject(options_, stream, flow_size_);
      }
      // Both strategies report the whole flow file size, whatever result_ ends up holding.
      return gsl::narrow<int64_t>(flow_size_);
    } catch (const aws::s3::StreamReadException& ex) {
      logger_.log_error("Error occurred while uploading to S3: %s", ex.what());
      return -1;
    }
  }

  uint64_t flow_size_;
  const minifi::aws::s3::PutObjectRequestParameters& options_;
  aws::s3::S3Wrapper& s3_wrapper_;
  uint64_t multipart_threshold_;
  uint64_t multipart_size_;
  uint64_t read_size_ = 0;  // not touched by any code in this class
  // Answer of the last upload attempt; callers check has_value() to tell success from failure.
  std::optional<minifi::aws::s3::PutObjectResult> result_;
  core::logging::Logger& logger_;
};
}  // namespace

void PutS3Object::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
Expand Down Expand Up @@ -251,10 +210,14 @@ void PutS3Object::setAttributes(
}

void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
const auto now = std::chrono::system_clock::now();
if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
return;
{
std::lock_guard<std::mutex> lock(last_ageoff_mutex_);
const auto now = std::chrono::system_clock::now();
if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
return;
}
last_ageoff_time_ = now;
}

logger_->log_trace("Listing aged off multipart uploads still in progress.");
Expand All @@ -272,7 +235,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti
logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
size_t aborted = 0;
for (const auto& upload : *aged_off_uploads_in_progress) {
logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s' due to reaching maximum upload age threshold.", upload.key, upload.upload_id, common_properties.bucket);
aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
abort_params.bucket = common_properties.bucket;
Expand All @@ -289,7 +252,6 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti
logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
}
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
last_ageoff_time_ = now;
}

void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
Expand All @@ -314,13 +276,28 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
return;
}

ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);
session->read(flow_file, std::ref(callback));
if (!callback.result_.has_value()) {
std::optional<minifi::aws::s3::PutObjectResult> result;
session->read(flow_file, [this, &flow_file, &put_s3_request_params, &result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
try {
if (flow_file->getSize() <= multipart_threshold_) {
logger_->log_info("Uploading S3 Object '%s' in a single upload", put_s3_request_params->object_key);
result = s3_wrapper_.putObject(*put_s3_request_params, stream, flow_file->getSize());
return gsl::narrow<int64_t>(flow_file->getSize());
} else {
logger_->log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", put_s3_request_params->object_key);
result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, stream, flow_file->getSize(), multipart_size_);
return gsl::narrow<int64_t>(flow_file->getSize());
}
} catch(const aws::s3::StreamReadException& ex) {
logger_->log_error("Error occurred while uploading to S3: %s", ex.what());
return -1;
}
});
if (!result.has_value()) {
logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
session->transfer(flow_file, Failure);
} else {
setAttributes(session, flow_file, *put_s3_request_params, *callback.result_);
setAttributes(session, flow_file, *put_s3_request_params, *result);
logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", put_s3_request_params->object_key, put_s3_request_params->bucket);
session->transfer(flow_file, Success);
}
Expand Down
3 changes: 2 additions & 1 deletion extensions/aws/processors/PutS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class PutS3Object : public S3Processor {
uint64_t multipart_size_{};
std::chrono::milliseconds multipart_upload_ageoff_interval_;
std::chrono::milliseconds multipart_upload_max_age_threshold_;
std::atomic<std::chrono::time_point<std::chrono::system_clock>> last_ageoff_time_;
std::mutex last_ageoff_mutex_;
std::chrono::time_point<std::chrono::system_clock> last_ageoff_time_;
};

} // namespace org::apache::nifi::minifi::aws::processors
38 changes: 17 additions & 21 deletions extensions/aws/s3/S3Wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObje
auto stream_ptr = readFlowFileStream(stream, next_read_size, read_size);
total_read += read_size;

Aws::S3::Model::UploadPartRequest upload_part_request;
upload_part_request.WithBucket(put_object_params.bucket)
auto upload_part_request = Aws::S3::Model::UploadPartRequest{}
.WithBucket(put_object_params.bucket)
.WithKey(put_object_params.object_key)
.WithPartNumber(part_number)
.WithUploadId(upload_state.upload_id);
Expand Down Expand Up @@ -155,15 +155,15 @@ std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObje

std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3Wrapper::completeMultipartUpload(const PutObjectRequestParameters& put_object_params,
const S3Wrapper::UploadPartsResult& upload_parts_result) {
Aws::S3::Model::CompleteMultipartUploadRequest complete_multipart_upload_request;
complete_multipart_upload_request.WithBucket(put_object_params.bucket)
auto complete_multipart_upload_request = Aws::S3::Model::CompleteMultipartUploadRequest{}
.WithBucket(put_object_params.bucket)
.WithKey(put_object_params.object_key)
.WithUploadId(upload_parts_result.upload_id);

Aws::S3::Model::CompletedMultipartUpload completed_multipart_upload;
for (size_t i = 0; i < upload_parts_result.part_etags.size(); ++i) {
Aws::S3::Model::CompletedPart part;
part.WithETag(upload_parts_result.part_etags[i])
auto part = Aws::S3::Model::CompletedPart{}
.WithETag(upload_parts_result.part_etags[i])
.WithPartNumber(i + 1);
completed_multipart_upload.AddParts(part);
}
Expand Down Expand Up @@ -218,8 +218,8 @@ std::optional<PutObjectResult> S3Wrapper::putObjectMultipart(const PutObjectRequ
}

bool S3Wrapper::deleteObject(const DeleteObjectRequestParameters& params) {
Aws::S3::Model::DeleteObjectRequest request;
request.WithBucket(params.bucket)
auto request = Aws::S3::Model::DeleteObjectRequest{}
.WithBucket(params.bucket)
.WithKey(params.object_key);
if (!params.version.empty()) {
request.SetVersionId(params.version);
Expand Down Expand Up @@ -343,8 +343,8 @@ std::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listBucket(const L
}

std::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
Aws::S3::Model::GetObjectTaggingRequest request;
request.WithBucket(params.bucket)
auto request = Aws::S3::Model::GetObjectTaggingRequest{}
.WithBucket(params.bucket)
.WithKey(params.object_key);
if (!params.version.empty()) {
request.SetVersionId(params.version);
Expand All @@ -371,17 +371,16 @@ std::optional<HeadObjectResult> S3Wrapper::headObject(const HeadObjectRequestPar

template<typename ListRequest>
ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
ListRequest request;
request.WithBucket(params.bucket)
return ListRequest{}
.WithBucket(params.bucket)
.WithDelimiter(params.delimiter)
.WithPrefix(params.prefix);
return request;
}

template<typename FetchObjectRequest>
FetchObjectRequest S3Wrapper::createFetchObjectRequest(const GetObjectRequestParameters& get_object_params) {
FetchObjectRequest request;
request.WithBucket(get_object_params.bucket)
auto request = FetchObjectRequest{}
.WithBucket(get_object_params.bucket)
.WithKey(get_object_params.object_key);
if (!get_object_params.version.empty()) {
request.SetVersionId(get_object_params.version);
Expand Down Expand Up @@ -417,10 +416,7 @@ void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::
continue;
}

MultipartUpload filtered_upload;
filtered_upload.key = upload.GetKey();
filtered_upload.upload_id = upload.GetUploadId();
filtered_uploads.push_back(filtered_upload);
filtered_uploads.push_back({.key = upload.GetKey(), .upload_id = upload.GetUploadId()});
}
}

Expand All @@ -446,8 +442,8 @@ std::optional<std::vector<MultipartUpload>> S3Wrapper::listMultipartUploads(cons
}

bool S3Wrapper::abortMultipartUpload(const AbortMultipartUploadRequestParameters& params) {
Aws::S3::Model::AbortMultipartUploadRequest request;
request.WithBucket(params.bucket)
auto request = Aws::S3::Model::AbortMultipartUploadRequest{}
.WithBucket(params.bucket)
.WithKey(params.key)
.WithUploadId(params.upload_id);
return request_sender_->sendAbortMultipartUploadRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
Expand Down
23 changes: 11 additions & 12 deletions extensions/aws/s3/S3Wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ class S3Wrapper {

template<typename RequestType>
RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
RequestType request;
request.WithBucket(put_object_params.bucket)
auto request = RequestType{}
.WithBucket(put_object_params.bucket)
.WithKey(put_object_params.object_key)
.WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, put_object_params.storage_class))
.WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, put_object_params.server_side_encryption))
Expand All @@ -284,16 +284,15 @@ class S3Wrapper {

template<typename ResultType>
PutObjectResult createPutObjectResult(const ResultType& upload_result) {
PutObjectResult put_object_result;
// Etags are returned by AWS in quoted form that should be removed
put_object_result.etag = minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), '"');
put_object_result.version = upload_result.GetVersionId();

// GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
// s3.expiration only needs the date member of this pair
put_object_result.expiration = getExpiration(upload_result.GetExpiration()).expiration_time;
put_object_result.ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption());
return put_object_result;
return {
.version = upload_result.GetVersionId(),
// Etags are returned by AWS in quoted form that should be removed
.etag = minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), '"'),
// GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
// s3.expiration only needs the date member of this pair
.expiration = getExpiration(upload_result.GetExpiration()).expiration_time,
.ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption())
};
}

static int64_t writeFetchedBody(Aws::IOStream& source, int64_t data_size, io::OutputStream& output);
Expand Down

0 comments on commit c777a50

Please sign in to comment.