Skip to content

Commit

Permalink
Fix clang issue and revise temporary failures
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 9, 2024
1 parent 64ec4f3 commit 21e4e6b
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::ups
auto [upsert_err, upsert_resp] = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
if (upsert_err.ec()) {
setConnectionError();
if (upsert_err.ec().value() == static_cast<int>(::couchbase::errc::common::unambiguous_timeout)) {
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to timeout",
document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
// ambiguous_timeout should not be retried as we do not know if the insert was successful or not
if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
}
std::string cause = upsert_err.cause() ? upsert_err.cause()->message() : "";
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' with error code: '{}', message: '{}'",
document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
Expand All @@ -62,7 +62,7 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::ups
const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->sequence_number() : 0);
const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0);
return CouchbaseUpsertResult {
std::string(collection.bucket_name),
collection.bucket_name,
upsert_resp.cas().value(),
partition_uuid,
sequence_number,
Expand Down Expand Up @@ -103,7 +103,6 @@ std::optional<CouchbaseErrorType> CouchbaseClient::establishConnection() {
auto options = ::couchbase::cluster_options(username_, password_);
auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get();
if (connect_err.ec()) {
std::string cause = connect_err.cause() ? connect_err.cause()->message() : "";
logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
return getErrorType(connect_err.ec());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class CouchbaseClient {
static constexpr std::array<::couchbase::errc::common, 9> temporary_connection_errors = {
::couchbase::errc::common::temporary_failure,
::couchbase::errc::common::request_canceled,
::couchbase::errc::common::service_not_available,
::couchbase::errc::common::internal_server_failure,
::couchbase::errc::common::cas_mismatch,
::couchbase::errc::common::ambiguous_timeout,
Expand Down
4 changes: 2 additions & 2 deletions extensions/couchbase/processors/PutCouchbaseKey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ void PutCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSess
options.durability(persist_to_, replicate_to_);
auto result = session.readBuffer(flow_file);
if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_id, result.buffer, options)) {
session.putAttribute(*flow_file, "couchbase.bucket", std::string(upsert_result->bucket_name));
session.putAttribute(*flow_file, "couchbase.bucket", upsert_result->bucket_name);
session.putAttribute(*flow_file, "couchbase.doc.id", document_id);
session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(upsert_result->cas));
session.putAttribute(*flow_file, "couchbase.doc.sequence.number", std::to_string(upsert_result->sequence_number));
session.putAttribute(*flow_file, "couchbase.partition.uuid", std::to_string(upsert_result->partition_uuid));
session.putAttribute(*flow_file, "couchbase.partition.id", std::to_string(upsert_result->partition_id));
session.transfer(flow_file, Success);
} else if (upsert_result.error() == CouchbaseErrorType::TEMPORARY) {
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to timeout, transferring to retry relationship",
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, transferring to retry relationship",
document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
session.transfer(flow_file, Retry);
} else {
Expand Down
1 change: 0 additions & 1 deletion extensions/couchbase/tests/MockCouchbaseClusterService.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService
CouchbaseCollection collection_;
UpsertParameters upsert_parameters_;
std::optional<CouchbaseErrorType> upsert_error_;
bool get_collection_succeeds_{true};
};
} // namespace org::apache::nifi::minifi::couchbase::test

0 comments on commit 21e4e6b

Please sign in to comment.