Skip to content

Commit

Permalink
Add Document Type property
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 14, 2024
1 parent cda0f92 commit 25a84f8
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 16 deletions.
1 change: 1 addition & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,7 @@ In the list below, the names of required properties appear in bold. Any other pr
| **Bucket Name** | default | | The name of bucket to access.<br/>**Supports Expression Language: true** |
| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.<br/>**Supports Expression Language: true** |
| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.<br/>**Supports Expression Language: true** |
| **Document Type** | Json | Json<br/>Binary<br/>String | Content type to store data as. |
| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile UUID will be used.<br/>**Supports Expression Language: true** |
| **Persist To** | NONE | NONE<br/>ACTIVE<br/>ONE<br/>TWO<br/>THREE<br/>FOUR | Durability constraint about disk persistence. |
| **Replicate To** | NONE | NONE<br/>ONE<br/>TWO<br/>THREE | Durability constraint about replication. |
Expand Down
1 change: 1 addition & 0 deletions docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ azure-storage-blob==12.13.0
prometheus-api-client==0.5.0
humanfriendly==10.0
requests<2.29 # https://github.com/docker/docker-py/issues/3113
couchbase==4.3.2
5 changes: 5 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .checkers.SplunkChecker import SplunkChecker
from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
from .checkers.ModbusChecker import ModbusChecker
from .checkers.CouchbaseChecker import CouchbaseChecker
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check


Expand All @@ -54,6 +55,7 @@ def __init__(self, context, feature_id):
self.grafana_loki_checker = GrafanaLokiChecker()
self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator)
self.modbus_checker = ModbusChecker(self.container_communicator)
self.couchbase_checker = CouchbaseChecker()

def cleanup(self):
self.container_store.cleanup()
Expand Down Expand Up @@ -424,3 +426,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

def enable_ssl_in_nifi(self):
self.container_store.enable_ssl_in_nifi()

def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
    """Forward the document check to the Couchbase checker and return its verdict unchanged."""
    checker = self.couchbase_checker
    return checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
69 changes: 69 additions & 0 deletions docker/test/integration/cluster/checkers/CouchbaseChecker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder


class CouchbaseChecker:
    """Verifies the content and the stored format of documents in the Couchbase server of the integration tests."""

    # Couchbase "common flags" store the document format as an enumerated value in the lower nibble of
    # the most significant byte of the 32 bit flags field. The formats are NOT independent bits
    # (JSON = 0x02, binary = 0x03, string = 0x04), so the field has to be masked and compared for
    # equality: a plain bitwise AND would accept a JSON document as binary and vice versa.
    _FORMAT_SHIFT = 24
    _FORMAT_MASK = 0x0F << _FORMAT_SHIFT
    _JSON_FORMAT = 0x02 << _FORMAT_SHIFT
    _BINARY_FORMAT = 0x03 << _FORMAT_SHIFT
    _STRING_FORMAT = 0x04 << _FORMAT_SHIFT

    @classmethod
    def _has_format(cls, flags: int, expected_format: int) -> bool:
        """Return True only when the format field of ``flags`` is exactly ``expected_format``."""
        return (flags & cls._FORMAT_MASK) == expected_format

    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
        """Check that a document exists with the expected content and the expected stored format.

        :param doc_id: id of the document to fetch from the default collection of the bucket
        :param bucket_name: name of the bucket that should contain the document
        :param expected_data: expected content; parsed as JSON when the expected type is "json",
            compared as text otherwise
        :param expected_data_type: "binary", "json" or "string" (case insensitive)
        :return: True if the document was fetched, its format flags match the expected type and its
            content equals the expected data; False otherwise, including on any error (which is logged)
        """
        try:
            cluster = Cluster('couchbase://localhost', ClusterOptions(
                PasswordAuthenticator('Administrator', 'password123')))

            bucket = cluster.bucket(bucket_name)
            collection = bucket.default_collection()
            data_type = expected_data_type.lower()

            if data_type == "binary":
                result = collection.get(doc_id, transcoder=RawBinaryTranscoder())
                if not self._has_format(result.flags, self._BINARY_FORMAT):
                    logging.error(f"Expected binary data for document '{doc_id}' but no binary flags were found.")
                    return False

                content = result.content_as[bytes]
                return content.decode('utf-8') == expected_data

            if data_type == "json":
                result = collection.get(doc_id)
                if not self._has_format(result.flags, self._JSON_FORMAT):
                    logging.error(f"Expected JSON data for document '{doc_id}' but no JSON flags were found.")
                    return False

                content = result.content_as[dict]
                return content == json.loads(expected_data)

            if data_type == "string":
                result = collection.get(doc_id, transcoder=RawStringTranscoder())
                if not self._has_format(result.flags, self._STRING_FORMAT):
                    logging.error(f"Expected string data for document '{doc_id}' but no string flags were found.")
                    return False

                content = result.content_as[str]
                return content == expected_data

            logging.error(f"Unsupported data type '{expected_data_type}'")
            return False
        except Exception as e:
            # Any connection, lookup or decoding problem means the expected document is not verifiably present
            logging.error(f"Error while fetching document '{doc_id}' from bucket '{bucket_name}': {e}")
            return False
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ def deploy(self):
detach=True,
name=self.name,
network=self.network.name,
ports={'11210/tcp': 11210},
entrypoint=self.command)
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

def enable_ssl_in_nifi(self):
self.cluster.enable_ssl_in_nifi()

def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
    """Fail the running test with an AssertionError unless the cluster reports the expected document."""
    document_found = self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
    assert document_found
28 changes: 27 additions & 1 deletion docker/test/integration/features/couchbase.feature
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ Feature: Executing Couchbase operations from MiNiFi-C++
Background:
Given the content of "/tmp/output" is monitored

Scenario: A MiNiFi instance can insert data to test bucket with PutCouchbaseKey processor
Scenario: A MiNiFi instance can insert json data to test bucket with PutCouchbaseKey processor
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the PutCouchbaseKey processor is set to "Json"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"

Expand All @@ -39,3 +40,28 @@ Feature: Executing Couchbase operations from MiNiFi-C++
And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1", "field2": "value2"}' of type "Json" in Couchbase

Scenario: A MiNiFi instance can insert binary data to test bucket with PutCouchbaseKey processor
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 60 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase
5 changes: 5 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1377,3 +1377,8 @@ def step_impl(context, processor_name, service_name):
processor = context.test.get_node_by_name(processor_name)
processor.controller_services.append(couchbase_cluster_controller_service)
processor.set_property("Couchbase Cluster Controller Service", couchbase_cluster_controller_service.name)


@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
    # The placeholders of the step text are handed over unchanged to the test driver, which asserts on the result
    test_driver = context.test
    test_driver.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/

#include "CouchbaseClusterService.h"
#include "couchbase/codec/raw_binary_transcoder.hxx"
#include "couchbase/codec/raw_string_transcoder.hxx"
#include "couchbase/codec/raw_json_transcoder.hxx"

#include "core/Resource.h"

Expand All @@ -39,13 +42,22 @@ nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::g
}

nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection,
const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
CouchbaseValueType document_type, const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
auto collection_result = getCollection(collection);
if (!collection_result.has_value()) {
return nonstd::make_unexpected(collection_result.error());
}

auto [upsert_err, upsert_resp] = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
std::pair<::couchbase::error, ::couchbase::mutation_result> result;
if (document_type == CouchbaseValueType::Json) {
result = collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, buffer, options).get();
} else if (document_type == CouchbaseValueType::String) {
std::string data_str(reinterpret_cast<const char*>(buffer.data()), buffer.size());
result = collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id, data_str, options).get();
} else {
result = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
}
auto& [upsert_err, upsert_resp] = result;
if (upsert_err.ec()) {
// ambiguous_timeout should not be retried as we do not know if the insert was successful or not
if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
Expand Down
16 changes: 10 additions & 6 deletions extensions/couchbase/controllerservices/CouchbaseClusterService.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include "couchbase/cluster.hxx"
#include "core/ProcessContext.h"
#include "core/logging/LoggerConfiguration.h"
#include "couchbase/codec/raw_binary_transcoder.hxx"
#include "couchbase/error.hxx"

namespace org::apache::nifi::minifi::couchbase {

Expand All @@ -48,6 +46,12 @@ struct CouchbaseUpsertResult {
std::uint16_t partition_id{0};
};

// Content types a document can be stored as. The enumerator names are used as the allowed values of
// the "Document Type" processor property, so renaming or reordering them changes the user-facing values.
enum class CouchbaseValueType { Json, Binary, String };

enum class CouchbaseErrorType {
FATAL,
TEMPORARY,
Expand All @@ -59,8 +63,8 @@ class CouchbaseClient {
: connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) {
}

nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, const std::string& document_id, const std::vector<std::byte>& buffer,
const ::couchbase::upsert_options& options);
nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options);
std::optional<CouchbaseErrorType> establishConnection();
void close();

Expand Down Expand Up @@ -142,10 +146,10 @@ class CouchbaseClusterService : public core::controller::ControllerService {
}
}

virtual nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection,
virtual nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type,
const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
gsl_Expects(client_);
return client_->upsert(collection, document_id, buffer, options);
return client_->upsert(collection, document_type, document_id, buffer, options);
}

static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property);
Expand Down
3 changes: 2 additions & 1 deletion extensions/couchbase/processors/PutCouchbaseKey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace org::apache::nifi::minifi::couchbase::processors {

// Resolves the Couchbase controller service and caches the enum-valued properties (document type and
// durability constraints) in members so that onTrigger does not have to parse them for every flow file.
void PutCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) {
  couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, CouchbaseClusterControllerService);
  document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, DocumentType);
  persist_to_ = utils::parseEnumProperty<::couchbase::persist_to>(context, PersistTo);
  replicate_to_ = utils::parseEnumProperty<::couchbase::replicate_to>(context, ReplicateTo);
}
Expand Down Expand Up @@ -61,7 +62,7 @@ void PutCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSess
::couchbase::upsert_options options;
options.durability(persist_to_, replicate_to_);
auto result = session.readBuffer(flow_file);
if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_id, result.buffer, options)) {
if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_type_, document_id, result.buffer, options)) {
session.putAttribute(*flow_file, "couchbase.bucket", upsert_result->bucket_name);
session.putAttribute(*flow_file, "couchbase.doc.id", document_id);
session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(upsert_result->cas));
Expand Down
8 changes: 8 additions & 0 deletions extensions/couchbase/processors/PutCouchbaseKey.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
.withDescription("Collection to use inside the bucket scope. If not specified, the _default collection is used.")
.supportsExpressionLanguage(true)
.build();
// Required property selecting the content type the document is stored as. The allowed values are the
// names of the CouchbaseValueType enumerators and the default is the name of CouchbaseValueType::Json.
// NOTE(review): the <3> template argument presumably has to match the number of allowed values
// (the three CouchbaseValueType enumerators) - confirm against PropertyDefinitionBuilder.
EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<3>::createProperty("Document Type")
.withDescription("Content type to store data as.")
.isRequired(true)
.withDefaultValue(magic_enum::enum_name(CouchbaseValueType::Json))
.withAllowedValues(magic_enum::enum_names<CouchbaseValueType>())
.build();
EXTENSIONAPI static constexpr auto DocumentId = core::PropertyDefinitionBuilder<>::createProperty("Document Id")
.withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile UUID will be used.")
.supportsExpressionLanguage(true)
Expand All @@ -115,6 +121,7 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
BucketName,
ScopeName,
CollectionName,
DocumentType,
DocumentId,
PersistTo,
ReplicateTo
Expand Down Expand Up @@ -145,6 +152,7 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
private:
std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_);
CouchbaseValueType document_type_ = CouchbaseValueType::Json;
::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none;
::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none;
};
Expand Down
6 changes: 4 additions & 2 deletions extensions/couchbase/tests/MockCouchbaseClusterService.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const std::uint64_t COUCHBASE_PUT_RESULT_PARTITION_UUID = 7890123456;
const std::uint16_t COUCHBASE_PUT_RESULT_PARTITION_ID = 1234;

struct UpsertParameters {
CouchbaseValueType document_type;
std::string document_id;
std::vector<std::byte> buffer;
::couchbase::upsert_options options;
Expand All @@ -45,9 +46,10 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService
void onEnable() override {}
void notifyStop() override {}

nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, const std::string& document_id, const std::vector<std::byte>& buffer,
const ::couchbase::upsert_options& options) override {
nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) override {
collection_ = collection;
upsert_parameters_.document_type = document_type;
upsert_parameters_.document_id = document_id;
upsert_parameters_.buffer = buffer;
upsert_parameters_.options = options;
Expand Down
Loading

0 comments on commit 25a84f8

Please sign in to comment.