Skip to content

Commit

Permalink
MINIFICPP-2439 Restrict failure and original python relationship usage
Browse files Browse the repository at this point in the history
Closes #1852

Signed-off-by: Martin Zink <[email protected]>
  • Loading branch information
lordgamez authored and martinzink committed Aug 21, 2024
1 parent dc5c441 commit 1e19967
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 21 deletions.
8 changes: 6 additions & 2 deletions docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option):
COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py
COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py
COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py
COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py
RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\
Expand All @@ -195,8 +197,10 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option):
os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py"),
os.path.join(self.test_dir, "resources", "python", "RelativeImporterProcessor.py"),
os.path.join(self.test_dir, "resources", "python", "subtractutils.py"),
os.path.join(self.test_dir, "resources", "python", "multiplierutils.py")])
os.path.join(self.test_dir, "resources", "python", "CreateNothing.py")])
os.path.join(self.test_dir, "resources", "python", "multiplierutils.py"),
os.path.join(self.test_dir, "resources", "python", "CreateNothing.py"),
os.path.join(self.test_dir, "resources", "python", "FailureWithContent.py"),
os.path.join(self.test_dir, "resources", "python", "TransferToOriginal.py")])

def __build_http_proxy_image(self):
dockerfile = dedent("""\
Expand Down
24 changes: 24 additions & 0 deletions docker/test/integration/features/python.feature
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,27 @@ Feature: MiNiFi can use python processors in its flows
When the MiNiFi instance starts up
Then no files are placed in the monitored directory in 10 seconds of running time
And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds

@USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor cannot specify content of failure result
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a FailureWithContent processor
And python is installed on the MiNiFi agent with a pre-created virtualenv

And the "success" relationship of the GenerateFlowFile processor is connected to the FailureWithContent

When all instances start up

Then the Minifi logs contain the following message: "'failure' relationship should not have content, the original flow file will be transferred automatically in this case." in less than 60 seconds

@USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor cannot transfer to original relationship
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a TransferToOriginal processor
And python is installed on the MiNiFi agent with a pre-created virtualenv

And the "success" relationship of the GenerateFlowFile processor is connected to the TransferToOriginal

When all instances start up

Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." in less than 60 seconds
26 changes: 26 additions & 0 deletions docker/test/integration/minifi/processors/FailureWithContent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class FailureWithContent(Processor):
    """Processor subclass that passes a fixed ``FailureWithContent`` configuration to the base class."""

    def __init__(self, context):
        """Forward ``context`` and the fixed processor settings to ``Processor.__init__``.

        The settings declare no properties, no auto-terminated relationships,
        and an ``EVENT_DRIVEN`` scheduling strategy.
        """
        processor_settings = {
            'context': context,
            'clazz': 'FailureWithContent',
            'class_prefix': 'org.apache.nifi.minifi.processors.nifi_python_processors.',
            'properties': {},
            'schedule': {'scheduling strategy': 'EVENT_DRIVEN'},
            'auto_terminate': [],
        }
        super().__init__(**processor_settings)
26 changes: 26 additions & 0 deletions docker/test/integration/minifi/processors/TransferToOriginal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class TransferToOriginal(Processor):
    """Processor subclass that passes a fixed ``TransferToOriginal`` configuration to the base class."""

    def __init__(self, context):
        """Forward ``context`` and the fixed processor settings to ``Processor.__init__``.

        The settings declare no properties, no auto-terminated relationships,
        and an ``EVENT_DRIVEN`` scheduling strategy.
        """
        processor_settings = {
            'context': context,
            'clazz': 'TransferToOriginal',
            'class_prefix': 'org.apache.nifi.minifi.processors.nifi_python_processors.',
            'properties': {},
            'schedule': {'scheduling strategy': 'EVENT_DRIVEN'},
            'auto_terminate': [],
        }
        super().__init__(**processor_settings)
24 changes: 24 additions & 0 deletions docker/test/integration/resources/python/FailureWithContent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


class FailureWithContent(FlowFileTransform):
    """Transform that always returns a 'failure' result which also carries contents."""

    def __init__(self, **kwargs):
        """Accept any keyword arguments and ignore them."""

    def transform(self, context, flowFile):
        """Return a 'failure' result with contents ``"Content?"``.

        Neither ``context`` nor ``flowFile`` is read.
        """
        failure_result = FlowFileTransformResult("failure", contents="Content?")
        return failure_result
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,30 @@ def getRelationships(self):
def transform(self, context, flowFile):
properties = context.getProperties()
if len(properties) != 3:
return FlowFileTransformResult("failure", contents="Property count is invalid")
return FlowFileTransformResult("failure")

property_names = [property.name for property in properties]
if "Secret Password" not in property_names or "Request Timeout" not in property_names or "Wish Count" not in property_names:
return FlowFileTransformResult("failure", contents="Missing properties")
return FlowFileTransformResult("failure")

for property in properties:
if property.name == "Secret Password" and properties[property] != "mysecret":
return FlowFileTransformResult("failure", contents="Secret Password value is invalid")
return FlowFileTransformResult("failure")
elif property.name == "Request Timeout" and properties[property] != "60 sec":
return FlowFileTransformResult("failure", contents="Request Timeout value is invalid")
return FlowFileTransformResult("failure")
elif property.name == "Wish Count" and properties[property] != "3":
return FlowFileTransformResult("failure", contents="Wish Count value is invalid")
return FlowFileTransformResult("failure")

secret_password = context.getProperty(self.SECRET_PASSWORD).getValue()
if secret_password != "mysecret":
return FlowFileTransformResult("failure", contents="Secret password is invalid")
return FlowFileTransformResult("failure")

timeout = context.getProperty(self.REQUEST_TIMEOUT).getValue()
if timeout != "60 sec":
return FlowFileTransformResult("failure", contents="Request timeout is invalid")
return FlowFileTransformResult("failure")

wish_count = context.getProperty(self.WISH_COUNT).getValue()
if wish_count != "3":
return FlowFileTransformResult("failure", contents="Wish count is invalid")
return FlowFileTransformResult("failure")

return FlowFileTransformResult("myrelationship", contents="Check successful!")
Original file line number Diff line number Diff line change
Expand Up @@ -56,57 +56,57 @@ def transform(self, context, flowFile):
time_in_microseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MICROSECONDS)
if time_in_microseconds != 7200000000:
self.logger.error("Time period property conversion to microseconds is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to microseconds is not working as expected")
return FlowFileTransformResult("failure")

time_in_milliseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MILLISECONDS)
if time_in_milliseconds != 7200000:
self.logger.error("Time period property conversion to milliseconds is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to milliseconds is not working as expected")
return FlowFileTransformResult("failure")

time_in_seconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.SECONDS)
if time_in_seconds != 7200:
self.logger.error("Time period property conversion to seconds is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to seconds is not working as expected")
return FlowFileTransformResult("failure")

time_in_minutes = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MINUTES)
if time_in_minutes != 120:
self.logger.error("Time period property conversion to minutes is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to minutes is not working as expected")
return FlowFileTransformResult("failure")

time_in_hours = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.HOURS)
if time_in_hours != 2:
self.logger.error("Time period property conversion to hours is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to hours is not working as expected")
return FlowFileTransformResult("failure")

time_in_days = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.DAYS)
if time_in_days != 0:
self.logger.error("Time period property conversion to days is not working as expected")
return FlowFileTransformResult("failure", contents="Time period property conversion to days is not working as expected")
return FlowFileTransformResult("failure")

data_size_in_bytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.B)
if data_size_in_bytes != 104857600.0:
self.logger.error("Data size property conversion to bytes is not working as expected")
return FlowFileTransformResult("failure", contents="Data size property conversion to bytes is not working as expected")
return FlowFileTransformResult("failure")

data_size_in_kilobytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.KB)
if data_size_in_kilobytes != 102400.0:
self.logger.error("Data size property conversion to kilobytes is not working as expected")
return FlowFileTransformResult("failure", contents="Data size property conversion to kilobytes is not working as expected")
return FlowFileTransformResult("failure")

data_size_in_megabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.MB)
if data_size_in_megabytes != 100.0:
self.logger.error("Data size property conversion to megabytes is not working as expected")
return FlowFileTransformResult("failure", contents="Data size property conversion to megabytes is not working as expected")
return FlowFileTransformResult("failure")

data_size_in_gigabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.GB)
if data_size_in_gigabytes != 0.09765625:
self.logger.error("Data size property conversion to gigabytes is not working as expected")
return FlowFileTransformResult("failure", contents="Data size property conversion to gigabytes is not working as expected")
return FlowFileTransformResult("failure")

ssl_context = context.getProperty(self.SSL_CONTEXT_PROPERTY).asControllerService()
cert = ssl_context.getCertificateFile()
if cert != "/tmp/resources/minifi_client.crt":
self.logger.error("SSL Context Service property is not working as expected")
return FlowFileTransformResult("failure", contents="SSL Context Service property is not working as expected")
return FlowFileTransformResult("failure")

return FlowFileTransformResult("success", contents="Check successful!")
24 changes: 24 additions & 0 deletions docker/test/integration/resources/python/TransferToOriginal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


class TransferToOriginal(FlowFileTransform):
    """Transform that always returns a result targeting the 'original' relationship."""

    def __init__(self, **kwargs):
        """Accept any keyword arguments and ignore them."""

    def transform(self, context, flowFile):
        """Return a result for the 'original' relationship.

        Neither ``context`` nor ``flowFile`` is read.
        """
        original_result = FlowFileTransformResult("original")
        return original_result
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,20 @@ def onTrigger(self, context: ProcessContext, session: ProcessSession):
session.transfer(original_flow_file, self.REL_FAILURE)
return

if result.getRelationship() == "original":
session.remove(flow_file)
self.logger.error("Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases.")
session.transfer(original_flow_file, self.REL_FAILURE)
return

result_attributes = result.getAttributes()
if result.getRelationship() == "failure":
session.remove(flow_file)
if result_attributes is not None:
for name, value in result_attributes.items():
original_flow_file.setAttribute(name, value)
if result.getContents() is not None:
self.logger.error("'failure' relationship should not have content, the original flow file will be transferred automatically in this case.")
session.transfer(original_flow_file, self.REL_FAILURE)
return

Expand Down

0 comments on commit 1e19967

Please sign in to comment.