Skip to content

Commit

Permalink
Review update
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jul 20, 2023
1 parent 4630b15 commit c3a8ad8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
54 changes: 39 additions & 15 deletions extensions/aws/processors/PutS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,46 @@

namespace org::apache::nifi::minifi::aws::processors {

int64_t PutS3Object::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
try {
if (flow_size_ <= multipart_threshold_) {
logger_.log_info("Uploading S3 Object '%s' in a single upload", options_.object_key);
result_ = s3_wrapper_.putObject(options_, stream, flow_size_);
return gsl::narrow<int64_t>(flow_size_);
} else {
logger_.log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", options_.object_key);
result_ = s3_wrapper_.putObjectMultipart(options_, stream, flow_size_, multipart_size_);
return gsl::narrow<int64_t>(flow_size_);
namespace {
class ReadCallback {
public:
ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
: flow_size_(flow_size),
options_(options),
s3_wrapper_(s3_wrapper),
multipart_threshold_(multipart_threshold),
multipart_size_(multipart_size),
logger_(logger) {
}

int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
try {
if (flow_size_ <= multipart_threshold_) {
logger_.log_info("Uploading S3 Object '%s' in a single upload", options_.object_key);
result_ = s3_wrapper_.putObject(options_, stream, flow_size_);
return gsl::narrow<int64_t>(flow_size_);
} else {
logger_.log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", options_.object_key);
result_ = s3_wrapper_.putObjectMultipart(options_, stream, flow_size_, multipart_size_);
return gsl::narrow<int64_t>(flow_size_);
}
} catch(const aws::s3::StreamReadException& ex) {
logger_.log_error("Error occurred while uploading to S3: %s", ex.what());
return -1;
}
} catch(const aws::s3::StreamReadException& ex) {
logger_.log_error("Error occurred while uploading to S3: %s", ex.what());
return -1;
}
}

uint64_t flow_size_;
const minifi::aws::s3::PutObjectRequestParameters& options_;
aws::s3::S3Wrapper& s3_wrapper_;
uint64_t multipart_threshold_;
uint64_t multipart_size_;
uint64_t read_size_ = 0;
std::optional<minifi::aws::s3::PutObjectResult> result_;
core::logging::Logger& logger_;
};
} // namespace

void PutS3Object::initialize() {
setSupportedProperties(Properties);
Expand Down Expand Up @@ -291,7 +315,7 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
return;
}

PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);
ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);
session->read(flow_file, std::ref(callback));
if (!callback.result_.has_value()) {
logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
Expand Down
24 changes: 0 additions & 24 deletions extensions/aws/processors/PutS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,6 @@ class PutS3Object : public S3Processor {
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;

class ReadCallback {
public:
ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
: flow_size_(flow_size),
options_(options),
s3_wrapper_(s3_wrapper),
multipart_threshold_(multipart_threshold),
multipart_size_(multipart_size),
logger_(logger) {
}

int64_t operator()(const std::shared_ptr<io::InputStream>& stream);

uint64_t flow_size_;
const minifi::aws::s3::PutObjectRequestParameters& options_;
aws::s3::S3Wrapper& s3_wrapper_;
uint64_t multipart_threshold_;
uint64_t multipart_size_;
uint64_t read_size_ = 0;
std::optional<minifi::aws::s3::PutObjectResult> result_;
core::logging::Logger& logger_;
};

protected:
static constexpr uint64_t MIN_PART_SIZE = 5_MiB;
static constexpr uint64_t MAX_UPLOAD_SIZE = 5_GiB;
Expand Down

0 comments on commit c3a8ad8

Please sign in to comment.