diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index caca3e97da..781ce420d9 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -171,6 +171,7 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option): COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py + COPY SetRecordField.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ @@ -205,6 +206,7 @@ def build_full_python_resource_path(resource): build_full_python_resource_path("CreateNothing.py"), build_full_python_resource_path("FailureWithContent.py"), build_full_python_resource_path("TransferToOriginal.py"), + build_full_python_resource_path("SetRecordField.py"), ]) def __build_http_proxy_image(self): diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index ab73628dd6..c7ca8d3580 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -222,3 +222,27 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." 
in less than 60 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: MiNiFi C++ supports RecordTransform native python processors + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"group": "group1", "name": "John"}\n{"group": "group1", "name": "Jane"}\n{"group": "group2", "name": "Kyle"}\n{"name": "Zoe"}' is present in '/tmp/input' + And a file with the content '{"group": "group1", "name": "Steve"}\n{}' is present in '/tmp/input' + And a SetRecordField processor with the "Record Reader" property set to "JsonRecordSetReader" + And the "Record Writer" property of the SetRecordField processor is set to "JsonRecordSetWriter" + And a JsonRecordSetReader controller service is set up + And a JsonRecordSetWriter controller service is set up + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And the "Log Payload" property of the LogAttribute processor is set to "true" + And python is installed on the MiNiFi agent with a pre-created virtualenv + + And the "success" relationship of the GetFile processor is connected to the SetRecordField + And the "success" relationship of the SetRecordField processor is connected to the LogAttribute + + When all instances start up + + Then the Minifi logs contain the following message: '[{"group":"group1","name":"John"},{"group":"group1","name":"Jane"}]' in less than 60 seconds + And the Minifi logs contain the following message: '[{"group":"group2","name":"Kyle"}]' in less than 5 seconds + And the Minifi logs contain the following message: '[{"name":"Zoe"}]' in less than 5 seconds + And the Minifi logs contain the following message: '[{"group":"group1","name":"Steve"}]' in less than 5 seconds + And the Minifi logs contain the following message: '[{}]' in less than 5 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 75cf19a34b..ddaa3c78c4 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -25,6 +25,7 @@ from minifi.controllers.ODBCService import ODBCService from minifi.controllers.KubernetesControllerService import KubernetesControllerService from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter +from minifi.controllers.JsonRecordSetReader import JsonRecordSetReader from behave import given, then, when from behave.model_describe import ModelDescriptor @@ -271,6 +272,7 @@ def step_impl(context): @given("a file with the content \"{content}\" is present in \"{path}\"") +@given("a file with the content '{content}' is present in '{path}'") @then("a file with the content \"{content}\" is placed in \"{path}\"") def step_impl(context, content, path): context.test.add_test_data(path, content) @@ -396,7 +398,7 @@ def step_impl(context, processor_name): processor.set_property('SSL Context Service', ssl_context_service.name) -# RecordSetWriters +# Record set reader and writer @given("a JsonRecordSetWriter controller service is set up for {processor_name}") def step_impl(context, processor_name): json_record_set_writer = JsonRecordSetWriter() @@ -404,6 +406,21 @@ def step_impl(context, processor_name): processor = context.test.get_node_by_name(processor_name) processor.controller_services.append(json_record_set_writer) processor.set_property('Record Set Writer', json_record_set_writer.name) + processor.set_property('Record Writer', json_record_set_writer.name) + + +@given("a JsonRecordSetWriter controller service is set up") +def 
step_impl(context): + json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", output_grouping="Array") + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(json_record_set_writer) + + +@given("a JsonRecordSetReader controller service is set up") +def step_impl(context): + json_record_set_reader = JsonRecordSetReader("JsonRecordSetReader") + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(json_record_set_reader) # Kubernetes diff --git a/docker/test/integration/minifi/controllers/JsonRecordSetReader.py b/docker/test/integration/minifi/controllers/JsonRecordSetReader.py new file mode 100644 index 0000000000..909bf0e7ce --- /dev/null +++ b/docker/test/integration/minifi/controllers/JsonRecordSetReader.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.ControllerService import ControllerService + + +class JsonRecordSetReader(ControllerService): + def __init__(self, name=None): + super(JsonRecordSetReader, self).__init__(name=name) + self.service_class = 'JsonRecordSetReader' diff --git a/docker/test/integration/minifi/controllers/JsonRecordSetWriter.py b/docker/test/integration/minifi/controllers/JsonRecordSetWriter.py index 9708c6cb63..ca00764b5a 100644 --- a/docker/test/integration/minifi/controllers/JsonRecordSetWriter.py +++ b/docker/test/integration/minifi/controllers/JsonRecordSetWriter.py @@ -18,7 +18,7 @@ class JsonRecordSetWriter(ControllerService): - def __init__(self, name=None, cert=None, key=None, ca_cert=None, passphrase=None, use_system_cert_store=None): + def __init__(self, name=None, output_grouping='One Line Per Object'): super(JsonRecordSetWriter, self).__init__(name=name) self.service_class = 'JsonRecordSetWriter' - self.properties['Output Grouping'] = 'One Line Per Object' + self.properties['Output Grouping'] = output_grouping diff --git a/docker/test/integration/minifi/processors/SetRecordField.py b/docker/test/integration/minifi/processors/SetRecordField.py new file mode 100644 index 0000000000..71875b2a40 --- /dev/null +++ b/docker/test/integration/minifi/processors/SetRecordField.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class SetRecordField(Processor): + def __init__(self, context): + super(SetRecordField, self).__init__( + context=context, + clazz='SetRecordField', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/SetRecordField.py b/docker/test/integration/resources/python/SetRecordField.py new file mode 100644 index 0000000000..dbb3a2ae64 --- /dev/null +++ b/docker/test/integration/resources/python/SetRecordField.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.properties import PropertyDescriptor +from nifiapi.properties import StandardValidators +from nifiapi.properties import ExpressionLanguageScope +from nifiapi.recordtransform import RecordTransformResult +from nifiapi.recordtransform import RecordTransform + + +class SetRecordField(RecordTransform): + class Java: + implements = ['org.apache.nifi.python.processor.RecordTransform'] + + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + + def __init__(self, **kwargs): + super().__init__() + + def transform(self, context, record, schema, attributemap): + # Update dictionary based on the dynamic properties provided by user + for key in context.getProperties().keys(): + if not key.dynamic: + continue + + propname = key.name + record[propname] = context.getProperty(propname).evaluateAttributeExpressions(attributemap).getValue() + + # Determine the partition + if 'group' in record: + partition = {'group': record['group']} + else: + partition = None + + # Return the result + return RecordTransformResult(record=record, relationship='success', partition=partition) + + def getDynamicPropertyDescriptor(self, name): + return PropertyDescriptor( + name=name, + description="Specifies the value to set for the '" + name + "' field", + expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + validators=[StandardValidators.ALWAYS_VALID] + ) diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md index 2b0f7c0767..eb0289bfe1 100644 --- a/extensions/python/PYTHON.md +++ b/extensions/python/PYTHON.md @@ -155,14 +155,14 @@ The SentimentAnalysis processor will perform a Vader Sentiment Analysis. 
This re ## Using NiFi Python Processors -MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform or the FlowFileSource base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org/documentation/v2/). +MiNiFi C++ supports the use of NiFi Python processors that inherit from the FlowFileTransform, RecordTransform or FlowFileSource base classes. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org/documentation/v2/). In the flow configuration these Python processors can be referenced by their fully qualified class name, which looks like this: org.apache.nifi.minifi.processors.nifi_python_processors... For example, the fully qualified class name of the PromptChatGPT processor implemented in the file nifi_python_processors/PromptChatGPT.py is org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT. If a processor is copied under a subdirectory, because it is part of a python submodule, the submodule name will be appended to the fully qualified class name. For example, if the QueryPinecone processor is implemented in the QueryPinecone.py file that is copied to nifi_python_processors/vectorstores/QueryPinecone.py, the fully qualified class name will be org.apache.nifi.minifi.processors.nifi_python_processors.vectorstores.QueryPinecone in the configuration file. **NOTE:** The name of the NiFi Python processor file should match the class name in the file, otherwise the processor will not be found. Due to some differences between the NiFi and MiNiFi C++ processors and implementation, there are some limitations using the NiFi Python processors: -- Record based processors are not yet supported in MiNiFi C++, so the NiFi Python processors inherited from RecordTransform are not supported. +- Schemas are not supported in MiNiFi C++, so schemas are ignored and the schema argument of the `transform` method is always passed as None to RecordTransform NiFi Python processors. - There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors. - MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated. - MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required.
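For readers of the PYTHON.md change above, a minimal RecordTransform-based NiFi Python processor could look like the following sketch. The UppercaseName processor is hypothetical (it is not part of this patch) and only mirrors the API exercised by nifiapi/recordtransform.py and resources/python/SetRecordField.py below; if it were saved as nifi_python_processors/UppercaseName.py, the flow configuration would reference it as org.apache.nifi.minifi.processors.nifi_python_processors.UppercaseName.

```python
# Hypothetical illustration only, modelled on resources/python/SetRecordField.py in this patch.
from nifiapi.recordtransform import RecordTransform, RecordTransformResult


class UppercaseName(RecordTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'

    def __init__(self, **kwargs):
        super().__init__()

    def transform(self, context, record, schema, attributemap):
        # MiNiFi C++ does not support record schemas, so 'schema' is always None here.
        if 'name' in record:
            record['name'] = str(record['name']).upper()
        # Records that share the same 'partition' value are written into the same output flow file.
        return RecordTransformResult(record=record, relationship='success', partition=None)
```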
diff --git a/extensions/python/PythonBindings.cpp b/extensions/python/PythonBindings.cpp index 6cd3d069dc..873188faf6 100644 --- a/extensions/python/PythonBindings.cpp +++ b/extensions/python/PythonBindings.cpp @@ -28,6 +28,8 @@ #include "types/PyStateManager.h" #include "types/PyDataConverter.h" #include "types/PySSLContextService.h" +#include "types/PyRecordSetReader.h" +#include "types/PyRecordSetWriter.h" namespace org::apache::nifi::minifi::extensions::python { extern "C" { @@ -62,7 +64,9 @@ PyInit_minifi_native(void) { std::make_pair(PyInputStream::typeObject(), "InputStream"), std::make_pair(PyOutputStream::typeObject(), "OutputStream"), std::make_pair(PyStateManager::typeObject(), "StateManager"), - std::make_pair(PySSLContextService::typeObject(), "SSLContextService") + std::make_pair(PySSLContextService::typeObject(), "SSLContextService"), + std::make_pair(PyRecordSetReader::typeObject(), "RecordSetReader"), + std::make_pair(PyRecordSetWriter::typeObject(), "RecordSetWriter") }); for (const auto& type : types) { diff --git a/extensions/python/PythonInterpreter.cpp b/extensions/python/PythonInterpreter.cpp index 267db16fd5..87345db68d 100644 --- a/extensions/python/PythonInterpreter.cpp +++ b/extensions/python/PythonInterpreter.cpp @@ -50,8 +50,9 @@ void initThreads() { #pragma warning(push) #pragma warning(disable: 4996) #endif - if (!PyEval_ThreadsInitialized()) + if (PyEval_ThreadsInitialized() == 0) { PyEval_InitThreads(); + } #if defined(__clang__) #pragma clang diagnostic pop #elif defined(__GNUC__) diff --git a/extensions/python/pythonprocessors/nifiapi/flowfilesource.py b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py index be676d9ff4..74041d7804 100644 --- a/extensions/python/pythonprocessors/nifiapi/flowfilesource.py +++ b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py @@ -40,12 +40,6 @@ def getAttributes(self): class FlowFileSource(ProcessorBase): - # These will be added through the python bindings using C API - logger = None - REL_SUCCESS = None - REL_FAILURE = None - REL_ORIGINAL = None - def onTrigger(self, context: ProcessContext, session: ProcessSession): context_proxy = ProcessContextProxy(context, self) try: diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py index be17a266e6..c72ebfb54c 100644 --- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py +++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py @@ -41,12 +41,6 @@ def getAttributes(self): class FlowFileTransform(ProcessorBase): - # These will be added through the python bindings using C API - logger = None - REL_SUCCESS = None - REL_FAILURE = None - REL_ORIGINAL = None - def onTrigger(self, context: ProcessContext, session: ProcessSession): original_flow_file = session.get() if not original_flow_file: diff --git a/extensions/python/pythonprocessors/nifiapi/recordtransform.py b/extensions/python/pythonprocessors/nifiapi/recordtransform.py new file mode 100644 index 0000000000..9ff8367c26 --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/recordtransform.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import traceback +import json +from abc import abstractmethod +from minifi_native import ProcessContext, ProcessSession, Processor +from .processorbase import ProcessorBase +from .properties import FlowFile as FlowFileProxy +from .properties import ProcessContext as ProcessContextProxy +from .properties import PropertyDescriptor + + +class __RecordTransformResult__: + def __init__(self, processor_result, recordJson): + self.processor_result = processor_result + self.recordJson = recordJson + + def getRecordJson(self): + return self.recordJson + + def getSchema(self): + return self.processor_result.schema + + def getRelationship(self): + return self.processor_result.relationship + + def getPartition(self): + return self.processor_result.partition + + +class RecordTransformResult: + def __init__(self, record=None, schema=None, relationship="success", partition=None): + self.record = record + self.schema = schema + self.relationship = relationship + self.partition = partition + + def getRecord(self): + return self.record + + def getSchema(self): + return self.schema + + def getRelationship(self): + return self.relationship + + def getPartition(self): + return self.partition + + +class RecordTransform(ProcessorBase): + RECORD_READER = PropertyDescriptor( + name='Record Reader', + display_name='Record Reader', + description='''Specifies the Controller Service to use for reading incoming data''', + required=True, + controller_service_definition='RecordSetReader' + ) + RECORD_WRITER = PropertyDescriptor( + name='Record Writer', + display_name='Record Writer', + description='''Specifies the Controller Service to use for writing out the records''', + required=True, + controller_service_definition='RecordSetWriter', + ) + + def onInitialize(self, processor: Processor): + super(RecordTransform, self).onInitialize(processor) + processor.addProperty(self.RECORD_READER.name, self.RECORD_READER.description, None, self.RECORD_READER.required, False, False, None, self.RECORD_READER.controllerServiceDefinition) + processor.addProperty(self.RECORD_WRITER.name, self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, False, None, self.RECORD_WRITER.controllerServiceDefinition) + + def onTrigger(self, context: ProcessContext, session: ProcessSession): + flow_file = session.get() + if not flow_file: + return + + context_proxy = ProcessContextProxy(context, self) + record_reader = context_proxy.getProperty(self.RECORD_READER).asControllerService() + if not record_reader: + self.logger.error("Record Reader property is invalid") + session.transfer(flow_file, self.REL_FAILURE) + return + record_writer = context_proxy.getProperty(self.RECORD_WRITER).asControllerService() + if not record_writer: + self.logger.error("Record Writer property is invalid") + session.transfer(flow_file, self.REL_FAILURE) + return + + try: + record_list = record_reader.read(flow_file, session) + if record_list is None: + self.logger.error("Reading flow file records returned None") + session.transfer(flow_file, self.REL_FAILURE) + return + except Exception: + self.logger.error("Failed to read flow file records due to 
the following error:\n{}".format(traceback.format_exc())) + session.transfer(flow_file, self.REL_FAILURE) + return + + flow_file_proxy = FlowFileProxy(session, flow_file) + results = [] + for record in record_list: + record_json = json.loads(record) + try: + result = self.transform(context_proxy, record_json, None, flow_file_proxy) + result_record = result.getRecord() + resultjson = None if result_record is None else json.dumps(result_record) + results.append(__RecordTransformResult__(result, resultjson)) + except Exception: + self.logger.error("Failed to transform record due to the following error:\n{}".format(traceback.format_exc())) + session.transfer(flow_file, self.REL_FAILURE) + return + + partitions = [] + partitioned_results_list = [] + for result in results: + if result.getRecordJson() is None: + continue + record_partition = result.getPartition() + try: + partition_index = partitions.index(record_partition) + partitioned_results_list[partition_index].append(result) + except ValueError: + partitions.append(record_partition) + partitioned_results_list.append([result]) + + for single_partition_results in partitioned_results_list: + partitioned_flow_file = session.create(flow_file) + record_writer.write([result.getRecordJson() for result in single_partition_results], partitioned_flow_file, session) + if single_partition_results[0].getRelationship() == "success": + session.transfer(partitioned_flow_file, self.REL_SUCCESS) + else: + session.transferToCustomRelationship(partitioned_flow_file, single_partition_results[0].getRelationship()) + + session.transfer(flow_file, self.REL_ORIGINAL) + + @abstractmethod + def transform(self, context: ProcessContextProxy, record: dict, schema, attributemap: FlowFileProxy) -> RecordTransformResult: + pass diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 5dd9628ff7..253b90208f 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -22,6 +22,8 @@ a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#include "PyStateManager.h" #include "PyScriptFlowFile.h" #include "core/Processor.h" +#include "controllers/RecordSetReader.h" +#include "controllers/RecordSetWriter.h" extern "C" { namespace org::apache::nifi::minifi::extensions::python { @@ -127,6 +129,12 @@ PyObject* PyProcessContext::getControllerService(PyProcessContext* self, PyObjec if (controller_service_type_str == "SSLContextService") { auto ssl_ctx_service = std::dynamic_pointer_cast(controller_service); return object::returnReference(std::weak_ptr(ssl_ctx_service)); + } else if (controller_service_type_str == "RecordSetReader") { + auto record_set_reader = std::dynamic_pointer_cast(controller_service); + return object::returnReference(std::weak_ptr(record_set_reader)); + } else if (controller_service_type_str == "RecordSetWriter") { + auto record_set_writer = std::dynamic_pointer_cast(controller_service); + return object::returnReference(std::weak_ptr(record_set_writer)); } } diff --git a/extensions/python/types/PyProcessContext.h b/extensions/python/types/PyProcessContext.h index 00c6a04e69..119cfbc29f 100644 --- a/extensions/python/types/PyProcessContext.h +++ b/extensions/python/types/PyProcessContext.h @@ -20,6 +20,8 @@ #include "core/ProcessContext.h" #include "PySSLContextService.h" +#include "PyRecordSetReader.h" +#include "PyRecordSetWriter.h" #include "../PythonBindings.h" namespace org::apache::nifi::minifi::extensions::python { diff --git a/extensions/python/types/PyProcessSession.cpp b/extensions/python/types/PyProcessSession.cpp index 00b9bf8a5b..85e8d6dfb8 100644 --- a/extensions/python/types/PyProcessSession.cpp +++ b/extensions/python/types/PyProcessSession.cpp @@ -178,14 +178,24 @@ PyObject* PyProcessSessionObject::get(PyProcessSessionObject* self, PyObject*) { return object::returnReference(nullptr); } -PyObject* PyProcessSessionObject::create(PyProcessSessionObject* self, PyObject*) { +PyObject* PyProcessSessionObject::create(PyProcessSessionObject* self, PyObject* args) { auto session = self->process_session_.lock(); if (!session) { PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'"); return nullptr; } - if (auto flow_file = session->create(nullptr)) + std::shared_ptr parent_flow_file; + auto arg_size = PyTuple_Size(args); + if (arg_size > 0) { + PyObject* script_flow_file = nullptr; + if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) { + return nullptr; + } + parent_flow_file = reinterpret_cast(script_flow_file)->script_flow_file_.lock(); + } + + if (auto flow_file = session->create(parent_flow_file)) return object::returnReference(std::weak_ptr(flow_file)); return object::returnReference(nullptr); } diff --git a/extensions/python/types/PyProcessSession.h b/extensions/python/types/PyProcessSession.h index 1a88966793..0a83bb720b 100644 --- a/extensions/python/types/PyProcessSession.h +++ b/extensions/python/types/PyProcessSession.h @@ -39,6 +39,7 @@ class PyProcessSession { void remove(const std::shared_ptr& flow_file); std::string getContentsAsString(const std::shared_ptr& flow_file); void putAttribute(const std::shared_ptr& flow_file, std::string_view key, const std::string& value); + gsl::not_null getSession() const { return session_; } private: std::vector> flow_files_; diff --git a/extensions/python/types/PyRecordSetReader.cpp b/extensions/python/types/PyRecordSetReader.cpp new file mode 100644 index 0000000000..a0b89bfb3c --- /dev/null +++ b/extensions/python/types/PyRecordSetReader.cpp @@ -0,0 +1,110 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, +a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PyRecordSetReader.h" +#include "rapidjson/writer.h" +#include "rapidjson/stream.h" + +#include "PyProcessSession.h" +#include "PyScriptFlowFile.h" + +extern "C" { +namespace org::apache::nifi::minifi::extensions::python { + +static PyMethodDef PyRecordSetReader_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) + {"read", (PyCFunction) PyRecordSetReader::read, METH_VARARGS, nullptr}, + {} /* Sentinel */ +}; + +static PyType_Slot PyRecordSetReaderTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) + {Py_tp_dealloc, reinterpret_cast(pythonAllocatedInstanceDealloc)}, + {Py_tp_init, reinterpret_cast(PyRecordSetReader::init)}, + {Py_tp_methods, reinterpret_cast(PyRecordSetReader_methods)}, + {Py_tp_new, reinterpret_cast(newPythonAllocatedInstance)}, + {} /* Sentinel */ +}; + +static PyType_Spec PyRecordSetReaderTypeSpec{ + .name = "minifi_native.RecordSetReader", + .basicsize = sizeof(PyRecordSetReader), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT, + .slots = PyRecordSetReaderTypeSpecSlots +}; + +int PyRecordSetReader::init(PyRecordSetReader* self, PyObject* args, PyObject*) { + PyObject* weak_ptr_capsule = nullptr; + if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) { + return -1; + } + + auto record_set_reader = PyCapsule_GetPointer(weak_ptr_capsule, HeldTypeName); + if (!record_set_reader) + return -1; + self->record_set_reader_ = *static_cast(record_set_reader); + return 0; +} + +PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) { + auto record_set_reader = self->record_set_reader_.lock(); + if (!record_set_reader) { + PyErr_SetString(PyExc_AttributeError, "tried reading ssl context service outside 'on_trigger'"); + return nullptr; + } + + PyObject* script_flow_file = nullptr; + PyObject* py_session = nullptr; + if (!PyArg_ParseTuple(args, "O!O!", PyScriptFlowFile::typeObject(), &script_flow_file, PyProcessSessionObject::typeObject(), &py_session)) { + return nullptr; + } + + const auto flow_file = reinterpret_cast(script_flow_file)->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + return nullptr; + } + + const auto process_session = reinterpret_cast(py_session)->process_session_.lock(); + if (!process_session) { + PyErr_SetString(PyExc_AttributeError, "tried reading ProcessSession outside 'on_trigger'"); + return nullptr; + } + + auto read_result = record_set_reader->read(flow_file, *process_session->getSession()); + if (!read_result) { + std::string error_message = "failed to read record set with the following error: " + read_result.error().message(); + PyErr_SetString(PyExc_RuntimeError, error_message.c_str()); + return nullptr; + } + + auto records = 
OwnedList::create(); + for (const auto& record : read_result.value()) { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + record.toJson().Accept(writer); + records.append(std::string{buffer.GetString(), buffer.GetSize()}); + } + return object::returnReference(records); +} + +PyTypeObject* PyRecordSetReader::typeObject() { + static OwnedObject PyRecordSetReaderType{PyType_FromSpec(&PyRecordSetReaderTypeSpec)}; + return reinterpret_cast(PyRecordSetReaderType.get()); +} + +} // namespace org::apache::nifi::minifi::extensions::python +} // extern "C" diff --git a/extensions/python/types/PyRecordSetReader.h b/extensions/python/types/PyRecordSetReader.h new file mode 100644 index 0000000000..b0488ff5e6 --- /dev/null +++ b/extensions/python/types/PyRecordSetReader.h @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "controllers/RecordSetReader.h" +#include "../PythonBindings.h" + +namespace org::apache::nifi::minifi::extensions::python { + +struct PyRecordSetReader { + PyRecordSetReader() {} + using HeldType = std::weak_ptr; + static constexpr const char* HeldTypeName = "PyRecordSetReader::HeldType"; + + PyObject_HEAD + HeldType record_set_reader_; + + static int init(PyRecordSetReader* self, PyObject* args, PyObject* kwds); + + static PyObject* read(PyRecordSetReader* self, PyObject* args); + + static PyTypeObject* typeObject(); +}; + +namespace object { +template<> +struct Converter : public HolderTypeConverter {}; +} // namespace object +} // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/types/PyRecordSetWriter.cpp b/extensions/python/types/PyRecordSetWriter.cpp new file mode 100644 index 0000000000..14ebee1d08 --- /dev/null +++ b/extensions/python/types/PyRecordSetWriter.cpp @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, +a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PyRecordSetWriter.h" + +#include "PyScriptFlowFile.h" +#include "PyProcessSession.h" +#include "core/Record.h" +#include "rapidjson/document.h" + +extern "C" { +namespace org::apache::nifi::minifi::extensions::python { + +static PyMethodDef PyRecordSetWriter_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) + {"write", (PyCFunction) PyRecordSetWriter::write, METH_VARARGS, nullptr}, + {} /* Sentinel */ +}; + +static PyType_Slot PyRecordSetWriterTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) + {Py_tp_dealloc, reinterpret_cast(pythonAllocatedInstanceDealloc)}, + {Py_tp_init, reinterpret_cast(PyRecordSetWriter::init)}, + {Py_tp_methods, reinterpret_cast(PyRecordSetWriter_methods)}, + {Py_tp_new, reinterpret_cast(newPythonAllocatedInstance)}, + {} /* Sentinel */ +}; + +static PyType_Spec PyRecordSetWriterTypeSpec{ + .name = "minifi_native.RecordSetWriter", + .basicsize = sizeof(PyRecordSetWriter), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT, + .slots = PyRecordSetWriterTypeSpecSlots +}; + +int PyRecordSetWriter::init(PyRecordSetWriter* self, PyObject* args, PyObject*) { + PyObject* weak_ptr_capsule = nullptr; + if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) { + return -1; + } + + auto record_set_writer = PyCapsule_GetPointer(weak_ptr_capsule, HeldTypeName); + if (!record_set_writer) + return -1; + self->record_set_writer_ = *static_cast(record_set_writer); + return 0; +} + +PyObject* PyRecordSetWriter::write(PyRecordSetWriter* self, PyObject* args) { + auto record_set_writer = self->record_set_writer_.lock(); + if (!record_set_writer) { + PyErr_SetString(PyExc_AttributeError, "tried reading record set writer outside 'on_trigger'"); + return nullptr; + } + + PyObject* py_recordset = nullptr; + PyObject* script_flow_file = nullptr; + PyObject* py_session = nullptr; + if (!PyArg_ParseTuple(args, "OO!O!", &py_recordset, PyScriptFlowFile::typeObject(), &script_flow_file, PyProcessSessionObject::typeObject(), &py_session)) { + return nullptr; + } + + if (!py_recordset) { + PyErr_SetString(PyExc_AttributeError, "Recordset is invalid!"); + return nullptr; + } + + const auto flow_file = reinterpret_cast(script_flow_file)->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + return nullptr; + } + + const auto process_session = reinterpret_cast(py_session)->process_session_.lock(); + if (!process_session) { + PyErr_SetString(PyExc_AttributeError, "tried reading ProcessSession outside 'on_trigger'"); + return nullptr; + } + + auto record_list = BorrowedList(py_recordset); + std::vector record_set; + for (size_t i = 0; i < record_list.length(); ++i) { + auto record_json_str = BorrowedStr(record_list[i].get()).toUtf8String(); + rapidjson::Document document; + document.Parse<0>(record_json_str.c_str()); + record_set.push_back(core::Record::fromJson(document)); + } + + record_set_writer->write(record_set, flow_file, *process_session->getSession()); + Py_RETURN_NONE; +} + +PyTypeObject* PyRecordSetWriter::typeObject() { + static OwnedObject PyRecordSetWriterType{PyType_FromSpec(&PyRecordSetWriterTypeSpec)}; + return reinterpret_cast(PyRecordSetWriterType.get()); +} + +} // namespace org::apache::nifi::minifi::extensions::python +} // extern "C" diff --git a/extensions/python/types/PyRecordSetWriter.h b/extensions/python/types/PyRecordSetWriter.h new file mode 100644 index 0000000000..6611738744 --- /dev/null +++ b/extensions/python/types/PyRecordSetWriter.h @@ -0,0 +1,45 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "controllers/RecordSetWriter.h" +#include "../PythonBindings.h" + +namespace org::apache::nifi::minifi::extensions::python { + +struct PyRecordSetWriter { + PyRecordSetWriter() {} + using HeldType = std::weak_ptr; + static constexpr const char* HeldTypeName = "PyRecordSetWriter::HeldType"; + + PyObject_HEAD + HeldType record_set_writer_; + + static int init(PyRecordSetWriter* self, PyObject* args, PyObject* kwds); + + static PyObject* write(PyRecordSetWriter* self, PyObject* args); + + static PyTypeObject* typeObject(); +}; + +namespace object { +template<> +struct Converter : public HolderTypeConverter {}; +} // namespace object +} // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/types/Types.h b/extensions/python/types/Types.h index 0c14039f23..54769b44a5 100644 --- a/extensions/python/types/Types.h +++ b/extensions/python/types/Types.h @@ -243,6 +243,15 @@ class List : public ReferenceHolder { : ReferenceHolder(object) { } + static OwnedList create() requires(reference_type == ReferenceType::OWNED) { + return OwnedList(PyList_New(0)); + } + + template + void append(T value) { + PyList_Append(this->ref_.get(), object::from(std::move(value)).releaseReference()); + } + size_t length() { return PyList_Size(this->ref_.get()); } diff --git a/extensions/standard-processors/tests/unit/RecordSetTests.cpp b/extensions/standard-processors/tests/unit/RecordSetTests.cpp new file mode 100644 index 0000000000..e33822b42b --- /dev/null +++ b/extensions/standard-processors/tests/unit/RecordSetTests.cpp @@ -0,0 +1,177 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.c + */ + +#include +#include +#include + +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "unit/TestRecord.h" +#include "utils/TimeUtil.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::standard::test { + +TEST_CASE("Test JSON serialization of a RecordField") { + { + minifi::core::RecordField field{1}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetUint64() == 1); + } + { + minifi::core::RecordField field{-1}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetInt64() == -1); + } + { + minifi::core::RecordField field{false}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetBool() == false); + } + { + minifi::core::RecordField field{'a'}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetString() == std::string("a")); + } + { + minifi::core::RecordField field{1.2}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetDouble() == 1.2); + } + { + minifi::core::RecordField field{std::string("hello")}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetString() == std::string("hello")); + } + { + std::chrono::time_point test_time = utils::timeutils::parseDateTimeStr("2021-09-01T12:34:56Z").value(); + minifi::core::RecordField field{test_time}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value.GetString() == std::string("2021-09-01T12:34:56Z")); + } + { + minifi::core::RecordObject obj; + obj.emplace("key1", std::make_unique(1)); + obj.emplace("key2", std::make_unique(std::string("hello"))); + minifi::core::RecordField field{std::move(obj)}; + rapidjson::Document doc; + auto value = field.toJson(doc.GetAllocator()); + CHECK(value["key1"].GetUint64() == 1); + CHECK(value["key2"].GetString() == std::string("hello")); + } + + { + minifi::core::RecordField field1{-1}; + minifi::core::RecordField field2{true}; + std::vector arr; + arr.push_back(std::move(field1)); + arr.push_back(std::move(field2)); + minifi::core::RecordField array_field{std::move(arr)}; + rapidjson::Document doc; + auto value = array_field.toJson(doc.GetAllocator()); + CHECK(value.GetArray()[0].GetInt64() == -1); + CHECK(value.GetArray()[1].GetBool() == true); + } +} + +TEST_CASE("Test JSON serialization of a Record") { + minifi::core::Record record; + record.emplace("key1", minifi::core::RecordField{1}); + record.emplace("key2", minifi::core::RecordField{std::string("hello")}); + record.emplace("key3", minifi::core::RecordField{true}); + record.emplace("key4", minifi::core::RecordField{1.2}); + std::chrono::time_point test_time = utils::timeutils::parseDateTimeStr("2021-09-01T12:34:56Z").value(); + record.emplace("key5", minifi::core::RecordField{test_time}); + + minifi::core::RecordField field1{-1}; + minifi::core::RecordField field2{true}; + std::vector arr; + arr.push_back(std::move(field1)); + arr.push_back(std::move(field2)); + record.emplace("key6", minifi::core::RecordField{std::move(arr)}); + + minifi::core::RecordObject subobj; + subobj.emplace("subkey3", std::make_unique(1)); + subobj.emplace("subkey4", std::make_unique(std::string("subhello"))); + minifi::core::RecordObject obj; + obj.emplace("subkey1", std::make_unique(-2)); + obj.emplace("subkey2", 
std::make_unique(std::move(subobj))); + record.emplace("key7", minifi::core::RecordField{std::move(obj)}); + + rapidjson::Document doc = record.toJson(); + CHECK(doc["key1"].GetUint64() == 1); + CHECK(doc["key2"].GetString() == std::string("hello")); + CHECK(doc["key3"].GetBool() == true); + CHECK(doc["key4"].GetDouble() == 1.2); + CHECK(doc["key5"].GetString() == std::string("2021-09-01T12:34:56Z")); + CHECK(doc["key6"].GetArray()[0] == -1); + CHECK(doc["key6"].GetArray()[1] == true); + CHECK(doc["key7"]["subkey1"].GetInt64() == -2); + CHECK(doc["key7"]["subkey2"]["subkey3"].GetUint64() == 1); + CHECK(doc["key7"]["subkey2"]["subkey4"].GetString() == std::string("subhello")); +} + +TEST_CASE("Test Record deserialization from JSON") { + std::string json_str = R"( +{ + "number": 1, + "string": "hello", + "bool": false, + "double": 1.2, + "array": [1.1, false], + "time_point": "2021-09-01T12:34:56Z", + "obj": { + "number2": 2, + "message": "mymessage" + } +} + )"; + rapidjson::Document doc; + doc.Parse<0>(json_str); + auto record = minifi::core::Record::fromJson(doc); + CHECK(record.at("number") == minifi::core::RecordField{1}); + CHECK(record.at("string") == minifi::core::RecordField{std::string("hello")}); + CHECK(record.at("bool") == minifi::core::RecordField{false}); + CHECK(record.at("double") == minifi::core::RecordField{1.2}); + std::chrono::time_point test_time = utils::timeutils::parseDateTimeStr("2021-09-01T12:34:56Z").value(); + CHECK(record.at("time_point") == minifi::core::RecordField{test_time}); + + minifi::core::RecordObject subobj; + subobj.emplace("number2", std::make_unique(2)); + subobj.emplace("message", std::make_unique(std::string("mymessage"))); + minifi::core::RecordField obj_field{std::move(subobj)}; + CHECK(record.at("obj") == obj_field); + + minifi::core::RecordField field1{1.1}; + minifi::core::RecordField field2{false}; + std::vector arr; + arr.push_back(std::move(field1)); + arr.push_back(std::move(field2)); + minifi::core::RecordField array_field{std::move(arr)}; + CHECK(record.at("array") == array_field); +} + +} // namespace org::apache::nifi::minifi::standard::test diff --git a/libminifi/include/core/Record.h b/libminifi/include/core/Record.h index 7e2a5a89df..684da33acf 100644 --- a/libminifi/include/core/Record.h +++ b/libminifi/include/core/Record.h @@ -23,8 +23,9 @@ #include "RecordField.h" -namespace org::apache::nifi::minifi::core { +#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson +namespace org::apache::nifi::minifi::core { class Record final { public: @@ -55,6 +56,25 @@ class Record final { bool operator==(const Record& rhs) const = default; + rapidjson::Document toJson() const { + rapidjson::Document doc; + auto& alloc = doc.GetAllocator(); + rapidjson::Value obj(rapidjson::kObjectType); + for (const auto& [key, field] : fields_) { + obj.AddMember(rapidjson::Value(key.c_str(), alloc), field.toJson(alloc), alloc); + } + doc.Swap(obj); + return doc; + } + + static Record fromJson(const rapidjson::Document& document) { + Record record; + for (const auto& member : document.GetObject()) { + record.emplace(member.name.GetString(), RecordField::fromJson(member.value)); + } + return record; + } + private: std::unordered_map fields_; }; diff --git a/libminifi/include/core/RecordField.h b/libminifi/include/core/RecordField.h index ae89e67c2c..ca6bf2e714 100644 --- a/libminifi/include/core/RecordField.h +++ b/libminifi/include/core/RecordField.h @@ -24,6 +24,7 @@ #include #include #include +#include 
"rapidjson/document.h" namespace org::apache::nifi::minifi::core { @@ -78,6 +79,8 @@ struct RecordField { ~RecordField() = default; + rapidjson::Value toJson(rapidjson::Document::AllocatorType& alloc) const; + static RecordField fromJson(const rapidjson::Value& value); bool operator==(const RecordField& rhs) const = default; diff --git a/libminifi/src/core/RecordField.cpp b/libminifi/src/core/RecordField.cpp new file mode 100644 index 0000000000..0da1818d9a --- /dev/null +++ b/libminifi/src/core/RecordField.cpp @@ -0,0 +1,99 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/RecordField.h" + +#include "utils/GeneralUtils.h" +#include "utils/TimeUtil.h" + +namespace org::apache::nifi::minifi::core { + +rapidjson::Value RecordField::toJson(rapidjson::Document::AllocatorType& allocator) const { + rapidjson::Value value; + std::visit(utils::overloaded { + [&value, &allocator](const std::string& str) { + value.SetString(str.c_str(), allocator); + }, + [&value](int64_t i64) { + value.SetInt64(i64); + }, + [&value](uint64_t u64) { + value.SetUint64(u64); + }, + [&value](double d) { + value.SetDouble(d); + }, + [&value](bool b) { + value.SetBool(b); + }, + [&value, &allocator](const std::chrono::system_clock::time_point& time_point) { + value.SetString(utils::timeutils::getDateTimeStr(std::chrono::time_point_cast(time_point)).c_str(), allocator); + }, + [&value, &allocator](const RecordArray& arr) { + value.SetArray(); + for (const auto& elem : arr) { + rapidjson::Value elem_value = elem.toJson(allocator); + value.PushBack(elem_value, allocator); + } + }, + [&value, &allocator](const RecordObject& obj) { + value.SetObject(); + for (const auto& [key, boxed_field] : obj) { + rapidjson::Value keyValue; + keyValue.SetString(key.c_str(), allocator); + + rapidjson::Value fieldValue = boxed_field.field->toJson(allocator); + value.AddMember(keyValue, fieldValue, allocator); + } + } + }, value_); + + return value; +} + +RecordField RecordField::fromJson(const rapidjson::Value& value) { + if (value.IsString()) { + std::string str_value = value.GetString(); + if (auto test_time = utils::timeutils::parseDateTimeStr(str_value)) { + return RecordField(std::chrono::time_point{*test_time}); + } + return RecordField(str_value); + } else if (value.IsInt64()) { + return RecordField(value.GetInt64()); + } else if (value.IsUint64()) { + return RecordField(value.GetUint64()); + } else if (value.IsDouble()) { + return RecordField(value.GetDouble()); + } else if (value.IsBool()) { + return RecordField(value.GetBool()); + } else if (value.IsArray()) { + RecordArray arr; + for (const auto& elem : value.GetArray()) { + arr.push_back(RecordField::fromJson(elem)); + } + return RecordField(std::move(arr)); + } else if (value.IsObject()) { + RecordObject obj; + 
for (const auto& member : value.GetObject()) { + obj.emplace(member.name.GetString(), BoxedRecordField(std::make_unique<RecordField>(RecordField::fromJson(member.value)))); + } + return RecordField(std::move(obj)); + } else { + throw std::runtime_error("Invalid JSON value type"); + } +} + +} // namespace org::apache::nifi::minifi::core
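To summarize how the pieces of this patch fit together: the Python-side RecordSetReader/RecordSetWriter bindings exchange records as JSON strings, with PyRecordSetReader::read serializing each record via toJson and PyRecordSetWriter::write parsing the strings back through Record::fromJson shown above. The helper below is a rough, hypothetical sketch of that Python-visible surface as driven by nifiapi/recordtransform.py; it is an illustration, not additional patch content.

```python
# Hypothetical sketch of how the record set controller services are used from Python,
# following the onTrigger implementation in nifiapi/recordtransform.py above.
import json


def transform_records(record_reader, record_writer, flow_file, session):
    # read() returns each record serialized as a JSON string (see PyRecordSetReader::read)
    record_json_strings = record_reader.read(flow_file, session)
    transformed = []
    for record_json in record_json_strings:
        record = json.loads(record_json)  # becomes the dict passed to RecordTransform.transform()
        record['processed'] = True        # arbitrary example modification
        transformed.append(json.dumps(record))
    # write() parses the JSON strings back into Records via Record::fromJson (see PyRecordSetWriter::write)
    record_writer.write(transformed, flow_file, session)
```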