Skip to content

Commit

Permalink
MINIFICPP-2435 Add attributes to original flow file on failed transfo…
Browse files Browse the repository at this point in the history
…rm in python

Closes #1849

Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
lordgamez authored and szaszm committed Aug 13, 2024
1 parent 64e0134 commit 15e9a96
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 4 deletions.
4 changes: 3 additions & 1 deletion docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option):
COPY SpecialPropertyTypeChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py
COPY ProcessContextInterfaceChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py
COPY CreateFlowFile.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py
COPY FailureWithAttributes.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py
RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\
Expand All @@ -186,7 +187,8 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option):
return self.__build_image(dockerfile, [os.path.join(self.test_dir, "resources", "python", "RotatingForwarder.py"),
os.path.join(self.test_dir, "resources", "python", "SpecialPropertyTypeChecker.py"),
os.path.join(self.test_dir, "resources", "python", "ProcessContextInterfaceChecker.py"),
os.path.join(self.test_dir, "resources", "python", "CreateFlowFile.py")])
os.path.join(self.test_dir, "resources", "python", "CreateFlowFile.py"),
os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py")])

def __build_http_proxy_image(self):
dockerfile = dedent("""\
Expand Down
18 changes: 18 additions & 0 deletions docker/test/integration/features/python.feature
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,21 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up

Then one flowfile with the contents "Check successful!" is placed in the monitored directory in less than 30 seconds

@USE_NIFI_PYTHON_PROCESSORS
Scenario: NiiFi native python processor can update attributes of a flow file transferred to failure relationship
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a UpdateAttribute processor with the "my.attribute" property set to "my.value"
And the "error.message" property of the UpdateAttribute processor is set to "Old error"
And a FailureWithAttributes processor
And a LogAttribute processor
And python is installed on the MiNiFi agent with a pre-created virtualenv

And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is connected to the FailureWithAttributes
And the "failure" relationship of the FailureWithAttributes processor is connected to the LogAttribute

When all instances start up

Then the Minifi logs contain the following message: "key:error.message value:Error" in less than 60 seconds
And the Minifi logs contain the following message: "key:my.attribute value:my.value" in less than 10 seconds
26 changes: 26 additions & 0 deletions docker/test/integration/minifi/processors/FailureWithAttributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class FailureWithAttributes(Processor):
def __init__(self, context):
super(FailureWithAttributes, self).__init__(
context=context,
clazz='FailureWithAttributes',
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
properties={},
schedule={'scheduling strategy': 'EVENT_DRIVEN'},
auto_terminate=[])
24 changes: 24 additions & 0 deletions docker/test/integration/resources/python/FailureWithAttributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


class FailureWithAttributes(FlowFileTransform):
def __init__(self, **kwargs):
pass

def transform(self, context, flowFile):
return FlowFileTransformResult("failure", attributes={"error.message": "Error"})
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,18 @@ def onTrigger(self, context: ProcessContext, session: ProcessSession):
session.transfer(original_flow_file, self.REL_FAILURE)
return

result_attributes = result.getAttributes()
if result.getRelationship() == "failure":
session.remove(flow_file)
if result_attributes is not None:
for name, value in result_attributes.items():
original_flow_file.setAttribute(name, value)
session.transfer(original_flow_file, self.REL_FAILURE)
return

result_attributes = result.getAttributes()
if result_attributes is not None:
for attribute in result_attributes:
flow_file.addAttribute(attribute, result_attributes[attribute])
for name, value in result_attributes.items():
flow_file.setAttribute(name, value)

result_content = result.getContents()
if result_content is not None:
Expand Down

0 comments on commit 15e9a96

Please sign in to comment.