From db9a4c9225833344b1425595d761cbfeb93824e3 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 15 May 2024 12:53:39 -0400 Subject: [PATCH] NIFI-10752: Add Couchbase 3.x components --- nifi-assembly/pom.xml | 24 + .../nifi-couchbase-nar/pom.xml | 41 ++ .../src/main/resources/META-INF/NOTICE | 30 + .../nifi-couchbase-processors/pom.xml | 71 ++ .../AbstractCouchbaseLookupService.java | 79 +++ .../couchbase/CouchbaseClusterService.java | 204 ++++++ .../CouchbaseKeyValueLookupService.java | 89 +++ .../couchbase/CouchbaseMapCacheClient.java | 201 ++++++ .../CouchbaseRecordLookupService.java | 146 +++++ .../apache/nifi/couchbase/CouchbaseUtils.java | 61 ++ .../couchbase/AbstractCouchbaseProcessor.java | 197 ++++++ .../couchbase/CouchbaseAttributes.java | 67 ++ .../couchbase/CouchbaseExceptionMappings.java | 118 ++++ .../couchbase/ErrorHandlingStrategy.java | 81 +++ .../processors/couchbase/GetCouchbaseKey.java | 243 +++++++ .../processors/couchbase/PutCouchbaseKey.java | 174 +++++ ...g.apache.nifi.controller.ControllerService | 18 + .../org.apache.nifi.processor.Processor | 16 + .../additionalDetails.html | 35 + .../TestCouchbaseClusterService.java | 64 ++ .../TestCouchbaseMapCacheClient.java | 77 +++ .../nifi/couchbase/TestCouchbaseUtils.java | 85 +++ .../couchbase/TestGetCouchbaseKey.java | 618 ++++++++++++++++++ .../couchbase/TestPutCouchbaseKey.java | 365 +++++++++++ .../nifi-couchbase-services-api-nar/pom.xml | 41 ++ .../src/main/resources/META-INF/NOTICE | 21 + .../nifi-couchbase-services-api/pom.xml | 38 ++ .../CouchbaseClusterControllerService.java | 36 + .../CouchbaseConfigurationProperties.java | 69 ++ .../apache/nifi/couchbase/DocumentType.java | 36 + .../nifi-couchbase-bundle/pom.xml | 54 ++ nifi-extension-bundles/pom.xml | 1 + 32 files changed, 3400 insertions(+) create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index e06bf20559948..b70083c93f769 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -1072,6 +1072,30 @@ language governing permissions and limitations under the License. --> + + include-couchbase + + + false + + allProfiles + + + + + org.apache.nifi + nifi-couchbase-services-api-nar + 2.0.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-couchbase-nar + 2.0.0-SNAPSHOT + nar + + + headless diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml new file mode 100644 index 0000000000000..5401cf072f88c --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 2.0.0-SNAPSHOT + + + nifi-couchbase-nar + nar + + + + org.apache.nifi + nifi-couchbase-services-api-nar + 2.0.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-couchbase-processors + 2.0.0-SNAPSHOT + + + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000000..c23e7bccab020 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,30 @@ +nifi-couchbase-nar +Copyright 2014-2024 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Couchbase Java SDK + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2014 Couchbase, Inc. + + (ASLv2) RxJava + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2012 Netflix, Inc. + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2017 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml new file mode 100644 index 0000000000000..981f8ff00d60f --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 2.0.0-SNAPSHOT + + + nifi-couchbase-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 2.0.0-SNAPSHOT + + + org.apache.nifi + nifi-couchbase-services-api + 2.0.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-distributed-cache-client-service-api + + + org.apache.commons + commons-lang3 + + + org.apache.nifi + nifi-lookup-service-api + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-mock + 2.0.0-SNAPSHOT + test + + + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java new file mode 100644 index 0000000000000..0190bd89c1a45 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; + +public class AbstractCouchbaseLookupService extends AbstractControllerService { + + protected static final String KEY = "key"; + protected static final Set REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet())); + + protected List properties; + protected volatile CouchbaseClusterControllerService couchbaseClusterService; + protected volatile String bucketName; + protected volatile String collectionName; + + @Override + protected void init(final ControllerServiceInitializationContext context) throws InitializationException { + final List properties = new ArrayList<>(); + properties.add(COUCHBASE_CLUSTER_SERVICE); + properties.add(BUCKET_NAME); + properties.add(COLLECTION_NAME); + addProperties(properties); + this.properties = Collections.unmodifiableList(properties); + } + + protected void addProperties(List properties) { + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + + couchbaseClusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) + .asControllerService(CouchbaseClusterControllerService.class); + bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue(); + collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); + } + + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java new file mode 100644 index 0000000000000..45496c7799c10 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.couchbase.client.core.error.CouchbaseException; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." + + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", + description = "Specify bucket password if necessary." + + " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1¶m2=value2¶mN=valueN") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor + .Builder() + .name("user-name") + .displayName("User Name") + .description("The user name to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor + .Builder() + .name("user-password") + .displayName("User Password") + .description("The user password to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List properties; + + static { + final List props = new ArrayList<>(); + props.add(CONNECTION_STRING); + props.add(USER_NAME); + props.add(USER_PASSWORD); + + properties = Collections.unmodifiableList(props); + } + + private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; + + private Map bucketPasswords; + private volatile Cluster cluster; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + String propertyDescriptorName) { + if (propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) { + return new PropertyDescriptor + .Builder().name(propertyDescriptorName) + .description("Bucket password.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + } + return null; + } + + @Override + protected Collection customValidate(ValidationContext context) { + final Collection results = new ArrayList<>(); + + final boolean isUserNameSet = context.getProperty(USER_NAME).isSet(); + final boolean isUserPasswordSet = context.getProperty(USER_PASSWORD).isSet(); + if ((isUserNameSet && !isUserPasswordSet) || (!isUserNameSet && isUserPasswordSet)) { + results.add(new ValidationResult.Builder() + .subject("User Name and Password") + .explanation("Both User Name and Password are required to use.") + .build()); + } + + final boolean isBucketPasswordSet = context.getProperties().keySet().stream() + .anyMatch(p -> p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)); + + if (isUserNameSet && isUserPasswordSet && isBucketPasswordSet) { + results.add(new ValidationResult.Builder() + .subject("Authentication methods") + .explanation("Different authentication methods can not be used at the same time," + + " Use either one of User Name and Password, or Bucket Password.") + .build()); + } + + return results; + } + + /** + * Establish a connection to a Couchbase cluster. + * @param context the configuration context + * @throws InitializationException if unable to connect a Couchbase cluster + */ + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + + bucketPasswords = new HashMap<>(); + for(PropertyDescriptor p : context.getProperties().keySet()){ + if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ + String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length()); + String password = context.getProperty(p).evaluateAttributeExpressions().getValue(); + bucketPasswords.put(bucketName, password); + } + } + + final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); + final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue(); + + try { + cluster = Cluster.connect(context.getProperty(CONNECTION_STRING).evaluateAttributeExpressions().getValue(), + userName, userPassword); + } catch (CouchbaseException e) { + throw new InitializationException(e); + } + } + + @Override + public Bucket openBucket(String bucketName) { + if (bucketPasswords.containsKey(bucketName)) { + return cluster.bucket(bucketName);//, bucketPasswords.get(bucketName)); + } + + return cluster.bucket(bucketName); + } + + /** + * Disconnect from the Couchbase cluster. + */ + @OnDisabled + public void shutdown() { + if(cluster != null){ + cluster.disconnect(); + cluster = null; + } + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java new file mode 100644 index 0000000000000..c429db40873cc --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.LookupInSpec; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.StringLookupService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.LOOKUP_SUB_DOC_PATH; + +@Tags({"lookup", "enrich", "key", "value", "couchbase"}) +@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key." + + " The coordinates that are passed to the lookup must contain the key 'key'.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseKeyValueLookupService extends AbstractCouchbaseLookupService implements StringLookupService { + + private volatile String subDocPath; + + @Override + protected void addProperties(List properties) { + properties.add(LOOKUP_SUB_DOC_PATH); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + super.onEnabled(context); + subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue(); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + + try { + final Bucket bucket = couchbaseClusterService.openBucket(bucketName); + final Collection collection = bucket.collection(collectionName); + final Optional docId = Optional.ofNullable(coordinates.get(KEY)).map(Object::toString); + + if (!StringUtils.isBlank(subDocPath)) { + return docId.map(key -> { + try { + return collection.lookupIn(key, Collections.singletonList(LookupInSpec.get(subDocPath))); + } catch (DocumentNotFoundException e) { + getLogger().debug("Document was not found for {}", new Object[]{key}); + return null; + } + }).map(fragment -> fragment.contentAsObject(0)).map(Object::toString); + + } else { + return docId.map(key -> CouchbaseUtils.getStringContent(collection, key)); + } + } catch (CouchbaseException e) { + throw new LookupFailureException("Failed to lookup from Couchbase using this coordinates: " + coordinates); + } + + + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java new file mode 100644 index 0000000000000..b890c0aceb175 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.error.CasMismatchException; +import com.couchbase.client.core.error.DocumentExistsException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.InsertOptions; +import com.couchbase.client.java.kv.ReplaceOptions; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; + +@Tags({"distributed", "cache", "map", "cluster", "couchbase"}) +@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + + " This can be used in order to share a Map between nodes in a NiFi cluster." + + " Couchbase Server cluster can provide a high available and persistent cache storage.") +public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { + + private Collection collection; + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(COUCHBASE_CLUSTER_SERVICE); + descriptors.add(BUCKET_NAME); + descriptors.add(COLLECTION_NAME); + return descriptors; + } + + @OnEnabled + public void configure(final ConfigurationContext context) { + CouchbaseClusterControllerService clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE).asControllerService(CouchbaseClusterControllerService.class); + final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue(); + Bucket bucket = clusterService.openBucket(bucketName); + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); + collection = bucket.collection(collectionName); + } + + private byte[] toDocument(V value, Serializer valueSerializer) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + valueSerializer.serialize(value, bos); + return bos.toByteArray(); + } + + private String toDocumentId(K key, Serializer keySerializer) throws IOException { + final String docId; + if (key instanceof String) { + docId = (String) key; + } else { + // Coerce conversion from byte[] to String, this may generate unreadable String or exceed max key size. + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + keySerializer.serialize(key, bos); + final byte[] keyBytes = bos.toByteArray(); + docId = new String(keyBytes); + } + return docId; + } + + @Override + public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final String docId = toDocumentId(key, keySerializer); + final byte[] doc = toDocument(value, valueSerializer); + try { + collection.insert(docId, doc, InsertOptions.insertOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + return true; + } catch (DocumentExistsException e) { + return false; + } + } + + @Override + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + final String docId = toDocumentId(key, keySerializer); + final GetResult doc = collection.get(docId, GetOptions.getOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + if (doc == null) { + return null; + } + final V value = deserialize(doc, valueDeserializer); + return new AtomicCacheEntry<>(key, value, doc.cas()); + } + + @Override + public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + final V existing = get(key, keySerializer, valueDeserializer); + if (existing != null) { + return existing; + } + + // If there's no existing value, put this value. + if (!putIfAbsent(key, value, keySerializer, valueSerializer)) { + // If putting this value failed, it's possible that other client has put different doc, so return that. + return get(key, keySerializer, valueDeserializer); + } + + // If successfully put this value, return this. + return value; + } + + @Override + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final long revision = entry.getRevision().orElse(-1L); + final String docId = toDocumentId(entry.getKey(), keySerializer); + final Object doc = toDocument(entry.getValue(), valueSerializer); + try { + if (revision < 0) { + // If the document does not exist yet, try to create one. + try { + collection.insert(docId, doc, InsertOptions.insertOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + return true; + } catch (DocumentExistsException e) { + return false; + } + } + collection.replace(docId, doc, ReplaceOptions.replaceOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + return true; + } catch (DocumentNotFoundException | CasMismatchException e) { + return false; + } + } + + @Override + public boolean containsKey(K key, Serializer keySerializer) throws IOException { + return collection.exists(toDocumentId(key, keySerializer)).exists(); + } + + @Override + public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final String docId = toDocumentId(key, keySerializer); + final byte[] doc = toDocument(value, valueSerializer); + collection.upsert(docId, doc, UpsertOptions.upsertOptions() + .transcoder(RawBinaryTranscoder.INSTANCE) + .clientContext(new HashMap<>())); + } + + @Override + public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + final String docId = toDocumentId(key, keySerializer); + final GetResult doc = collection.get(docId, GetOptions.getOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + return deserialize(doc, valueDeserializer); + } + + private V deserialize(GetResult doc, Deserializer valueDeserializer) throws IOException { + if (doc == null) { + return null; + } + final byte[] bytes = doc.contentAsBytes(); + return valueDeserializer.deserialize(bytes); + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean remove(K key, Serializer serializer) throws IOException { + try { + collection.remove(toDocumentId(key, serializer)); + return true; + } catch (DocumentNotFoundException e) { + return false; + } + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java new file mode 100644 index 0000000000000..829d778f1dbee --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.core.deps.io.netty.buffer.ByteBufInputStream; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.DefaultJsonSerializer; +import com.couchbase.client.java.codec.JsonTranscoder; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.kv.GetOptions; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.RecordLookupService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.Tuple; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; + +@Tags({"lookup", "enrich", "couchbase"}) +@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key." + + " The coordinates that are passed to the lookup must contain the key 'key'.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseRecordLookupService extends AbstractCouchbaseLookupService implements RecordLookupService { + + private volatile RecordReaderFactory readerFactory; + private volatile DocumentType documentType; + + private static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The Record Reader to use for parsing fetched document from Couchbase Server.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + @Override + protected void addProperties(List properties) { + properties.add(DOCUMENT_TYPE); + properties.add(RECORD_READER); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + super.onEnabled(context); + readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + + final Bucket bucket = couchbaseClusterService.openBucket(bucketName); + final Collection collection = bucket.collection(collectionName); + final Optional docId = Optional.ofNullable(coordinates.get(KEY)).map(Object::toString); + + final Optional inputStream; + try { + switch (documentType) { + + case Binary: + inputStream = docId + .map(key -> collection.get(docId.get(), GetOptions.getOptions().transcoder(RawBinaryTranscoder.INSTANCE))) + .map(doc -> new ByteBufInputStream(doc.contentAs(ByteBuf.class))); + break; + + case Json: + inputStream= docId + .map(key -> collection.get(docId.get(), GetOptions.getOptions() + .transcoder(JsonTranscoder.create(DefaultJsonSerializer.create())))) + .map(doc -> new ByteArrayInputStream(doc.contentAsBytes())); + break; + + default: + return Optional.empty(); + } + } catch (CouchbaseException e) { + throw new LookupFailureException("Failed to lookup from Couchbase using this coordinates: " + coordinates); + } + + final Optional> errOrReader = inputStream.map(in -> { + try { + // Pass coordinates to initiate RecordReader, so that the reader can resolve schema dynamically. + // This allow using the same RecordReader service with different schemas if RecordReader is configured to + // access schema based on Expression Language. + final Map recordReaderVariables = new HashMap<>(coordinates.size()); + coordinates.keySet().forEach(k -> { + final Object value = coordinates.get(k); + if (value != null) { + recordReaderVariables.put(k, value.toString()); + } + }); + return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger())); + } catch (Exception e) { + return new Tuple<>(e, null); + } + }); + + if (!errOrReader.isPresent()) { + return Optional.empty(); + } + + final Exception exception = errOrReader.get().getKey(); + if (exception != null) { + throw new LookupFailureException(String.format("Failed to lookup with %s", coordinates), exception); + } + + try { + return Optional.ofNullable(errOrReader.get().getValue().nextRecord()); + } catch (Exception e) { + throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), e); + } + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java new file mode 100644 index 0000000000000..a8c7e49e84409 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.GetResult; + +import java.nio.charset.StandardCharsets; + +public class CouchbaseUtils { + + /** + * A convenient method to retrieve String value when Document type is unknown. + * This method uses LegacyDocument to get, then tries to convert content based on its class. + * @param collection the collection from which to get a document + * @param id the id of the target document + * @return String representation of the stored value, or null if not found + */ + public static String getStringContent(Collection collection, String id) { + final GetResult doc = collection.get(id); + if (doc == null) { + return null; + } + final Object content = doc.contentAsObject(); + return getStringContent(content); + } + + public static String getStringContent(Object content) { + if (content == null) { + return null; + } + if (content instanceof String) { + return (String) content; + } else if (content instanceof byte[]) { + return new String((byte[]) content, StandardCharsets.UTF_8); + } else if (content instanceof ByteBuf) { + final ByteBuf byteBuf = (ByteBuf) content; + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + byteBuf.release(); + return new String(bytes, StandardCharsets.UTF_8); + } + return content.toString(); + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java new file mode 100644 index 0000000000000..b9dc45d5a0c9c --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; + +/** + * Provides common functionality for Couchbase processors. + */ +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder() + .name("document-id") + .displayName("Document Id") + .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").build(); + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build(); + static final Relationship REL_RETRY = new Relationship.Builder().name("retry").build(); + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build(); + + private List descriptors; + + private Set relationships; + + private CouchbaseClusterControllerService clusterService; + + @Override + protected final void init(final ProcessorInitializationContext context) { + + final List descriptors = new ArrayList<>(); + descriptors.add(COUCHBASE_CLUSTER_SERVICE); + descriptors.add(BUCKET_NAME); + descriptors.add(COLLECTION_NAME); + addSupportedProperties(descriptors); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + addSupportedRelationships(relationships); + this.relationships = Collections.unmodifiableSet(relationships); + + } + + /** + * Add processor specific properties. + * + * @param descriptors add properties to this list + */ + protected void addSupportedProperties(List descriptors) { + } + + /** + * Add processor specific relationships. + * + * @param relationships add relationships to this list + */ + protected void addSupportedRelationships(Set relationships) { + } + + @Override + public final Set getRelationships() { + return filterRelationships(this.relationships); + } + + protected Set filterRelationships(Set rels) { + return rels; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + private CouchbaseClusterControllerService getClusterService(final ProcessContext context) { + synchronized (AbstractCouchbaseProcessor.class) { + if (clusterService == null) { + clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) + .asControllerService(CouchbaseClusterControllerService.class); + } + } + + return clusterService; + } + + /** + * Open a collection connection using a CouchbaseClusterControllerService. + * + * @param context a process context + * @return a collection instance + */ + protected final Bucket openBucket(final ProcessContext context) { + return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue()); + } + + protected final Collection openCollection(final ProcessContext context, final Bucket bucket) { + return bucket.collection(context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue()); + } + + /** + * Generate a transit url. + * + * @param bucket the target collection + * @return a transit url based on the collection name and the CouchbaseClusterControllerService name + */ + protected String getTransitUrl(final Bucket bucket, final String docId) { + return "couchbase://" + bucket.name() + "/" + docId; + } + + /** + * Handles the thrown CouchbaseException accordingly. + * + * @param context a process context + * @param session a process session + * @param logger a logger + * @param inFile an input FlowFile + * @param e the thrown CouchbaseException + * @param errMsg a message to be logged + */ + protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session, + final ComponentLog logger, FlowFile inFile, CouchbaseException e, + String errMsg) { + logger.error(errMsg, e); + if (inFile != null) { + ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); + switch (strategy.penalty()) { + case Penalize: + if (logger.isDebugEnabled()) { + logger.debug("Penalized: {}", inFile); + } + inFile = session.penalize(inFile); + break; + case Yield: + if (logger.isDebugEnabled()) { + logger.debug("Yielded context: {}", inFile); + } + context.yield(); + break; + case None: + break; + } + + switch (strategy.result()) { + case ProcessException: + throw new ProcessException(errMsg, e); + case Failure: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_FAILURE); + break; + case Retry: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_RETRY); + break; + } + } + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java new file mode 100644 index 0000000000000..e9e81134ef3e4 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; + +/** + * Couchbase related attribute keys. + */ +public enum CouchbaseAttributes implements FlowFileAttributeKey { + + /** + * A reference to the related cluster. + */ + Cluster("couchbase.cluster"), + /** + * A related bucket name. + */ + Bucket("couchbase.bucket"), + /** + * A related collection name. + */ + Collection("couchbase.collection"), + /** + * The id of a related document. + */ + DocId("couchbase.doc.id"), + /** + * The CAS value of a related document. + */ + Cas("couchbase.doc.cas"), + /** + * The expiration of a related document. + */ + Expiry("couchbase.doc.expiry"), + /** + * The thrown CouchbaseException class. + */ + Exception("couchbase.exception"), + ; + + private final String key; + + private CouchbaseAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java new file mode 100644 index 0000000000000..c5761486f75f0 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import com.couchbase.client.core.error.AuthenticationFailureException; +import com.couchbase.client.core.error.BucketNotFoundException; +import com.couchbase.client.core.error.ConfigException; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentMutationLostException; +import com.couchbase.client.core.error.DurabilityAmbiguousException; +import com.couchbase.client.core.error.DurabilityImpossibleException; +import com.couchbase.client.core.error.DurabilityLevelNotAvailableException; +import com.couchbase.client.core.error.DurableWriteInProgressException; +import com.couchbase.client.core.error.DurableWriteReCommitInProgressException; +import com.couchbase.client.core.error.InvalidRequestException; +import com.couchbase.client.core.error.ReplicaNotAvailableException; +import com.couchbase.client.core.error.RequestCanceledException; +import com.couchbase.client.core.error.SecurityException; +import com.couchbase.client.core.error.ServerOutOfMemoryException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.TemporaryFailureException; +import com.couchbase.client.core.error.TimeoutException; +import com.couchbase.client.core.error.transaction.ConcurrentOperationsDetectedOnSameDocumentException; + +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.ConfigurationError; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.InvalidInput; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalClusterError; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalFlowFileError; + +import java.util.HashMap; +import java.util.Map; + +public class CouchbaseExceptionMappings { + + private static final Map, ErrorHandlingStrategy> mapping = new HashMap<>(); + + /* + * - Won't happen + * BucketAlreadyExistsException: never create a bucket + * CASMismatchException: cas-id and replace is not used yet + * DesignDocumentException: View is not used yet + * DocumentAlreadyExistsException: insert is not used yet + * DocumentDoesNotExistException: replace is not used yet + * FlushDisabledException: never call flush + * RepositoryMappingException: EntityDocument is not used + * TemporaryLockFailureException: we don't obtain locks + * ViewDoesNotExistException: View is not used yet + * NamedPreparedStatementException: N1QL is not used yet + * QueryExecutionException: N1QL is not used yet + */ + static { + /* + * ConfigurationError + */ + mapping.put(AuthenticationFailureException.class, ConfigurationError); + mapping.put(SecurityException.class, ConfigurationError); + mapping.put(BucketNotFoundException.class, ConfigurationError); + mapping.put(ConfigException.class, ConfigurationError); + // when Couchbase doesn't have enough replica + mapping.put(ReplicaNotAvailableException.class, ConfigurationError); + // when a particular Service(KV, View, Query, DCP) isn't running in a cluster + mapping.put(ServiceNotAvailableException.class, ConfigurationError); + + /* + * InvalidInput + */ + mapping.put(InvalidRequestException.class, InvalidInput); + + /* + * Temporal Cluster Error + */ + mapping.put(TimeoutException.class, TemporalClusterError); + mapping.put(ServerOutOfMemoryException.class, TemporalClusterError); + mapping.put(TemporaryFailureException.class, TemporalClusterError); + // occurs when a connection gets lost + mapping.put(RequestCanceledException.class, TemporalClusterError); + + /* + * Temporal FlowFile Error + */ + mapping.put(ConcurrentOperationsDetectedOnSameDocumentException.class, TemporalFlowFileError); + mapping.put(DocumentMutationLostException.class, TemporalFlowFileError); + mapping.put(DurabilityAmbiguousException.class, TemporalFlowFileError); + mapping.put(DurabilityImpossibleException.class, TemporalFlowFileError); + mapping.put(DurabilityLevelNotAvailableException.class, TemporalFlowFileError); + mapping.put(DurableWriteInProgressException.class, TemporalFlowFileError); + mapping.put(DurableWriteReCommitInProgressException.class, TemporalFlowFileError); + } + + /** + * Returns a registered error handling strategy. + * @param e the CouchbaseException + * @return a registered strategy, if it's not registered, then return Fatal + */ + public static ErrorHandlingStrategy getStrategy(CouchbaseException e){ + ErrorHandlingStrategy strategy = mapping.get(e.getClass()); + if (strategy == null) { + // Treat unknown Exception as Fatal. + return ErrorHandlingStrategy.Fatal; + } + return strategy; + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java new file mode 100644 index 0000000000000..4c1dd20d4b861 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.None; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.ProcessException; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Retry; + + +public enum ErrorHandlingStrategy { + + /** + * Processor setting has to be fixed, in order to NOT call failing processor + * frequently, this it be yielded. + */ + ConfigurationError(ProcessException, Yield), + /** + * The input FlowFile will be sent to the failure relationship for further + * processing without penalizing. Basically, the FlowFile shouldn't be sent + * this processor again unless the issue has been solved. + */ + InvalidInput(Failure, None), + /** + * Couchbase cluster is in unhealthy state. Retrying maybe successful, + * but it should be yielded for a while. + */ + TemporalClusterError(Retry, Yield), + /** + * The FlowFile was not processed successfully due to some temporal error + * related to this specific FlowFile or document. Retrying maybe successful, + * but it should be penalized for a while. + */ + TemporalFlowFileError(Retry, Penalize), + /** + * The error can't be recovered without DataFlow Manager intervention. + */ + Fatal(Retry, Yield); + + private final Result result; + private final Penalty penalty; + ErrorHandlingStrategy(Result result, Penalty penalty){ + this.result = result; + this.penalty = penalty; + } + + public enum Result { + ProcessException, Failure, Retry + } + + /** + * Indicating yield or penalize the processing when transfer the input FlowFile. + */ + public enum Penalty { + Yield, Penalize, None + } + + public Result result(){ + return this.result; + } + + public Penalty penalty(){ + return this.penalty; + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java new file mode 100644 index 0000000000000..9dd7256c76ab8 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.DefaultJsonSerializer; +import com.couchbase.client.java.codec.JsonTranscoder; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseUtils; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +import com.couchbase.client.java.Bucket; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; + +@Tags({"nosql", "couchbase", "database", "get"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the property. " + + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire " + + "FlowFile will be buffered in memory.") +@WritesAttributes({ + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) +@SystemResourceConsideration(resource = SystemResource.MEMORY) +public class GetCouchbaseKey extends AbstractCouchbaseProcessor { + + public static final PropertyDescriptor PUT_VALUE_TO_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("put-to-attribute") + .displayName("Put Value to Attribute") + .description("If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile." + + " The attribute key to put to is determined by evaluating value of this property.") + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + private volatile boolean putToAttribute = false; + + @Override + protected void addSupportedProperties(final List descriptors) { + descriptors.add(DOCUMENT_TYPE); + descriptors.add(DOC_ID); + descriptors.add(PUT_VALUE_TO_ATTRIBUTE); + } + + @Override + protected void addSupportedRelationships(final Set relationships) { + relationships.add(new Relationship.Builder().name(REL_ORIGINAL.getName()) + .description("The original input FlowFile is routed to this relationship" + + " when the value is retrieved from Couchbase Server and routed to 'success'.").build()); + relationships.add(new Relationship.Builder().name(REL_SUCCESS.getName()) + .description("Values retrieved from Couchbase Server are written as outgoing FlowFiles content" + + " or put into an attribute of the incoming FlowFile and routed to this relationship.").build()); + relationships.add(new Relationship.Builder().name(REL_RETRY.getName()) + .description("All FlowFiles failed to fetch from Couchbase Server but can be retried are routed to this relationship.").build()); + relationships.add(new Relationship.Builder().name(REL_FAILURE.getName()) + .description("All FlowFiles failed to fetch from Couchbase Server and not retry-able are routed to this relationship.").build()); + } + + @Override + protected Set filterRelationships(Set rels) { + // If destination is attribute, then success == original. + return rels.stream().filter(rel -> !REL_ORIGINAL.equals(rel) || !putToAttribute).collect(Collectors.toSet()); + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if (PUT_VALUE_TO_ATTRIBUTE.equals(descriptor)) { + putToAttribute = !isEmpty(newValue); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile inFile = session.get(); + if (inFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + final ComponentLog logger = getLogger(); + String docId = null; + if (context.getProperty(DOC_ID).isSet()) { + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); + } else { + final byte[] content = new byte[(int) inFile.getSize()]; + session.read(inFile, in -> StreamUtils.fillBuffer(in, content, true)); + docId = new String(content, StandardCharsets.UTF_8); + } + + if (isEmpty(docId)) { + throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); + } + + String putTargetAttr = null; + if (context.getProperty(PUT_VALUE_TO_ATTRIBUTE).isSet()) { + putTargetAttr = context.getProperty(PUT_VALUE_TO_ATTRIBUTE).evaluateAttributeExpressions(inFile).getValue(); + if (isEmpty(putTargetAttr)) { + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), "InvalidPutTargetAttributeName"); + session.transfer(inFile, REL_FAILURE); + return; + } + } + + try { + final Bucket bucket = openBucket(context); + final Collection collection = openCollection(context, bucket); + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + final GetResult result; + // A function to write a document into outgoing FlowFile content. + OutputStreamCallback outputStreamCallback = null; + final Map updatedAttrs = new HashMap<>(); + switch (documentType) { + case Json: + result = collection.get(docId, GetOptions.getOptions() + .transcoder(JsonTranscoder.create(DefaultJsonSerializer.create()))); + if (result != null) { + outputStreamCallback = out -> { + final byte[] content = result.contentAsBytes(); + if (content == null || content.length == 0) { + out.write("{}".getBytes()); + } else { + out.write(content); + } + updatedAttrs.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + }; + } + break; + + case Binary: + result = collection.get(docId, GetOptions.getOptions().transcoder(RawBinaryTranscoder.INSTANCE)); + if (result != null) { + outputStreamCallback = out -> { + // Write to OutputStream without copying any to heap. + final ByteBuf byteBuf = result.contentAs(ByteBuf.class); + byteBuf.getBytes(byteBuf.readerIndex(), out, byteBuf.readableBytes()); + byteBuf.release(); + }; + } + break; + + default: + result = null; + } + + if (result == null) { + logger.warn("Document {} was not found in {}; routing {} to failure", docId, getTransitUrl(bucket, docId), inFile); + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentNotFoundException.class.getName()); + session.transfer(inFile, REL_FAILURE); + return; + } + + FlowFile outFile; + if (putToAttribute) { + outFile = inFile; + switch (documentType) { + case Json: + updatedAttrs.put(putTargetAttr, CouchbaseUtils.getStringContent(result.contentAsObject())); + break; + case Binary: + updatedAttrs.put(putTargetAttr, CouchbaseUtils.getStringContent(result.contentAsBytes())); + break; + } + } else { + outFile = session.create(inFile); + outFile = session.write(outFile, outputStreamCallback); + session.transfer(inFile, REL_ORIGINAL); + } + + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + updatedAttrs.put(CouchbaseAttributes.Bucket.key(), bucket.name()); + updatedAttrs.put(CouchbaseAttributes.Collection.key(), collection.name()); + updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(result.cas())); + updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(result.expiryTime().orElse(null))); + outFile = session.putAllAttributes(outFile, updatedAttrs); + + final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(outFile, getTransitUrl(bucket, docId), fetchMillis); + session.transfer(outFile, REL_SUCCESS); + + } catch (final CouchbaseException e) { + String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e); + handleCouchbaseException(context, session, logger, inFile, e, errMsg); + } + } + + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java new file mode 100644 index 0000000000000..f330421296c4e --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.codec.DefaultJsonSerializer; +import com.couchbase.client.java.codec.JsonTranscoder; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.codec.Transcoder; +import com.couchbase.client.java.json.JsonObject; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; + +@Tags({"nosql", "couchbase", "database", "put"}) +@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") +@InputRequirement(Requirement.INPUT_REQUIRED) +@ReadsAttributes({ + @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"), +}) +@WritesAttributes({ + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."), + @WritesAttribute(attribute = "couchbase.collection", description = "Collection where the document was stored."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) +@SystemResourceConsideration(resource = SystemResource.MEMORY) +public class PutCouchbaseKey extends AbstractCouchbaseProcessor { + + public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder() + .name("persist-to") + .displayName("Persist To") + .description("Durability constraint about disk persistence.") + .required(true) + .allowableValues(PersistTo.values()) + .defaultValue(PersistTo.NONE.toString()) + .build(); + + public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder() + .name("replicate-to") + .displayName("Replicate To") + .description("Durability constraint about replication.") + .required(true) + .allowableValues(ReplicateTo.values()) + .defaultValue(ReplicateTo.NONE.toString()) + .build(); + + @Override + protected void addSupportedProperties(List descriptors) { + descriptors.add(DOCUMENT_TYPE); + descriptors.add(DOC_ID); + descriptors.add(PERSIST_TO); + descriptors.add(REPLICATE_TO); + } + + @Override + protected void addSupportedRelationships(Set relationships) { + relationships.add(new Relationship.Builder().name(REL_SUCCESS.getName()) + .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.").build()); + relationships.add(new Relationship.Builder().name(REL_RETRY.getName()) + .description("All FlowFiles failed to be written to Couchbase Server but can be retried are routed to this relationship.").build()); + relationships.add(new Relationship.Builder().name(REL_FAILURE.getName()) + .description("All FlowFiles failed to be written to Couchbase Server and not retry-able are routed to this relationship.").build()); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); + + String docId = flowFile.getAttribute(CoreAttributes.UUID.key()); + if (context.getProperty(DOC_ID).isSet()) { + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); + } + + try { + Object doc = null; + Transcoder transcoder = null; + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType) { + case Json: { + doc = JsonObject.fromJson(new String(content, StandardCharsets.UTF_8)); + transcoder = JsonTranscoder.create(DefaultJsonSerializer.create()); + break; + } + case Binary: { + doc = content; + transcoder = RawBinaryTranscoder.INSTANCE; + break; + } + } + + final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); + final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); + final Bucket bucket = openBucket(context); + MutationResult result = openCollection(context, bucket).upsert(docId, doc, + UpsertOptions.upsertOptions() + .durability(persistTo, replicateTo) + .transcoder(transcoder) + .clientContext(new HashMap<>())); + + if (result == null) { + throw new CouchbaseException("UPSERT result was null"); + } + final Map updatedAttrs = new HashMap<>(); + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + updatedAttrs.put(CouchbaseAttributes.Bucket.key(), bucket.name()); + updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(result.cas())); + + flowFile = session.putAllAttributes(flowFile, updatedAttrs); + session.getProvenanceReporter().send(flowFile, getTransitUrl(bucket, docId)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final CouchbaseException e) { + String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); + handleCouchbaseException(context, session, logger, flowFile, e, errMsg); + } + } + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000000..f33bf6ca0b304 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.couchbase.CouchbaseClusterService +org.apache.nifi.couchbase.CouchbaseMapCacheClient +org.apache.nifi.couchbase.CouchbaseKeyValueLookupService +org.apache.nifi.couchbase.CouchbaseRecordLookupService diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000000..1304435296aa3 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.couchbase.GetCouchbaseKey +org.apache.nifi.processors.couchbase.PutCouchbaseKey \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html new file mode 100644 index 0000000000000..b8999677daae5 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html @@ -0,0 +1,35 @@ + + + + + + CouchbaseMapCacheClient + + + + +

CouchbaseMapCacheClient

+ +

Requirements

+ +

Couchbase Server 4.0 or higher is required for some operation using N1QL

+ +Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server. + +In order to make N1QL work correctly you need to create a Primary index or an index covering N1QL queries performed by CouchbaseMapCacheClient. Please refer Couchbase Server documentations for how to create those. + + + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java new file mode 100644 index 0000000000000..ce03c7596d55f --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class TestCouchbaseClusterService { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + public static class SampleProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } + } + + @BeforeEach + public void init() { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug"); + + testRunner = TestRunners.newTestRunner(SampleProcessor.class); + testRunner.setValidateExpressionUsage(false); + } + + @Test + public void testConnectionFailure() throws InitializationException { + String connectionString = "invalid-protocol://invalid-hostname"; + CouchbaseClusterControllerService service = new CouchbaseClusterService(); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString); + assertThrows(AssertionError.class, () -> testRunner.enableControllerService(service)); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java new file mode 100644 index 0000000000000..2592ea4f40a75 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.GetResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCouchbaseMapCacheClient { + + private final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8)); + private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8); + + // TODO: Add more tests + + @Test + public void testGet() throws Exception { + final CouchbaseMapCacheClient client = new CouchbaseMapCacheClient(); + final CouchbaseClusterControllerService couchbaseService = mock(CouchbaseClusterControllerService.class); + final Bucket bucket = mock(Bucket.class); + final Collection collection = mock(Collection.class); + final GetResult getResult = mock(GetResult.class); + + final MockControllerServiceInitializationContext serviceInitializationContext + = new MockControllerServiceInitializationContext(couchbaseService, "couchbaseService"); + final Map properties = new HashMap<>(); + properties.put(COUCHBASE_CLUSTER_SERVICE, "couchbaseService"); + properties.put(BUCKET_NAME, "bucketA"); + properties.put(COLLECTION_NAME, "collectionA"); + + final byte[] contents = "value".getBytes(StandardCharsets.UTF_8); + when(couchbaseService.openBucket(eq("bucketA"))).thenReturn(bucket); + when(bucket.collection("collectionA")).thenReturn(collection); + when(getResult.contentAsBytes()).thenReturn(contents); + when(collection.get(anyString(), any())).thenReturn(getResult); + + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + client.configure(context); + final String cacheEntry = client.get("key", stringSerializer, stringDeserializer); + + assertEquals("value", cacheEntry); + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java new file mode 100644 index 0000000000000..62b60fe062108 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.core.error.DecodingFailureException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.json.JsonArray; +import com.couchbase.client.java.json.JsonObject; +import com.couchbase.client.java.kv.GetResult; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCouchbaseUtils { + + @Disabled("This test method requires a live Couchbase Server instance") + @Test + public void testDocumentTypesAndStringConversion() { + try (final Cluster cluster = Cluster.connect("couchbase://192.168.99.100:8091", "couchbase", "b1password")) { + final Bucket bucket = cluster.bucket("b1"); + final Collection collection = bucket.collection("c1"); + + collection.upsert("JsonDocument", JsonObject.create().put("one", 1)); + collection.upsert("JsonArray", JsonArray.create().add(1).add(2).add(3)); + collection.upsert("JsonDouble", 0.123); + collection.upsert("JsonString", "value"); + collection.upsert("JsonBoolean", true); + collection.upsert("JsonLong", 123L); + + collection.upsert("RawJsonDocument", "value"); + collection.upsert("StringDocument", "value"); + + collection.upsert("BinaryDocument", Unpooled.copiedBuffer("value".getBytes(StandardCharsets.UTF_8))); + collection.upsert("ByteArrayDocument", "value".getBytes(StandardCharsets.UTF_8)); + + final String[][] expectations = { + {"JsonDocument", "String", "{\"one\":1}"}, + {"JsonArray", "String", "[1,2,3]"}, + {"JsonDouble", "String", "0.123"}, + {"JsonString", "String", "\"value\""}, + {"JsonBoolean", "String", "true"}, + {"JsonLong", "String", "123"}, + {"RawJsonDocument", "String", "value"}, + {"StringDocument", "String", "value"}, + {"BinaryDocument", "byte[]", "value"}, + {"ByteArrayDocument", "byte[]", "value"}, + }; + + for (String[] expectation : expectations) { + final GetResult document = collection.get(expectation[0]); + assertEquals(expectation[1], document.contentAsObject().getClass().getSimpleName()); + assertEquals(expectation[2], CouchbaseUtils.getStringContent(document.contentAsObject())); + } + + final GetResult binaryDocument = collection.get("BinaryDocument"); + final String stringFromByteBuff = CouchbaseUtils.getStringContent(binaryDocument.contentAsObject()); + assertEquals("value", stringFromByteBuff); + + DecodingFailureException e = assertThrows(DecodingFailureException.class, () -> collection.get("JsonDocument")); + assertTrue(e.getMessage().contains("Flags (0x2000000) indicate non-binary document for id JsonDocument")); + } + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java new file mode 100644 index 0000000000000..ae7ec68fba960 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import com.couchbase.client.core.Core; +import com.couchbase.client.core.CoreContext; +import com.couchbase.client.core.cnc.tracing.NoopRequestSpan; +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.core.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.core.env.CoreEnvironment; +import com.couchbase.client.core.env.PasswordAuthenticator; +import com.couchbase.client.core.error.AmbiguousTimeoutException; +import com.couchbase.client.core.error.AuthenticationFailureException; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.core.error.DurabilityImpossibleException; +import com.couchbase.client.core.error.InvalidRequestException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.context.CancellationErrorContext; +import com.couchbase.client.core.error.context.GenericRequestErrorContext; +import com.couchbase.client.core.error.context.KeyValueErrorContext; +import com.couchbase.client.core.io.CollectionIdentifier; +import com.couchbase.client.core.io.netty.kv.MemcacheProtocol; +import com.couchbase.client.core.msg.ResponseStatus; +import com.couchbase.client.core.msg.kv.GetRequest; +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.core.util.ConnectionString; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.CouchbaseConfigurationProperties; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception; +import static org.apache.nifi.processors.couchbase.GetCouchbaseKey.PUT_VALUE_TO_ATTRIBUTE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestGetCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + + private final CoreEnvironment coreEnvironment = CoreEnvironment.builder().build(); + private final PasswordAuthenticator passwordAuthenticator = + new PasswordAuthenticator.Builder().username("couchbase").password("b1password").build(); + private final ConnectionString connectionString = ConnectionString.create("couchbase://192.168.99.100"); + private final Core core = Core.create(coreEnvironment, passwordAuthenticator, connectionString); + private final CoreContext coreContext = new CoreContext(core, 1, coreEnvironment, passwordAuthenticator); + + private TestRunner testRunner; + + @BeforeEach + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(GetCouchbaseKey.class); + } + + private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + when(bucket.name()).thenReturn("bucket-1"); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + @Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String content = "{\"key\":\"value\"}"; + long cas = 200L; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.cas()).thenReturn(cas); + Optional expiryTime = Optional.of(Instant.now()); + when(getResult.expiryTime()).thenReturn(expiryTime); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + when(collection.get(any(), any(GetOptions.class))).thenReturn(getResult); + + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiryTime.get())); + } + + + @Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String content = "{\"key\":\"value\"}"; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + when(collection.get(any(), any(GetOptions.class))).thenReturn(getResult); + + testRunner.setProperty(DOC_ID, docIdExp); + + byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8); + Map properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileData, properties); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testDocIdExpWithEmptyFlowFile() throws Exception { + String docIdExp = "doc-s"; + String docId = "doc-s"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String content = "{\"key\":\"value\"}"; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + when(collection.get(any(), any(GetOptions.class))).thenReturn(getResult); + + testRunner.setProperty(DOC_ID, docIdExp); + + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testDocIdExpWithInvalidExpression() throws Exception { + String docIdExp = "${nonExistingFunction('doc-s')}"; + String docId = "doc-s"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(collection.get(any(), any(GetOptions.class))).thenReturn(getResult); + + testRunner.setProperty(DOC_ID, docIdExp); + testRunner.enqueue(new byte[0]); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), AttributeExpressionLanguageException.class); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testDocIdExpWithInvalidExpressionOnFlowFile() throws Exception { + String docIdExp = "${nonExistingFunction(someProperty)}"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + Map properties = new HashMap<>(); + properties.put("someProperty", "someValue"); + testRunner.enqueue(inFileData, properties); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), AttributeExpressionLanguageException.class); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testInputFlowFileContent() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String inFileDataStr = "doc-in"; + String content = "{\"key\":\"value\"}"; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0); + orgFile.assertContentEquals(inFileDataStr); + } + + @Test + public void testPutToAttribute() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String inFileDataStr = "doc-in"; + String content = "some-value"; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + Optional expiryTime = Optional.of(Instant.now()); + when(getResult.expiryTime()).thenReturn(expiryTime); + final ByteBuf contents = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8)); + when(getResult.contentAs(ByteBuf.class)).thenReturn(contents); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "targetAttribute"); + testRunner.setProperty(CouchbaseConfigurationProperties.DOCUMENT_TYPE, "Binary"); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + // Result is put to Attribute, so no need to pass it to original. + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileDataStr); + outFile.assertAttributeEquals("targetAttribute", content); + + assertEquals(1, testRunner.getProvenanceEvents().size()); + assertEquals(ProvenanceEventType.FETCH, testRunner.getProvenanceEvents().get(0).getEventType()); + } + + @Test + public void testPutToAttributeNoTargetAttribute() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String inFileDataStr = "doc-in"; + String content = "some-value"; + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "${expressionReturningNoValue}"); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + outFile.assertContentEquals(inFileDataStr); + } + + @Test + public void testBinaryDocument() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String inFileDataStr = "doc-in"; + String content = "binary"; + ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8)); + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + when(getResult.contentAs(ByteBuf.class)).thenReturn(buf); + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString()); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0); + orgFile.assertContentEquals(inFileDataStr); + } + + @Test + public void testBinaryDocumentToAttribute() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + GetResult getResult = mock(GetResult.class); + String inFileDataStr = "doc-in"; + String content = "binary"; + ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8)); + setupMockBucket(bucket); + when(bucket.collection(anyString())).thenReturn(collection); + when(getResult.expiryTime()).thenReturn(Optional.of(Instant.now())); + when(collection.get(anyString(), any(GetOptions.class))).thenReturn(getResult); + when(getResult.contentAs(ByteBuf.class)).thenReturn(buf); + when(getResult.contentAsBytes()).thenReturn(content.getBytes(StandardCharsets.UTF_8)); + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString()); + testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "targetAttribute"); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileDataStr); + outFile.assertAttributeEquals("targetAttribute", "binary"); + } + + + @Test + public void testCouchbaseFailure() throws Exception { + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + String inFileDataStr = "doc-in"; + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(anyString(), any(GetOptions.class))) + .thenThrow(new ServiceNotAvailableException("test", + new GenericRequestErrorContext( + createNewGetRequest("bucket-a", "collection-a") + ))); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), ProcessException.class); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseConfigurationError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(anyString(), any(GetOptions.class))) + .thenThrow(new AuthenticationFailureException("test", new KeyValueErrorContext( + createNewGetRequest("bucket-a", "bucket-c"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class)), + new RuntimeException())); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), ProcessException.class); + assertEquals(e.getCause().getCause().getClass(), AuthenticationFailureException.class); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseInvalidInputError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + CouchbaseException exception = new InvalidRequestException(new KeyValueErrorContext( + createNewGetRequest("bucket-a", "collection-a"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class))); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(any(), any(GetOptions.class))) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + @Test + public void testCouchbaseTempClusterError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + CouchbaseException exception = new AmbiguousTimeoutException( + "test", + new CancellationErrorContext(new KeyValueErrorContext( + createNewGetRequest("bucket-a", "collection-a"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class))) + ); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(anyString(), any(GetOptions.class))) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + + @Test + public void testCouchbaseTempFlowFileError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + CouchbaseException exception = new DurabilityImpossibleException(new KeyValueErrorContext( + createNewGetRequest("bucket-a", "collection-a"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class))); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(anyString(), any(GetOptions.class))) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + assertTrue(orgFile.isPenalized()); + } + + @Test + public void testDocumentNotFound() throws Exception { + String docIdExp = "doc-n"; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.get(anyString(), any(GetOptions.class))) + .thenReturn(null); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), DocumentNotFoundException.class.getName()); + } + + private GetRequest createNewGetRequest(final String bucketName, final String collectionName) { + return new GetRequest( + collectionName, + Duration.ofSeconds(3), + coreContext, + new CollectionIdentifier( + bucketName, + Optional.of(CollectionIdentifier.DEFAULT_COLLECTION), + Optional.of(CollectionIdentifier.DEFAULT_SCOPE)), + BestEffortRetryStrategy.INSTANCE, + NoopRequestSpan.INSTANCE); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java new file mode 100644 index 0000000000000..f297661b170ef --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import com.couchbase.client.core.Core; +import com.couchbase.client.core.CoreContext; +import com.couchbase.client.core.cnc.tracing.NoopRequestSpan; +import com.couchbase.client.core.env.CoreEnvironment; +import com.couchbase.client.core.env.PasswordAuthenticator; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DurabilityImpossibleException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.context.KeyValueErrorContext; +import com.couchbase.client.core.io.CollectionIdentifier; +import com.couchbase.client.core.io.netty.kv.MemcacheProtocol; +import com.couchbase.client.core.msg.ResponseStatus; +import com.couchbase.client.core.msg.kv.GetRequest; +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.core.util.ConnectionString; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class TestPutCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + private final CoreEnvironment coreEnvironment = CoreEnvironment.builder().build(); + private final PasswordAuthenticator passwordAuthenticator = + new PasswordAuthenticator.Builder().username("couchbase").password("b1password").build(); + private final ConnectionString connectionString = ConnectionString.create("couchbase://192.168.99.100"); + private final Core core = Core.create(coreEnvironment, passwordAuthenticator, connectionString); + private final CoreContext coreContext = new CoreContext(core, 1, coreEnvironment, passwordAuthenticator); + + @BeforeEach + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class); + } + + private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + when(bucket.name()).thenReturn("bucket-1"); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + @Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + long cas = 200L; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + MutationResult result = mock(MutationResult.class); + when(result.cas()).thenReturn(cas); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenReturn(result); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.run(); + + verify(collection, times(1)) + .upsert(anyString(), any(), any(UpsertOptions.class)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + } + + @Test + public void testBinaryDoc() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + long cas = 200L; + + String inFileData = "12345"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + MutationResult result = mock(MutationResult.class); + when(result.cas()).thenReturn(cas); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenReturn(result); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.name()); + testRunner.run(); + + verify(collection, times(1)) + .upsert(anyString(), any(), any(UpsertOptions.class)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + } + + @Test + public void testDurabilityConstraint() throws Exception { + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + MutationResult result = mock(MutationResult.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenReturn(result); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.ACTIVE.toString()); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(collection, times(1)) + .upsert(anyString(), any(), any(UpsertOptions.class)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + @Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + long cas = 200L; + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + MutationResult result = mock(MutationResult.class); + when(result.cas()).thenReturn(cas); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenReturn(result); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + Map properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileDataBytes, properties); + testRunner.run(); + + ArgumentCaptor capture = ArgumentCaptor.forClass(String.class); + verify(collection, times(1)) + .upsert(capture.capture(), any(), any(UpsertOptions.class)); + assertEquals(somePropertyValue, capture.getValue()); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + @Test + public void testInvalidDocIdExp() throws Exception { + String docIdExp = "${invalid_function(someProperty)}"; + String somePropertyValue = "doc-p"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + MutationResult result = mock(MutationResult.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(UpsertOptions.class))) + .thenReturn(result); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + Map properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileDataBytes, properties); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), AttributeExpressionLanguageException.class); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseFailure() throws Exception { + + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenThrow(new ServiceNotAvailableException("test", + new KeyValueErrorContext( + createNewGetRequest("bucket-a", "collection-a"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class)))); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + + AssertionError e = assertThrows(AssertionError.class, () -> testRunner.run()); + assertEquals(e.getCause().getClass(), ProcessException.class); + + verify(collection, times(1)) + .upsert(anyString(), any(), any(UpsertOptions.class)); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseTempFlowFileError() throws Exception { + + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + CouchbaseException exception = new DurabilityImpossibleException(new KeyValueErrorContext( + createNewGetRequest("bucket-a", "collection-a"), + mock(ResponseStatus.class), + mock(MemcacheProtocol.FlexibleExtras.class))); + Bucket bucket = mock(Bucket.class); + Collection collection = mock(Collection.class); + when(bucket.collection(anyString())).thenReturn(collection); + when(collection.upsert(anyString(), any(), any(UpsertOptions.class))) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(collection, times(1)) + .upsert(anyString(), any(), any(UpsertOptions.class)); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inFileData); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + private GetRequest createNewGetRequest(final String bucketName, final String collectionName) { + return new GetRequest( + collectionName, + Duration.ofSeconds(3), + coreContext, + new CollectionIdentifier( + bucketName, + Optional.of(CollectionIdentifier.DEFAULT_COLLECTION), + Optional.of(CollectionIdentifier.DEFAULT_SCOPE)), + BestEffortRetryStrategy.INSTANCE, + NoopRequestSpan.INSTANCE); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml new file mode 100644 index 0000000000000..9d904783c8148 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 2.0.0-SNAPSHOT + + + nifi-couchbase-services-api-nar + nar + + + + org.apache.nifi + nifi-standard-services-api-nar + 2.0.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-couchbase-services-api + 2.0.0-SNAPSHOT + + + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000000..0b003b66649c0 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,21 @@ +nifi-couchbase-services-api-nar +Copyright 2014-2024 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Couchbase Java SDK + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2014 Couchbase, Inc. + + (ASLv2) RxJava + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2012 Netflix, Inc. \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml new file mode 100644 index 0000000000000..cabde690c543e --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml @@ -0,0 +1,38 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 2.0.0-SNAPSHOT + + + nifi-couchbase-services-api + jar + + + + org.apache.nifi + nifi-api + + + com.couchbase.client + java-client + + + diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java new file mode 100644 index 0000000000000..7969c5025b0c3 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.controller.ControllerService; + +import com.couchbase.client.java.Bucket; + +/** + * Provides a connection to a Couchbase Server cluster throughout a NiFi Data + * flow. + */ +public interface CouchbaseClusterControllerService extends ControllerService { + + /** + * Open a bucket connection. + * @param bucketName the bucket name to access + * @return a connected bucket instance + */ + Bucket openBucket(String bucketName); + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java new file mode 100644 index 0000000000000..1850121d23029 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; + +public class CouchbaseConfigurationProperties { + + public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder() + .name("cluster-controller-service") + .displayName("Couchbase Cluster Controller Service") + .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseClusterControllerService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("bucket-name") + .displayName("Bucket Name") + .description("The name of bucket to access.") + .required(true) + .addValidator(Validator.VALID) + .defaultValue("default") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("Collection Name") + .description("The name of collection inside the specified bucket to access.") + .required(true) + .addValidator(Validator.VALID) + .defaultValue("default") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() + .name("document-type") + .displayName("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder() + .name("lookup-sub-doc-path") + .displayName("Lookup Sub-Document Path") + .description("The Sub-Document lookup path within the target JSON document.") + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java new file mode 100644 index 0000000000000..77653cbb731ee --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + + +/** + * Supported Couchbase document types. + * + * In order to handle a variety type of document classes such as JsonDocument, + * JsonLongDocument or JsonStringDocument, Couchbase processors use + * RawJsonDocument for Json type. + * + * The distinction between Json and Binary exists because BinaryDocument doesn't + * set Json flag when it stored on Couchbase Server even if the content byte + * array represents a Json string, and it can't be retrieved as a Json document. + */ +public enum DocumentType { + + Json, + Binary + +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml new file mode 100644 index 0000000000000..3b6357d3c58c5 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml @@ -0,0 +1,54 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-standard-services-api-bom + 2.0.0-SNAPSHOT + ../nifi-standard-services-api-bom + + + nifi-couchbase-bundle + pom + + + 3.6.2 + 2.6.2 + + + + nifi-couchbase-services-api + nifi-couchbase-services-api-nar + nifi-couchbase-processors + nifi-couchbase-nar + + + + + com.couchbase.client + java-client + ${couchbase.version} + + + com.couchbase.client + core-io + ${couchbase.core-io.version} + + + + diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml index fa85ffa68cefd..8220e1f1b1868 100755 --- a/nifi-extension-bundles/pom.xml +++ b/nifi-extension-bundles/pom.xml @@ -104,5 +104,6 @@ nifi-questdb-bundle nifi-protobuf-bundle nifi-github-bundle + nifi-couchbase-bundle