diff --git a/PROCESSORS.md b/PROCESSORS.md index 7060dd25a8..2ecccfae10 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -48,6 +48,7 @@ limitations under the License. - [FetchSmb](#FetchSmb) - [FocusArchiveEntry](#FocusArchiveEntry) - [GenerateFlowFile](#GenerateFlowFile) +- [GetCouchbaseKey](#GetCouchbaseKey) - [GetFile](#GetFile) - [GetTCP](#GetTCP) - [HashContent](#HashContent) @@ -1093,6 +1094,45 @@ In the list below, the names of required properties appear in bold. Any other pr | success | success operational on the flow record | +## GetCouchbaseKey + +### Description + +Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the Document Id property. NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire FlowFile will be buffered in memory. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------------------------------------------|---------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. | +| **Bucket Name** | default | | The name of bucket to access.
**Supports Expression Language: true** | +| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.
**Supports Expression Language: true** | +| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.
**Supports Expression Language: true** | +| **Document Type** | Json | Json
Binary
String | Content type of the retrieved value. | +| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.
**Supports Expression Language: true** | +| Put Value to Attribute | | | If set, the retrieved value will be put into an attribute of the FlowFile instead of the content of the FlowFile. The attribute key to put to is determined by evaluating the value of this property.
**Supports Expression Language: true** | + +### Relationships + +| Name | Description | +|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| success | Values retrieved from Couchbase Server are written as outgoing FlowFiles content or put into an attribute of the incoming FlowFile and routed to this relationship. | +| failure | FlowFiles for which the fetch from Couchbase Server failed and cannot be retried are routed to this relationship. | +| retry | FlowFiles for which the fetch from Couchbase Server failed but can be retried are routed to this relationship. | +| original | The original input FlowFile is routed to this relationship when the value is retrieved from Couchbase Server and routed to 'success'. | + +### Output Attributes + +| Attribute | Relationship | Description | +|----------------------|--------------|---------------------------------------| +| couchbase.bucket | success | Bucket where the document was stored. | +| couchbase.doc.id | success | Id of the document. | +| couchbase.doc.cas | success | CAS of the document. | +| couchbase.doc.expiry | success | Expiration of the document. 
| + + ## GetFile ### Description diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index 1c43530460..9baf01f088 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -33,7 +33,7 @@ def run_post_startup_commands(self): ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024"] + "--bucket-ramsize", "1024", "--max-ttl", "36000"] ] for command in commands: (code, _) = self.client.containers.get(self.name).exec_run(command) @@ -47,7 +47,7 @@ def deploy(self): return self.docker_container = self.client.containers.run( - "couchbase:community-7.6.2", + "couchbase:enterprise-7.2.5", detach=True, name=self.name, network=self.network.name, diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature index f4e352cbef..902efdcd32 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/docker/test/integration/features/couchbase.feature @@ -65,3 +65,111 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase + + Scenario: A MiNiFi instance can get data from test bucket 
with GetCouchbaseKey processor + Given a Couchbase server is set up + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService" + And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using binary storage + Given a Couchbase server is set up + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService" + And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less 
than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor and put the result in an attribute + Given a Couchbase server is set up + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "String" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "String" + And the "Put Value to Attribute" property of the GetCouchbaseKey processor is set to "get_couchbase_result" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService" + And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey 
processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + And the Minifi logs contain the following message: 'key:get_couchbase_result value:{"field1": "value1", "field2": "value2"}' in less than 1 seconds + + Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on Couchbase value type mismatch + Given a Couchbase server is set up + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "String" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" + And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService" + And a CouchbaseClusterService is setup up for 
GetCouchbaseKey with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + + When a Couchbase server is started + And all instances start up + + Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds diff --git a/docker/test/integration/minifi/processors/GetCouchbaseKey.py b/docker/test/integration/minifi/processors/GetCouchbaseKey.py new file mode 100644 index 0000000000..0b48dcbdd2 --- /dev/null +++ b/docker/test/integration/minifi/processors/GetCouchbaseKey.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from ..core.Processor import Processor + + +class GetCouchbaseKey(Processor): + def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): + super(GetCouchbaseKey, self).__init__( + context=context, + clazz='GetCouchbaseKey', + auto_terminate=['success', 'failure', 'retry'], + schedule=schedule) diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py b/docker/test/integration/minifi/processors/PutCouchbaseKey.py index 5e94aaa07b..341338771b 100644 --- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py +++ b/docker/test/integration/minifi/processors/PutCouchbaseKey.py @@ -20,5 +20,5 @@ def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): super(PutCouchbaseKey, self).__init__( context=context, clazz='PutCouchbaseKey', - auto_terminate=['success', 'failure'], + auto_terminate=['success', 'failure', 'retry'], schedule=schedule) diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index 8abf7985c1..bcab7b90f8 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -22,6 +22,7 @@ #include "couchbase/codec/raw_json_transcoder.hxx" #include "core/Resource.h" +#include "utils/TimeUtil.h" namespace org::apache::nifi::minifi::couchbase { @@ -70,16 +71,57 @@ nonstd::expected CouchbaseClient::ups document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message()); return nonstd::make_unexpected(CouchbaseErrorType::FATAL); } else { - const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_uuid() : 0); - const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->sequence_number() : 0); - const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0); - return CouchbaseUpsertResult { - collection.bucket_name, - upsert_resp.cas().value(), - partition_uuid, - sequence_number, - partition_id - }; + CouchbaseUpsertResult result; + result.bucket_name = collection.bucket_name; + result.cas = upsert_resp.cas().value(); + result.partition_uuid = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_uuid() : 0); + result.sequence_number = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->sequence_number() : 0); + result.partition_id = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0); + return result; + } +} + +nonstd::expected CouchbaseClient::get(const CouchbaseCollection& collection, + const std::string& document_id, CouchbaseValueType return_type) { + auto collection_result = getCollection(collection); + if (!collection_result.has_value()) { + return nonstd::make_unexpected(collection_result.error()); + } + + ::couchbase::get_options options; + options.with_expiry(true); + auto [get_err, resp] = collection_result->get(document_id, options).get(); + if (get_err.ec()) { + if (get_err.ec().value() == static_cast(::couchbase::errc::common::unambiguous_timeout) || get_err.ec().value() == static_cast(::couchbase::errc::common::ambiguous_timeout)) { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY); + } + std::string cause = get_err.cause() ? 
get_err.cause()->message() : ""; + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' with error code: '{}', message: '{}'", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name, get_err.ec(), get_err.message()); + return nonstd::make_unexpected(CouchbaseErrorType::FATAL); + } else { + try { + CouchbaseGetResult result; + result.bucket_name = collection.bucket_name; + result.cas = resp.cas().value(); + if (return_type == CouchbaseValueType::Json) { + result.value = resp.content_as<::couchbase::codec::binary, ::couchbase::codec::raw_json_transcoder>(); + } else if (return_type == CouchbaseValueType::String) { + result.value = resp.content_as<::couchbase::codec::raw_string_transcoder>(); + } else { + result.value = resp.content_as<::couchbase::codec::raw_binary_transcoder>(); + } + if (resp.expiry_time().has_value()) { + result.expiry = utils::timeutils::getTimeStr(*resp.expiry_time()); + } + return result; + } catch(const std::exception& ex) { + logger_->log_error("Failed to get content for document '{}' from collection '{}.{}.{}' with the following exception: '{}'", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name, ex.what()); + return nonstd::make_unexpected(CouchbaseErrorType::FATAL); + } } } diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index 0103404c2f..1a065d518a 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "core/controller/ControllerService.h" #include "core/PropertyDefinition.h" @@ -38,9 +39,17 @@ struct CouchbaseCollection { std::string collection_name; }; -struct CouchbaseUpsertResult { +struct CouchbaseCallResult { std::string bucket_name; std::uint64_t cas{0}; +}; + +struct 
CouchbaseGetResult : public CouchbaseCallResult { + std::string expiry; + std::variant, std::string> value; +}; + +struct CouchbaseUpsertResult : public CouchbaseCallResult { std::uint64_t sequence_number{0}; std::uint64_t partition_uuid{0}; std::uint16_t partition_id{0}; @@ -65,6 +74,7 @@ class CouchbaseClient { nonstd::expected upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id, const std::vector& buffer, const ::couchbase::upsert_options& options); + nonstd::expected get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type); nonstd::expected establishConnection(); void close(); @@ -152,6 +162,11 @@ class CouchbaseClusterService : public core::controller::ControllerService { return client_->upsert(collection, document_type, document_id, buffer, options); } + virtual nonstd::expected get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) { + gsl_Expects(client_); + return client_->get(collection, document_id, return_type); + } + static gsl::not_null> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property); private: diff --git a/extensions/couchbase/processors/GetCouchbaseKey.cpp b/extensions/couchbase/processors/GetCouchbaseKey.cpp new file mode 100644 index 0000000000..89cf160f64 --- /dev/null +++ b/extensions/couchbase/processors/GetCouchbaseKey.cpp @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GetCouchbaseKey.h" +#include "utils/gsl.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::couchbase::processors { + +void GetCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, GetCouchbaseKey::CouchbaseClusterControllerService); + document_type_ = utils::parseEnumProperty(context, GetCouchbaseKey::DocumentType); +} + +void GetCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + gsl_Expects(couchbase_cluster_service_); + + auto flow_file = session.get(); + if (!flow_file) { + context.yield(); + return; + } + + CouchbaseCollection collection; + if (!context.getProperty(BucketName, collection.bucket_name, flow_file.get()) || collection.bucket_name.empty()) { + logger_->log_error("Bucket '{}' is invalid or empty!", collection.bucket_name); + session.transfer(flow_file, Failure); + return; + } + + if (!context.getProperty(ScopeName, collection.scope_name, flow_file.get()) || collection.scope_name.empty()) { + collection.scope_name = ::couchbase::scope::default_name; + } + + if (!context.getProperty(CollectionName, collection.collection_name, flow_file.get()) || collection.collection_name.empty()) { + collection.collection_name = ::couchbase::collection::default_name; + } + + std::string document_id; + if (!context.getProperty(DocumentId, document_id, flow_file.get()) || document_id.empty()) { + auto ff_content = 
session.readBuffer(flow_file).buffer; + document_id = std::string(reinterpret_cast(ff_content.data()), ff_content.size()); + } + + if (document_id.empty()) { + logger_->log_error("Document ID is empty, transferring FlowFile to failure relationship"); + session.transfer(flow_file, Failure); + return; + } + + std::string attribute_to_put_result_to; + context.getProperty(PutValueToAttribute, attribute_to_put_result_to, flow_file.get()); + + if (auto get_result = couchbase_cluster_service_->get(collection, document_id, document_type_)) { + if (!attribute_to_put_result_to.empty()) { + if (document_type_ == CouchbaseValueType::String) { + session.putAttribute(*flow_file, attribute_to_put_result_to, std::get(get_result->value)); + } else { + auto& binary_data = std::get>(get_result->value); + std::string str_value{reinterpret_cast(binary_data.data()), binary_data.size()}; + session.putAttribute(*flow_file, attribute_to_put_result_to, str_value); + } + } else { + session.write(flow_file, [&, this](const std::shared_ptr& stream) -> int64_t { + if (document_type_ == CouchbaseValueType::String) { + auto& value = std::get(get_result->value); + stream->write(value); + return gsl::narrow(value.size()); + } else { + auto& value = std::get>(get_result->value); + stream->write(value); + return gsl::narrow(value.size()); + } + }); + } + + session.putAttribute(*flow_file, "couchbase.bucket", get_result->bucket_name); + session.putAttribute(*flow_file, "couchbase.doc.id", document_id); + session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(get_result->cas)); + session.putAttribute(*flow_file, "couchbase.doc.expiry", get_result->expiry); + session.transfer(flow_file, Success); + } else if (get_result.error() == CouchbaseErrorType::TEMPORARY) { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout, transferring to retry relationship", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + 
session.transfer(flow_file, Retry); + } else { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}', transferring to failure relationship", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + session.transfer(flow_file, Failure); + } +} + +REGISTER_RESOURCE(GetCouchbaseKey, Processor); + +} // namespace org::apache::nifi::minifi::couchbase::processors diff --git a/extensions/couchbase/processors/GetCouchbaseKey.h b/extensions/couchbase/processors/GetCouchbaseKey.h new file mode 100644 index 0000000000..c286561e4f --- /dev/null +++ b/extensions/couchbase/processors/GetCouchbaseKey.h @@ -0,0 +1,112 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include + +#include "core/AbstractProcessor.h" +#include "core/ProcessSession.h" +#include "core/logging/LoggerConfiguration.h" +#include "CouchbaseClusterService.h" + +namespace org::apache::nifi::minifi::couchbase::processors { + +class GetCouchbaseKey final : public core::AbstractProcessor { + public: + using core::AbstractProcessor::AbstractProcessor; + + EXTENSIONAPI static constexpr const char* Description = "Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the " + " property. NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of " + "the entire FlowFile will be buffered in memory."; + + EXTENSIONAPI static constexpr auto CouchbaseClusterControllerService = core::PropertyDefinitionBuilder<>::createProperty("Couchbase Cluster Controller Service") + .withDescription("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") + .withAllowedTypes() + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto BucketName = core::PropertyDefinitionBuilder<>::createProperty("Bucket Name") + .withDescription("The name of bucket to access.") + .withDefaultValue("default") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ScopeName = core::PropertyDefinitionBuilder<>::createProperty("Scope Name") + .withDescription("Scope to use inside the bucket. If not specified, the _default scope is used.") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto CollectionName = core::PropertyDefinitionBuilder<>::createProperty("Collection Name") + .withDescription("Collection to use inside the bucket scope. 
If not specified, the _default collection is used.") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<3>::createProperty("Document Type") + .withDescription("Content type of the retrieved value.") + .isRequired(true) + .withDefaultValue(magic_enum::enum_name(CouchbaseValueType::Json)) + .withAllowedValues(magic_enum::enum_names()) + .build(); + EXTENSIONAPI static constexpr auto DocumentId = core::PropertyDefinitionBuilder<>::createProperty("Document Id") + .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto PutValueToAttribute = core::PropertyDefinitionBuilder<>::createProperty("Put Value to Attribute") + .withDescription("If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile. " + "The attribute key to put to is determined by evaluating value of this property.") + .supportsExpressionLanguage(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array({ + CouchbaseClusterControllerService, + BucketName, + ScopeName, + CollectionName, + DocumentType, + DocumentId, + PutValueToAttribute + }); + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", + "Values retrieved from Couchbase Server are written as outgoing FlowFiles content or put into an attribute of the incoming FlowFile and routed to this relationship."}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "All FlowFiles failed to fetch from Couchbase Server and not retry-able are routed to this relationship."}; + EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "All FlowFiles failed to fetch from Couchbase Server but can be retried are routed to this relationship."}; + EXTENSIONAPI static constexpr 
auto Original = core::RelationshipDefinition{"original", + "The original input FlowFile is routed to this relationship when the value is retrieved from Couchbase Server and routed to 'success'."}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Retry, Original}; + + EXTENSIONAPI static constexpr auto CouchbaseBucket = core::OutputAttributeDefinition<>{"couchbase.bucket", {Success}, "Bucket where the document was stored."}; + EXTENSIONAPI static constexpr auto CouchbaseDocId = core::OutputAttributeDefinition<>{"couchbase.doc.id", {Success}, "Id of the document."}; + EXTENSIONAPI static constexpr auto CouchbaseDocCas = core::OutputAttributeDefinition<>{"couchbase.doc.cas", {Success}, "CAS of the document."}; + EXTENSIONAPI static constexpr auto CouchbaseDocExpiry = core::OutputAttributeDefinition<>{"couchbase.doc.expiry", {Success}, "Expiration of the document."}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 4>{ + CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, CouchbaseDocExpiry}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + private: + std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_; + CouchbaseValueType document_type_ = CouchbaseValueType::Json; +}; + +} // namespace org::apache::nifi::minifi::couchbase::processors diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h b/extensions/couchbase/processors/PutCouchbaseKey.h index ca79b7dba9..c4bdced4cd 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.h +++ 
b/extensions/couchbase/processors/PutCouchbaseKey.h @@ -151,7 +151,6 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { private: std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_; - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_); CouchbaseValueType document_type_ = CouchbaseValueType::Json; ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none; ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none; diff --git a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp new file mode 100644 index 0000000000..69f84aa066 --- /dev/null +++ b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp @@ -0,0 +1,163 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "processors/GetCouchbaseKey.h" +#include "MockCouchbaseClusterService.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::couchbase::test { + +REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService); + +struct ExpectedCallOptions { + std::string bucket_name; + std::string scope_name; + std::string collection_name; + std::string doc_id; + couchbase::CouchbaseValueType document_type; +}; + +class GetCouchbaseKeyTestController : public TestController { + public: + GetCouchbaseKeyTestController() { + auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService"); + mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService"); + } + + void verifyResults(const minifi::test::ProcessorTriggerResult& results, const minifi::core::Relationship& expected_result, const ExpectedCallOptions& expected_call_options, + const std::string& input) const { + std::shared_ptr<core::FlowFile> flow_file; + if (expected_result == processors::GetCouchbaseKey::Success) { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + flow_file = results.at(processors::GetCouchbaseKey::Success)[0]; + } else if (expected_result == processors::GetCouchbaseKey::Failure) { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + flow_file = results.at(processors::GetCouchbaseKey::Failure)[0]; + 
REQUIRE(LogTestController::getInstance().contains("Failed to get document", 1s)); + } else { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).size() == 1); + flow_file = results.at(processors::GetCouchbaseKey::Retry)[0]; + } + + auto get_collection_parameters = mock_couchbase_cluster_service_->getCollectionParameter(); + CHECK(get_collection_parameters.bucket_name == expected_call_options.bucket_name); + CHECK(get_collection_parameters.collection_name == expected_call_options.collection_name); + CHECK(get_collection_parameters.scope_name == expected_call_options.scope_name); + + auto get_parameters = mock_couchbase_cluster_service_->getGetParameters(); + CHECK(get_parameters.document_id == expected_call_options.doc_id); + CHECK(get_parameters.document_type == expected_call_options.document_type); + + if (expected_result != processors::GetCouchbaseKey::Success) { + return; + } + + CHECK(flow_file->getAttribute("couchbase.bucket").value() == expected_call_options.bucket_name); + CHECK(flow_file->getAttribute("couchbase.doc.id").value() == expected_call_options.doc_id); + CHECK(flow_file->getAttribute("couchbase.doc.cas").value() == std::to_string(COUCHBASE_GET_RESULT_CAS)); + CHECK(flow_file->getAttribute("couchbase.doc.expiry").value() == COUCHBASE_GET_RESULT_EXPIRY); + std::string value; + proc_->getProperty(processors::GetCouchbaseKey::PutValueToAttribute, value); + if (!value.empty()) { + CHECK(flow_file->getAttribute(value).value() == COUCHBASE_GET_RESULT_CONTENT); + CHECK(controller_.plan->getContent(flow_file) == input); + } else { + CHECK(controller_.plan->getContent(flow_file) == COUCHBASE_GET_RESULT_CONTENT); + } + } + + protected: + std::shared_ptr<processors::GetCouchbaseKey> proc_ = std::make_shared<processors::GetCouchbaseKey>("GetCouchbaseKey"); + minifi::test::SingleProcessorTestController controller_{proc_}; + std::shared_ptr<MockCouchbaseClusterService> mock_couchbase_cluster_service_; +}; + 
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "invalid"); + REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}}), minifi::Exception); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid bucket name", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, ""); + auto results = controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}}); + REQUIRE(results[processors::GetCouchbaseKey::Failure].size() == 1); + REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s)); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Document ID is empty and no content is present to use", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + auto results = controller_.trigger({minifi::test::InputFlowFileData{""}}); + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + REQUIRE(LogTestController::getInstance().contains("Document ID is empty, transferring FlowFile to failure relationship", 1s)); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeds with default properties", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeds with optional properties", "[getcouchbasekey]") { + 
proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + proc_->setProperty(processors::GetCouchbaseKey::ScopeName, "scope1"); + proc_->setProperty(processors::GetCouchbaseKey::CollectionName, "collection1"); + proc_->setProperty(processors::GetCouchbaseKey::DocumentId, "important_doc"); + proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "Binary"); + auto results = controller_.trigger({minifi::test::InputFlowFileData{""}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", "important_doc", couchbase::CouchbaseValueType::Binary}, ""); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get fails with default properties", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::FATAL); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "FlowFile is transferred to retry relationship when temporary error is returned", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::TEMPORARY); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get result is written to attribute", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + 
proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "String"); + proc_->setProperty(processors::GetCouchbaseKey::PutValueToAttribute, "myattribute"); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::String}, input); +} + +} // namespace org::apache::nifi::minifi::couchbase::test diff --git a/extensions/couchbase/tests/MockCouchbaseClusterService.h b/extensions/couchbase/tests/MockCouchbaseClusterService.h index a2757f16c0..2938f47fa3 100644 --- a/extensions/couchbase/tests/MockCouchbaseClusterService.h +++ b/extensions/couchbase/tests/MockCouchbaseClusterService.h @@ -29,6 +29,9 @@ const std::uint64_t COUCHBASE_PUT_RESULT_CAS = 9876; const std::uint64_t COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER = 345; const std::uint64_t COUCHBASE_PUT_RESULT_PARTITION_UUID = 7890123456; const std::uint16_t COUCHBASE_PUT_RESULT_PARTITION_ID = 1234; +const std::string COUCHBASE_GET_RESULT_EXPIRY = "2024/10/14 09:37:43.000Z"; +const std::string COUCHBASE_GET_RESULT_CONTENT = "abc"; +const uint64_t COUCHBASE_GET_RESULT_CAS = 1234567; struct UpsertParameters { CouchbaseValueType document_type; @@ -36,6 +39,10 @@ struct UpsertParameters { std::vector<std::byte> buffer; ::couchbase::upsert_options options; }; +struct GetParameters { + std::string document_id; + CouchbaseValueType document_type; +}; class MockCouchbaseClusterService : public controllers::CouchbaseClusterService { public: @@ -61,10 +68,30 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService } } + nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType document_type) override { + collection_ = collection; + get_parameters_.document_id = document_id; + get_parameters_.document_type = document_type; + + if (get_error_) { 
+ return nonstd::make_unexpected(*get_error_); + } else { + if (document_type == CouchbaseValueType::String) { + return CouchbaseGetResult{collection_.bucket_name, COUCHBASE_GET_RESULT_CAS, COUCHBASE_GET_RESULT_EXPIRY, COUCHBASE_GET_RESULT_CONTENT}; + } + return CouchbaseGetResult{collection_.bucket_name, COUCHBASE_GET_RESULT_CAS, COUCHBASE_GET_RESULT_EXPIRY, + std::vector<std::byte>{static_cast<std::byte>('a'), static_cast<std::byte>('b'), static_cast<std::byte>('c')}}; + } + } + UpsertParameters getUpsertParameters() const { return upsert_parameters_; } + GetParameters getGetParameters() const { + return get_parameters_; + } + CouchbaseCollection getCollectionParameter() const { return collection_; } @@ -73,9 +100,15 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService upsert_error_ = upsert_error; } + void setGetError(const CouchbaseErrorType get_error) { + get_error_ = get_error; + } + private: CouchbaseCollection collection_; UpsertParameters upsert_parameters_; + GetParameters get_parameters_; std::optional<CouchbaseErrorType> upsert_error_; + std::optional<CouchbaseErrorType> get_error_; }; } // namespace org::apache::nifi::minifi::couchbase::test diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index f720733e8d..6f25c176c3 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -238,6 +238,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state std::string cron_period_; gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; + std::shared_ptr<logging::Logger> logger_; + private: mutable std::mutex mutex_; std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; @@ -254,8 +256,6 @@ class Processor : public Connectable, public ConfigurableComponent, public state // an outgoing connection allows us to reach these nodes std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; - - std::shared_ptr<logging::Logger> logger_; }; } // namespace core