diff --git a/python/morpheus/morpheus/_lib/common/__init__.pyi b/python/morpheus/morpheus/_lib/common/__init__.pyi index 7e11e81ccd..38f3c5fd66 100644 --- a/python/morpheus/morpheus/_lib/common/__init__.pyi +++ b/python/morpheus/morpheus/_lib/common/__init__.pyi @@ -110,7 +110,7 @@ class FilterSource(): __members__: dict # value = {'Auto': , 'TENSOR': , 'DATAFRAME': } pass class HttpEndpoint(): - def __init__(self, py_parse_fn: function, url: str, method: str) -> None: ... + def __init__(self, py_parse_fn: function, url: str, method: str, include_headers: bool = False) -> None: ... pass class HttpServer(): def __enter__(self) -> HttpServer: ... diff --git a/python/morpheus/morpheus/_lib/common/module.cpp b/python/morpheus/morpheus/_lib/common/module.cpp index b21ad3cfe2..cef1b250c7 100644 --- a/python/morpheus/morpheus/_lib/common/module.cpp +++ b/python/morpheus/morpheus/_lib/common/module.cpp @@ -153,7 +153,11 @@ PYBIND11_MODULE(common, _module) .value("DATAFRAME", FilterSource::DATAFRAME); py::class_>(_module, "HttpEndpoint") - .def(py::init<>(&HttpEndpointInterfaceProxy::init), py::arg("py_parse_fn"), py::arg("url"), py::arg("method")); + .def(py::init<>(&HttpEndpointInterfaceProxy::init), + py::arg("py_parse_fn"), + py::arg("url"), + py::arg("method"), + py::arg("include_headers") = false); py::class_>(_module, "HttpServer") .def(py::init<>(&HttpServerInterfaceProxy::init), diff --git a/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp b/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp index 2565bf4874..05e098a2c5 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp @@ -39,128 +39,6 @@ enum class MORPHEUS_EXPORT ControlMessageType TRAINING }; -// class PayloadManager -// { -// public: -// /** -// * @brief Get the tensor object identified by `name` -// * -// * @param name -// * @return TensorObject& -// * @throws 
std::runtime_error If no tensor matching `name` exists -// */ -// TensorObject& get_tensor(const std::string& name) -// { -// return m_tensors->get_tensor(name); -// } - -// /** -// * @brief Get the tensor object identified by `name` -// * -// * @param name -// * @return const TensorObject& -// * @throws std::runtime_error If no tensor matching `name` exists -// */ -// const TensorObject& get_tensor(const std::string& name) const -// { -// return m_tensors->get_tensor(name); -// } - -// /** -// * @brief Set the tensor object identified by `name` -// * -// * @param name -// * @param tensor -// * @throws std::length_error If the number of rows in `tensor` does not match `count`. -// */ -// void set_tensor(const std::string& name, TensorObject&& tensor) -// { -// m_tensors->set_tensor(name, std::move(tensor)); -// } - -// /** -// * @brief Get a reference to the internal tensors map -// * -// * @return const TensorMap& -// */ -// const TensorMap& get_tensors() const -// { -// return m_tensors->get_tensors(); -// } - -// /** -// * @brief Set the tensors object -// * -// * @param tensors -// * @throws std::length_error If the number of rows in the `tensors` do not match `count`. 
-// */ -// void set_tensors(TensorMap&& tensors) -// { -// m_tensors->set_tensors(std::move(tensors)); -// } - -// /** -// * @brief Get the tensor object identified by `name` -// * -// * @param name -// * @return TensorObject& -// * @throws std::runtime_error If no tensor matching `name` exists -// */ -// TensorObject& get_column(const std::string& name) -// { -// return m_tensors->get_tensor(name); -// } - -// /** -// * @brief Get the tensor object identified by `name` -// * -// * @param name -// * @return const TensorObject& -// * @throws std::runtime_error If no tensor matching `name` exists -// */ -// const TensorObject& get_column(const std::string& name) const -// { -// return m_tensors->get_tensor(name); -// } - -// /** -// * @brief Set the tensor object identified by `name` -// * -// * @param name -// * @param tensor -// * @throws std::length_error If the number of rows in `tensor` does not match `count`. -// */ -// void set_column(const std::string& name, TensorObject&& tensor) -// { -// m_tensors->set_tensor(name, std::move(tensor)); -// } - -// /** -// * @brief Get a reference to the internal tensors map -// * -// * @return const TensorMap& -// */ -// TableInfo get_columns() const -// { -// return m_df->get_info(); -// } - -// /** -// * @brief Set the tensors object -// * -// * @param tensors -// * @throws std::length_error If the number of rows in the `tensors` do not match `count`. 
-// */ -// void set_columns(TableInfo&& tensors) -// { -// m_tensors->set_tensors(std::move(tensors)); -// } - -// private: -// std::shared_ptr m_df; -// std::shared_ptr m_tensors; -// }; - class MORPHEUS_EXPORT TensorMemory; // System-clock for better compatibility with pybind11/chrono @@ -178,6 +56,7 @@ class MORPHEUS_EXPORT ControlMessage public: ControlMessage(); explicit ControlMessage(const morpheus::utilities::json_t& config); + explicit ControlMessage(const std::shared_ptr& payload, const morpheus::utilities::json_t& config); ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload @@ -387,6 +266,14 @@ struct MORPHEUS_EXPORT ControlMessageProxy */ static std::shared_ptr create(pybind11::dict& config); + /** + * @brief Creates a new ControlMessage instance from a Python MessageMeta and a configuration dictionary. + * @param meta Python instance of MessageMeta + * @param config A pybind11::dict representing the configuration for the ControlMessage. + * @return A shared_ptr to a newly created ControlMessage instance. + */ + static std::shared_ptr create(const pybind11::object& meta, pybind11::dict& config); + /** * @brief Creates a new ControlMessage instance as a copy of an existing one. * @param other A shared_ptr to another ControlMessage instance to copy. 
diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/deserialize.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/deserialize.hpp index 2a7f62403f..24059b3f89 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/stages/deserialize.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/deserialize.hpp @@ -22,6 +22,7 @@ #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" #include "morpheus/types.hpp" // for TensorIndex +#include "morpheus/utilities/json_types.hpp" // for control_message_task_t #include "morpheus/utilities/python_util.hpp" // for show_warning_message #include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR @@ -50,18 +51,16 @@ namespace morpheus { * @file */ -using cm_task_t = std::pair; - void make_output_message(std::shared_ptr& incoming_message, TensorIndex start, TensorIndex stop, - cm_task_t* task, + control_message_task_t* task, std::shared_ptr& windowed_message); void make_output_message(std::shared_ptr& incoming_message, TensorIndex start, TensorIndex stop, - cm_task_t* task, + control_message_task_t* task, std::shared_ptr& windowed_message); /****** DeserializationStage********************************/ @@ -83,8 +82,8 @@ class MORPHEUS_EXPORT DeserializeStage * @param task Optional task to be added to all outgoing `ControlMessage`s, ignored when `OutputT` is `MultiMessage` */ DeserializeStage(TensorIndex batch_size, - bool ensure_sliceable_index = true, - std::unique_ptr task = nullptr) : + bool ensure_sliceable_index = true, + std::unique_ptr task = nullptr) : base_t(base_t::op_factory_from_sub_fn(build_operator())), m_batch_size(batch_size), m_ensure_sliceable_index(ensure_sliceable_index), @@ -95,7 +94,7 @@ class MORPHEUS_EXPORT DeserializeStage TensorIndex m_batch_size; bool m_ensure_sliceable_index{true}; - std::unique_ptr m_task{nullptr}; + std::unique_ptr m_task{nullptr}; }; /****** DeserializationStageInterfaceProxy******************/ diff --git 
a/python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp index 95167f8b9b..ea2ab50eed 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp @@ -18,31 +18,62 @@ #pragma once #include "morpheus/export.h" // for exporting symbols +#include "morpheus/messages/control.hpp" // for ControlMessage #include "morpheus/messages/meta.hpp" // for MessageMeta #include "morpheus/utilities/http_server.hpp" // for HttpServer +#include "morpheus/utilities/json_types.hpp" // for control_message_task_t & json_t -#include // for buffered_channel -#include // for context -#include // for table_with_metadata -#include // for segment::Builder -#include // for segment::Object -#include // for PythonSource -#include // for subscriber - -#include // for atomic -#include // for duration -#include // for size_t -#include // for int64_t -#include // for shared_ptr & unique_ptr -#include // for string & to_string +#include // for int_to_status, status +#include // for buffered_channel +#include // for channel_op_status +#include // for sleep_for +#include // for json_reader_options & read_json +#include // for table_with_metadata +#include // for CHECK & LOG +#include // for segment::Builder +#include // for segment::Object +#include // for json +#include // for pybind11::object +#include // for PythonSource +#include // for subscriber + +#include // for atomic +#include // for duration +#include // for size_t +#include // for int64_t +#include // for std::exception +#include // for shared_ptr & unique_ptr +#include // needed by GLOG +#include // for std::runtime_error +#include // for string & to_string +#include // for make_tuple +#include // for std::move & pair +#include // for vector // IWYU thinks we're using thread::operator<< // IWYU pragma: 
no_include +// IWYU thinks we need the http.hpp header for int_to_status, but it's defined in status.hpp +// IWYU pragma: no_include namespace morpheus { -using table_t = std::unique_ptr; +using table_with_http_fields_t = std::pair; +using table_t = std::unique_ptr; + using request_queue_t = boost::fibers::buffered_channel; -/****** Component public implementations *******************/ +class SourceStageStopAfter : public std::exception +{}; + +// Per type overloads for producing the output message +void make_output_message(std::shared_ptr& incoming_message, + control_message_task_t* task, + morpheus::utilities::json_t&& http_fields, + std::shared_ptr& out_message); + +void make_output_message(std::shared_ptr& incoming_message, + control_message_task_t* task, + morpheus::utilities::json_t&& http_fields, + std::shared_ptr& out_message); + /****** HttpServerSourceStage *************************************/ /** @@ -50,34 +81,60 @@ using request_queue_t = boost::fibers::buffered_channel; * @{ * @file */ - -// TODO(dagardner): optionally add headers to the dataframe - -class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource> +template +class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource> { public: - using base_t = mrc::pymrc::PythonSource>; + using base_t = mrc::pymrc::PythonSource>; using typename base_t::source_type_t; using typename base_t::subscriber_fn_t; - HttpServerSourceStage(std::string bind_address = "127.0.0.1", - unsigned short port = 8080, - std::string endpoint = "/message", - std::string live_endpoint = "/live", - std::string ready_endpoint = "/ready", - std::string method = "POST", - std::string live_method = "GET", - std::string ready_method = "GET", - unsigned accept_status = 201, - float sleep_time = 0.1f, - long queue_timeout = 5, - std::size_t max_queue_size = 1024, - unsigned short num_server_threads = 1, - std::size_t max_payload_size = DefaultMaxPayloadSize, - std::chrono::seconds request_timeout = 
std::chrono::seconds(30), - bool lines = false, - std::size_t stop_after = 0); - ~HttpServerSourceStage() override; + /** + * @brief Constructor for the HttpServerSourceStage + * + * @param bind_address The IP address to bind the server to + * @param port The TCP port to bind the server to + * @param endpoint The endpoint to listen for messages on + * @param live_endpoint The endpoint to check if the server is running + * @param ready_endpoint The endpoint to check if the server is ready to accept messages + * @param method The HTTP method to accept requests on the `endpoint` + * @param live_method The HTTP method to accept requests on the `live_endpoint` + * @param ready_method The HTTP method accept requests on the `ready_endpoint` + * @param accept_status The HTTP status code to return when a message is accepted + * @param sleep_time The time to sleep when the queue is empty + * @param queue_timeout The time to wait for the queue to accept a message + * @param max_queue_size The maximum number of messages to queue prior to blocking incoming requests + * @param num_server_threads The number of threads to run the server on + * @param max_payload_size The maximum size of the payload + * @param request_timeout The time to wait for a request to complete + * @param lines If `false`, the HTTP server will expect each request to be a JSON array of objects. If `true`, the + * HTTP server will expect each request to be a JSON object per line. + * @param stop_after The number of records to emit before stopping. Useful for testing, disabled if `0`. + * @param task When `OutputT=ControlMessage`, optional task to be added to all outgoing messagess, triggers an + * assertion error for all other types. 
+ */ + HttpServerSourceStage(std::string bind_address = "127.0.0.1", + unsigned short port = 8080, + std::string endpoint = "/message", + std::string live_endpoint = "/live", + std::string ready_endpoint = "/ready", + std::string method = "POST", + std::string live_method = "GET", + std::string ready_method = "GET", + unsigned accept_status = 201, + float sleep_time = 0.1f, + long queue_timeout = 5, + std::size_t max_queue_size = 1024, + unsigned short num_server_threads = 1, + std::size_t max_payload_size = DefaultMaxPayloadSize, + std::chrono::seconds request_timeout = std::chrono::seconds(30), + bool lines = false, + std::size_t stop_after = 0, + std::unique_ptr task = nullptr); + ~HttpServerSourceStage() override + { + close(); + } void close(); @@ -92,7 +149,8 @@ class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource m_task{nullptr}; }; /****** HttpServerSourceStageInterfaceProxy***********************/ @@ -101,25 +159,330 @@ class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource> init(mrc::segment::Builder& builder, - const std::string& name, - std::string bind_address, - unsigned short port, - std::string endpoint, - std::string live_endpoint, - std::string ready_endpoint, - std::string method, - std::string live_method, - std::string ready_method, - unsigned accept_status, - float sleep_time, - long queue_timeout, - std::size_t max_queue_size, - unsigned short num_server_threads, - std::size_t max_payload_size, - int64_t request_timeout, - bool lines, - std::size_t stop_after); + /** + * @brief Create and initialize a HttpServerSourceStage that emits MessageMeta's, and return the result + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @param bind_address The IP address to bind the server to + * @param port The TCP port to bind the server to + * @param endpoint The endpoint to listen for messages on + * @param live_endpoint The endpoint to check if the 
server is running + * @param ready_endpoint The endpoint to check if the server is ready to accept messages + * @param method The HTTP method to accept requests on the `endpoint` + * @param live_method The HTTP method to accept requests on the `live_endpoint` + * @param ready_method The HTTP method accept requests on the `ready_endpoint` + * @param accept_status The HTTP status code to return when a message is accepted + * @param sleep_time The time to sleep when the queue is empty + * @param queue_timeout The time to wait for the queue to accept a message + * @param max_queue_size The maximum number of messages to queue prior to blocking incoming requests + * @param num_server_threads The number of threads to run the server on + * @param max_payload_size The maximum size of the payload + * @param request_timeout The time to wait for a request to complete + * @param lines If `False`, the HTTP server will expect each request to be a JSON array of objects. If `True`, the + * HTTP server will expect each request to be a JSON object per line. + * @param stop_after The number of records to emit before stopping. Useful for testing, disabled if `0`. 
+ */ + static std::shared_ptr>> init_meta( + mrc::segment::Builder& builder, + const std::string& name, + std::string bind_address, + unsigned short port, + std::string endpoint, + std::string live_endpoint, + std::string ready_endpoint, + std::string method, + std::string live_method, + std::string ready_method, + unsigned accept_status, + float sleep_time, + long queue_timeout, + std::size_t max_queue_size, + unsigned short num_server_threads, + std::size_t max_payload_size, + int64_t request_timeout, + bool lines, + std::size_t stop_after); + + /** + * @brief Create and initialize a HttpServerSourceStage that emits ControlMessage's, and return the result + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @param bind_address The IP address to bind the server to + * @param port The TCP port to bind the server to + * @param endpoint The endpoint to listen for messages on + * @param live_endpoint The endpoint to check if the server is running + * @param ready_endpoint The endpoint to check if the server is ready to accept messages + * @param method The HTTP method to accept requests on the `endpoint` + * @param live_method The HTTP method to accept requests on the `live_endpoint` + * @param ready_method The HTTP method accept requests on the `ready_endpoint` + * @param accept_status The HTTP status code to return when a message is accepted + * @param sleep_time The time to sleep when the queue is empty + * @param queue_timeout The time to wait for the queue to accept a message + * @param max_queue_size The maximum number of messages to queue prior to blocking incoming requests + * @param num_server_threads The number of threads to run the server on + * @param max_payload_size The maximum size of the payload + * @param request_timeout The time to wait for a request to complete + * @param lines If `False`, the HTTP server will expect each request to be a JSON array of objects. 
If `True`, the + * HTTP server will expect each request to be a JSON object per line. + * @param stop_after The number of records to emit before stopping. Useful for testing, disabled if `0`. + * @param task_type Optional task type to be added to all outgoing messages. When not `None`, then `task_payload` + * must also be not `None`, and vice versa. + * @param task_payload Optional json object describing the task to be added to all outgoing messages. When not + * `None`, then `task_type` must also be not `None`, and vice versa. + */ + static std::shared_ptr>> init_cm( + mrc::segment::Builder& builder, + const std::string& name, + std::string bind_address, + unsigned short port, + std::string endpoint, + std::string live_endpoint, + std::string ready_endpoint, + std::string method, + std::string live_method, + std::string ready_method, + unsigned accept_status, + float sleep_time, + long queue_timeout, + std::size_t max_queue_size, + unsigned short num_server_threads, + std::size_t max_payload_size, + int64_t request_timeout, + bool lines, + std::size_t stop_after, + const pybind11::object& task_type, + const pybind11::object& task_payload); }; + +template +HttpServerSourceStage::HttpServerSourceStage(std::string bind_address, + unsigned short port, + std::string endpoint, + std::string live_endpoint, + std::string ready_endpoint, + std::string method, + std::string live_method, + std::string ready_method, + unsigned accept_status, + float sleep_time, + long queue_timeout, + std::size_t max_queue_size, + unsigned short num_server_threads, + std::size_t max_payload_size, + std::chrono::seconds request_timeout, + bool lines, + std::size_t stop_after, + std::unique_ptr task) : + base_t(build()), + m_max_queue_size{max_queue_size}, + m_sleep_time{std::chrono::milliseconds(static_cast(sleep_time))}, + m_queue_timeout{queue_timeout}, + m_queue{max_queue_size}, + m_stop_after{stop_after}, + m_task{std::move(task)} +{ + CHECK(boost::beast::http::int_to_status(accept_status) 
!= boost::beast::http::status::unknown) + << "Invalid HTTP status code: " << accept_status; + + request_handler_fn_t parser = [this, accept_status, lines](const tcp_endpoint_t& tcp_endpoint, + const request_t& request) { + // This function is called from one of the HTTPServer's worker threads, avoid performing any additional work + // here beyond what is strictly nessary to return a valid response to the client. We parse the payload here, + // that way we can return an appropriate error message if the payload is invalid however we stop avoid + // constructing a MessageMeta object since that would require grabbing the Python GIL, instead we push the + // libcudf table to the queue and let the subscriber handle the conversion to MessageMeta. + table_t table{nullptr}; + + try + { + std::string body{request.body()}; + cudf::io::source_info source{body.c_str(), body.size()}; + auto options = cudf::io::json_reader_options::builder(source).lines(lines); + auto cudf_table = cudf::io::read_json(options.build()); + + auto http_fields = request_headers_to_json(tcp_endpoint, request); + table = std::make_unique(std::move(cudf_table), std::move(http_fields)); + } catch (const std::exception& e) + { + // We want to log the exception locally, but we don't want to include the exception message in the response + // since that may leak sensitive information + std::string error_msg = "Error occurred converting HTTP payload to Dataframe"; + LOG(ERROR) << error_msg << ": " << e.what(); + return std::make_tuple(400u, "text/plain", error_msg, nullptr); + } + + try + { + // NOLINTNEXTLINE(clang-diagnostic-unused-value) + DCHECK_NOTNULL(table); + auto queue_status = m_queue.push_wait_for(std::move(table), m_queue_timeout); + + if (queue_status == boost::fibers::channel_op_status::success) + { + ++m_queue_cnt; + return std::make_tuple(accept_status, "text/plain", std::string(), nullptr); + } + + std::string error_msg = "HTTP payload queue is "; + switch (queue_status) + { + case 
boost::fibers::channel_op_status::full: + case boost::fibers::channel_op_status::timeout: { + error_msg += "full"; + break; + } + + case boost::fibers::channel_op_status::closed: { + error_msg += "closed"; + break; + } + default: { + error_msg += "in an unknown state"; + break; + } + } + + return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); + } catch (const std::exception& e) + { + // Refer above comment about not including exception messages in the response + std::string error_msg = "Error occurred while pushing payload to queue"; + LOG(ERROR) << error_msg << ": " << e.what(); + return std::make_tuple(500u, "text/plain", error_msg, nullptr); + } + }; + + request_handler_fn_t live_parser = [this](const tcp_endpoint_t& tcp_endpoint, const request_t& request) { + if (!m_server->is_running()) + { + std::string error_msg = "Source server is not running"; + return std::make_tuple(500u, "text/plain", error_msg, nullptr); + } + + return std::make_tuple(200u, "text/plain", std::string(), nullptr); + }; + + request_handler_fn_t ready_parser = [this](const tcp_endpoint_t& tcp_endpoint, const request_t& request) { + if (!m_server->is_running()) + { + std::string error_msg = "Source server is not running"; + return std::make_tuple(500u, "text/plain", error_msg, nullptr); + } + + if (m_queue_cnt < m_max_queue_size) + { + return std::make_tuple(200u, "text/plain", std::string(), nullptr); + } + + std::string error_msg = "HTTP payload queue is full or unavailable to accept new values"; + return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); + }; + + std::vector endpoints; + endpoints.emplace_back(parser, std::move(endpoint), std::move(method)); + endpoints.emplace_back(live_parser, std::move(live_endpoint), std::move(live_method)); + endpoints.emplace_back(ready_parser, std::move(ready_endpoint), std::move(ready_method)); + + m_server = std::make_unique( + std::move(endpoints), std::move(bind_address), port, num_server_threads, 
max_payload_size, request_timeout); +} + +template +HttpServerSourceStage::subscriber_fn_t HttpServerSourceStage::build() +{ + return [this](rxcpp::subscriber subscriber) -> void { + try + { + m_server->start(); + this->source_generator(subscriber); + } catch (const SourceStageStopAfter& e) + { + DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records"; + } catch (const std::exception& e) + { + LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl; + subscriber.on_error(std::make_exception_ptr(e)); + return; + } + subscriber.on_completed(); + this->close(); + }; +} + +template +void HttpServerSourceStage::source_generator( + rxcpp::subscriber subscriber) +{ + // only check if the server is running when the queue is empty, allowing all queued messages to be processed prior + // to shutting down + bool server_running = true; + bool queue_closed = false; + while (subscriber.is_subscribed() && server_running && !queue_closed) + { + table_t table_ptr{nullptr}; + auto queue_status = m_queue.try_pop(table_ptr); + if (queue_status == boost::fibers::channel_op_status::success) + { + --m_queue_cnt; + // NOLINTNEXTLINE(clang-diagnostic-unused-value) + DCHECK_NOTNULL(table_ptr); + try + { + auto message = MessageMeta::create_from_cpp(std::move(table_ptr->first), 0); + auto num_records = message->count(); + + // When OutputT is MessageMeta, we just swap the pointers + std::shared_ptr out_message{nullptr}; + make_output_message(message, m_task.get(), std::move(table_ptr->second), out_message); + + subscriber.on_next(std::move(out_message)); + m_records_emitted += num_records; + } catch (const std::exception& e) + { + LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what(); + } + + if (m_stop_after > 0 && m_records_emitted >= m_stop_after) + { + throw SourceStageStopAfter(); + } + } + else if (queue_status == boost::fibers::channel_op_status::empty) + { + // if the queue is empty, 
maybe it's because our server is not running + server_running = m_server->is_running(); + + if (server_running) + { + // Sleep when there are no messages + boost::this_fiber::sleep_for(m_sleep_time); + } + } + else if (queue_status == boost::fibers::channel_op_status::closed) + { + queue_closed = true; + } + else + { + std::string error_msg{"Unknown queue status: " + std::to_string(static_cast(queue_status))}; + LOG(ERROR) << error_msg; + throw std::runtime_error(error_msg); + } + } +} + +template +void HttpServerSourceStage::close() +{ + if (m_server) + { + m_server->stop(); // this is a no-op if the server is not running + m_server.reset(); + } + m_queue.close(); +} + /** @} */ // end of group } // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp b/python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp index cf1c631628..eee12639f9 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp @@ -17,12 +17,15 @@ #pragma once -#include "morpheus/export.h" // for exporting symbols - -#include // for io_context -#include // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket -#include // for error_code -#include // for verb +#include "morpheus/export.h" // for exporting symbols +#include "morpheus/utilities/json_types.hpp" // for json_t + +#include // for io_context +#include // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket +#include // for error_code +#include // for request +#include // for string_body +#include // for verb #include #include // for pybind11::function @@ -56,6 +59,26 @@ using parse_status_t = std::tuple; +// Note this is different than the http endpoint this represents the TCP connection +using tcp_endpoint_t = boost::asio::ip::tcp::endpoint; +using request_t = boost::beast::http::request; + +/** + * @brief A function that receives the TCP endpoint, and the request 
object. Returning an instance of `parse_status_t`. + * + * @details The function is expected to return a tuple conforming to `parse_status_t` consisting of the HTTP status + * code, mime type value for the Content-Type header, body of the response and optionally a callback function. + * If specified, the callback function which will be called once the response has been sent or failed to send, as + * indicated by a `boost::system::error_code` reference passed to the function. + * + * Refer to https://www.boost.org/doc/libs/1_74_0/libs/system/doc/html/system.html#ref_class_error_code for more + * information regarding `boost::system::error_code`. + * + * Note: This method is preferred over the payload_parse_fn_t as it provides access to the request headers. + */ +using request_handler_fn_t = + std::function; + /** * @brief A function that receives the post body and returns an HTTP status code, Content-Type string and body. * @@ -66,11 +89,24 @@ using parse_status_t = std::tuple; +using payload_parse_fn_t = std::function; constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB +/** + * @brief Convert the request headers to a JSON object. + * + * @param tcp_endpoint The TCP endpoint of the request. + * @param request The request object. + * @return The JSON object representing the request headers. 
+ */ +utilities::json_t request_headers_to_json(const tcp_endpoint_t& tcp_endpoint, const request_t& request); + /** * @brief A struct that encapsulates the http endpoint attributes * @@ -78,11 +114,19 @@ constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB */ struct MORPHEUS_EXPORT HttpEndpoint { - HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string url, std::string method); + HttpEndpoint(request_handler_fn_t request_handler_fn, std::string&& url, const std::string& method); + HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string&& url, const std::string& method); + std::shared_ptr m_requet_handler; std::shared_ptr m_parser; std::string m_url; boost::beast::http::verb m_method; + + private: + HttpEndpoint(std::shared_ptr&& request_handler_fn, + std::shared_ptr&& payload_parse_fn, + std::string&& url, + const std::string& method); }; /** @@ -171,7 +215,10 @@ class MORPHEUS_EXPORT HttpServer */ struct MORPHEUS_EXPORT HttpEndpointInterfaceProxy { - static std::shared_ptr init(pybind11::function py_parse_fn, std::string m_url, std::string m_method); + static std::shared_ptr init(pybind11::function py_parse_fn, + std::string m_url, + std::string m_method, + bool include_headers = false); }; /****** HttpServerInterfaceProxy *************************/ diff --git a/python/morpheus/morpheus/_lib/include/morpheus/utilities/json_types.hpp b/python/morpheus/morpheus/_lib/include/morpheus/utilities/json_types.hpp index 7dd4fd4516..1df90c2e79 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/utilities/json_types.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/utilities/json_types.hpp @@ -27,9 +27,11 @@ #include // for int64_t, uint64_t, uint8_t #include // for map #include // for allocator, string +#include // for pair #include // for vector -namespace morpheus::utilities { +namespace morpheus { +namespace utilities { /** * @brief A container class derived from std::vector to make it compatible with nlohmann::json to hold 
* arbitrary Python objects as bytes. @@ -113,7 +115,13 @@ class MORPHEUS_EXPORT json_dict_t : public morpheus::utilities::json_t class MORPHEUS_EXPORT json_list_t : public morpheus::utilities::json_t {}; // NOLINTEND(readability-identifier-naming) -} // namespace morpheus::utilities +} // namespace utilities + +// A task for a control message consists of a task type and a task payload, defined here as a pair since it is invalid +// to define one without the other. +using control_message_task_t = std::pair; + +} // namespace morpheus namespace nlohmann { // NOLINTBEGIN(readability-identifier-naming) diff --git a/python/morpheus/morpheus/_lib/messages/__init__.pyi b/python/morpheus/morpheus/_lib/messages/__init__.pyi index f4ef520152..28d18d8e09 100644 --- a/python/morpheus/morpheus/_lib/messages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/messages/__init__.pyi @@ -44,6 +44,8 @@ class ControlMessage(): def __init__(self, arg0: ControlMessage) -> None: ... @typing.overload def __init__(self, arg0: dict) -> None: ... + @typing.overload + def __init__(self, arg0: object, arg1: dict) -> None: ... def add_task(self, task_type: str, task: object) -> None: ... @typing.overload def config(self) -> object: ... 
diff --git a/python/morpheus/morpheus/_lib/messages/module.cpp b/python/morpheus/morpheus/_lib/messages/module.cpp index 270e52d0a5..ccbd290335 100644 --- a/python/morpheus/morpheus/_lib/messages/module.cpp +++ b/python/morpheus/morpheus/_lib/messages/module.cpp @@ -405,6 +405,7 @@ PYBIND11_MODULE(messages, _module) py::class_>(_module, "ControlMessage") .def(py::init<>()) .def(py::init(py::overload_cast(&ControlMessageProxy::create))) + .def(py::init(py::overload_cast(&ControlMessageProxy::create))) .def(py::init(py::overload_cast>(&ControlMessageProxy::create))) .def("add_task", &ControlMessage::add_task, py::arg("task_type"), py::arg("task")) .def( diff --git a/python/morpheus/morpheus/_lib/src/messages/control.cpp b/python/morpheus/morpheus/_lib/src/messages/control.cpp index ca23c5f9f8..4d5aaebc04 100644 --- a/python/morpheus/morpheus/_lib/src/messages/control.cpp +++ b/python/morpheus/morpheus/_lib/src/messages/control.cpp @@ -51,6 +51,13 @@ ControlMessage::ControlMessage(const morpheus::utilities::json_t& _config) : config(_config); } +ControlMessage::ControlMessage(const std::shared_ptr& _payload, + const morpheus::utilities::json_t& config) : + ControlMessage(config) +{ + payload(_payload); +} + ControlMessage::ControlMessage(const ControlMessage& other) { m_config = other.m_config; @@ -262,6 +269,12 @@ std::shared_ptr ControlMessageProxy::create(py::dict& config) return std::make_shared(mrc::pymrc::cast_from_pyobject(config)); } +std::shared_ptr ControlMessageProxy::create(const pybind11::object& meta, py::dict& config) +{ + return std::make_shared(MessageMetaInterfaceProxy::init_python_meta(meta), + mrc::pymrc::cast_from_pyobject(config)); +} + std::shared_ptr ControlMessageProxy::create(std::shared_ptr other) { return std::make_shared(*other); diff --git a/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp b/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp index c5356251e0..890140101a 100644 --- 
a/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp @@ -29,7 +29,7 @@ namespace morpheus { void make_output_message(std::shared_ptr& incoming_message, TensorIndex start, TensorIndex stop, - cm_task_t* task, + control_message_task_t* task, std::shared_ptr& windowed_message) { DCHECK_EQ(task, nullptr) << "Task is not supported for MultiMessage"; @@ -40,7 +40,7 @@ void make_output_message(std::shared_ptr& incoming_message, void make_output_message(std::shared_ptr& incoming_message, TensorIndex start, TensorIndex stop, - cm_task_t* task, + control_message_task_t* task, std::shared_ptr& windowed_message) { auto slidced_meta = std::make_shared(incoming_message, start, stop); @@ -68,12 +68,12 @@ std::shared_ptr>> Deserial const pybind11::object& task_type, const pybind11::object& task_payload) { - std::unique_ptr task{nullptr}; + std::unique_ptr task{nullptr}; if (!task_type.is_none() && !task_payload.is_none()) { - task = std::make_unique(pybind11::cast(task_type), - mrc::pymrc::cast_from_pyobject(task_payload)); + task = std::make_unique(pybind11::cast(task_type), + mrc::pymrc::cast_from_pyobject(task_payload)); } auto stage = builder.construct_object>( diff --git a/python/morpheus/morpheus/_lib/src/stages/http_server_source_stage.cpp b/python/morpheus/morpheus/_lib/src/stages/http_server_source_stage.cpp index 5a09763393..977c815036 100644 --- a/python/morpheus/morpheus/_lib/src/stages/http_server_source_stage.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/http_server_source_stage.cpp @@ -17,30 +17,81 @@ #include "morpheus/stages/http_server_source_stage.hpp" -#include // for int_to_status, status -#include // for channel_op_status -#include // for sleep_for -#include // for json_reader_options & read_json -#include // for CHECK & LOG - -#include // for std::exception -#include // needed by GLOG -#include // for std::runtime_error -#include // for std::this_thread::sleep_for -#include // for 
make_tuple -#include // for std::move -#include // for vector -// IWYU thinks we need more boost headers than we need as int_to_status is defined in status.hpp -// IWYU pragma: no_include +#include // for cast +#include // for cast_from_pyobject namespace morpheus { -class SourceStageStopAfter : public std::exception -{}; +void make_output_message(std::shared_ptr& incoming_message, + control_message_task_t* task, + morpheus::utilities::json_t&& http_fields, + std::shared_ptr& out_message) +{ + DCHECK_EQ(task, nullptr) << "Tasks are not supported for MessageMeta"; + out_message.swap(incoming_message); +} + +void make_output_message(std::shared_ptr& incoming_message, + control_message_task_t* task, + morpheus::utilities::json_t&& http_fields, + std::shared_ptr& out_message) +{ + utilities::json_t cm_config = {{"metadata", {{"http_fields", http_fields}}}}; + auto cm_msg = std::make_shared(incoming_message, cm_config); + if (task) + { + cm_msg->add_task(task->first, task->second); + } + out_message.swap(cm_msg); +} + +// ************ HttpServerSourceStageInterfaceProxy ************ // +std::shared_ptr>> +HttpServerSourceStageInterfaceProxy::init_meta(mrc::segment::Builder& builder, + const std::string& name, + std::string bind_address, + unsigned short port, + std::string endpoint, + std::string live_endpoint, + std::string ready_endpoint, + std::string method, + std::string live_method, + std::string ready_method, + unsigned accept_status, + float sleep_time, + long queue_timeout, + std::size_t max_queue_size, + unsigned short num_server_threads, + std::size_t max_payload_size, + int64_t request_timeout, + bool lines, + std::size_t stop_after) +{ + return builder.construct_object>(name, + std::move(bind_address), + port, + std::move(endpoint), + std::move(live_endpoint), + std::move(ready_endpoint), + std::move(method), + std::move(live_method), + std::move(ready_method), + accept_status, + sleep_time, + queue_timeout, + max_queue_size, + num_server_threads, + 
max_payload_size, + std::chrono::seconds(request_timeout), + lines, + stop_after, + nullptr); +} -// Component public implementations -// ************ HttpServerSourceStage ************* // -HttpServerSourceStage::HttpServerSourceStage(std::string bind_address, +std::shared_ptr>> +HttpServerSourceStageInterfaceProxy::init_cm(mrc::segment::Builder& builder, + const std::string& name, + std::string bind_address, unsigned short port, std::string endpoint, std::string live_endpoint, @@ -54,241 +105,38 @@ HttpServerSourceStage::HttpServerSourceStage(std::string bind_address, std::size_t max_queue_size, unsigned short num_server_threads, std::size_t max_payload_size, - std::chrono::seconds request_timeout, + int64_t request_timeout, bool lines, - std::size_t stop_after) : - PythonSource(build()), - m_max_queue_size{max_queue_size}, - m_sleep_time{std::chrono::milliseconds(static_cast(sleep_time))}, - m_queue_timeout{queue_timeout}, - m_queue{max_queue_size}, - m_stop_after{stop_after}, - m_records_emitted{0} -{ - CHECK(boost::beast::http::int_to_status(accept_status) != boost::beast::http::status::unknown) - << "Invalid HTTP status code: " << accept_status; - - payload_parse_fn_t parser = [this, accept_status, lines](const std::string& payload) { - std::unique_ptr table{nullptr}; - try - { - cudf::io::source_info source{payload.c_str(), payload.size()}; - auto options = cudf::io::json_reader_options::builder(source).lines(lines); - table = std::make_unique(cudf::io::read_json(options.build())); - } catch (const std::exception& e) - { - std::string error_msg = "Error occurred converting HTTP payload to Dataframe"; - LOG(ERROR) << error_msg << ": " << e.what(); - return std::make_tuple(400u, "text/plain", error_msg, nullptr); - } - - try - { - // NOLINTNEXTLINE(clang-diagnostic-unused-value) - DCHECK_NOTNULL(table); - auto queue_status = m_queue.push_wait_for(std::move(table), m_queue_timeout); - - if (queue_status == boost::fibers::channel_op_status::success) - { - 
m_queue_cnt++; - return std::make_tuple(accept_status, "text/plain", std::string(), nullptr); - } - - std::string error_msg = "HTTP payload queue is "; - switch (queue_status) - { - case boost::fibers::channel_op_status::full: - case boost::fibers::channel_op_status::timeout: { - error_msg += "full"; - break; - } - - case boost::fibers::channel_op_status::closed: { - error_msg += "closed"; - break; - } - default: { - error_msg += "in an unknown state"; - break; - } - } - - return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); - } catch (const std::exception& e) - { - std::string error_msg = "Error occurred while pushing payload to queue"; - LOG(ERROR) << error_msg << ": " << e.what(); - return std::make_tuple(500u, "text/plain", error_msg, nullptr); - } - }; - - payload_parse_fn_t live_parser = [this, accept_status, lines](const std::string& payload) { - if (!m_server->is_running()) - { - std::string error_msg = "Source server is not running"; - return std::make_tuple(500u, "text/plain", error_msg, nullptr); - } - - return std::make_tuple(accept_status, "text/plain", std::string(), nullptr); - }; - - payload_parse_fn_t ready_parser = [this, accept_status, lines](const std::string& payload) { - if (!m_server->is_running()) - { - std::string error_msg = "Source server is not running"; - return std::make_tuple(500u, "text/plain", error_msg, nullptr); - } - - if (m_queue_cnt < m_max_queue_size) - { - return std::make_tuple(accept_status, "text/plain", std::string(), nullptr); - } - - std::string error_msg = "HTTP payload queue is full or unavailable to accept new values"; - return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); - }; - - std::vector endpoints; - endpoints.emplace_back(parser, endpoint, method); - endpoints.emplace_back(live_parser, live_endpoint, live_method); - endpoints.emplace_back(ready_parser, ready_endpoint, ready_method); - - m_server = std::make_unique( - std::move(endpoints), std::move(bind_address), 
port, num_server_threads, max_payload_size, request_timeout); -} - -HttpServerSourceStage::subscriber_fn_t HttpServerSourceStage::build() + std::size_t stop_after, + const pybind11::object& task_type, + const pybind11::object& task_payload) { - return [this](rxcpp::subscriber subscriber) -> void { - try - { - m_server->start(); - this->source_generator(subscriber); - } catch (const SourceStageStopAfter& e) - { - DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records"; - } catch (const std::exception& e) - { - LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl; - subscriber.on_error(std::make_exception_ptr(e)); - return; - } - subscriber.on_completed(); - this->close(); - }; -} - -void HttpServerSourceStage::source_generator(rxcpp::subscriber subscriber) -{ - // only check if the server is running when the queue is empty, allowing all queued messages to be processed prior - // to shutting down - bool server_running = true; - bool queue_closed = false; - while (subscriber.is_subscribed() && server_running && !queue_closed) - { - table_t table_ptr{nullptr}; - auto queue_status = m_queue.try_pop(table_ptr); - if (queue_status == boost::fibers::channel_op_status::success) - { - // NOLINTNEXTLINE(clang-diagnostic-unused-value) - m_queue_cnt--; - DCHECK_NOTNULL(table_ptr); - try - { - auto message = MessageMeta::create_from_cpp(std::move(*table_ptr), 0); - auto num_records = message->count(); - subscriber.on_next(std::move(message)); - m_records_emitted += num_records; - } catch (const std::exception& e) - { - LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what(); - } - - if (m_stop_after > 0 && m_records_emitted >= m_stop_after) - { - throw SourceStageStopAfter(); - } - } - else if (queue_status == boost::fibers::channel_op_status::empty) - { - // if the queue is empty, maybe it's because our server is not running - server_running = m_server->is_running(); + 
std::unique_ptr task{nullptr}; - if (server_running) - { - // Sleep when there are no messages - boost::this_fiber::sleep_for(m_sleep_time); - } - } - else if (queue_status == boost::fibers::channel_op_status::closed) - { - queue_closed = true; - } - else - { - std::string error_msg{"Unknown queue status: " + std::to_string(static_cast(queue_status))}; - LOG(ERROR) << error_msg; - throw std::runtime_error(error_msg); - } - } -} - -void HttpServerSourceStage::close() -{ - if (m_server) + if (!task_type.is_none() && !task_payload.is_none()) { - m_server->stop(); // this is a no-op if the server is not running - m_server.reset(); + task = std::make_unique(pybind11::cast(task_type), + mrc::pymrc::cast_from_pyobject(task_payload)); } - m_queue.close(); -} - -HttpServerSourceStage::~HttpServerSourceStage() -{ - close(); -} - -// ************ HttpServerSourceStageInterfaceProxy ************ // -std::shared_ptr> HttpServerSourceStageInterfaceProxy::init( - mrc::segment::Builder& builder, - const std::string& name, - std::string bind_address, - unsigned short port, - std::string endpoint, - std::string live_endpoint, - std::string ready_endpoint, - std::string method, - std::string live_method, - std::string ready_method, - unsigned accept_status, - float sleep_time, - long queue_timeout, - std::size_t max_queue_size, - unsigned short num_server_threads, - std::size_t max_payload_size, - int64_t request_timeout, - bool lines, - std::size_t stop_after) -{ - return builder.construct_object( - name, - std::move(bind_address), - port, - std::move(endpoint), - std::move(live_endpoint), - std::move(ready_endpoint), - std::move(method), - std::move(live_method), - std::move(ready_method), - accept_status, - sleep_time, - queue_timeout, - max_queue_size, - num_server_threads, - max_payload_size, - std::chrono::seconds(request_timeout), - lines, - stop_after); + return builder.construct_object>(name, + std::move(bind_address), + port, + std::move(endpoint), + 
std::move(live_endpoint), + std::move(ready_endpoint), + std::move(method), + std::move(live_method), + std::move(ready_method), + accept_status, + sleep_time, + queue_timeout, + max_queue_size, + num_server_threads, + max_payload_size, + std::chrono::seconds(request_timeout), + lines, + stop_after, + std::move(task)); } } // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/src/utilities/http_server.cpp b/python/morpheus/morpheus/_lib/src/utilities/http_server.cpp index 7e68b313fb..8f07f42ad2 100644 --- a/python/morpheus/morpheus/_lib/src/utilities/http_server.cpp +++ b/python/morpheus/morpheus/_lib/src/utilities/http_server.cpp @@ -15,18 +15,15 @@ * limitations under the License. */ -// TODO(dagardner): add /health & /info endpoints - #include "morpheus/utilities/http_server.hpp" -#include "pymrc/utilities/function_wrappers.hpp" // for PyFuncWrapper - #include // for dispatch, make_address #include #include // for basic_socket_acceptor<>::executor_type #include // for basic_stream_socket #include -#include // for acceptor, endpoint, socket, +#include // for address +#include // for acceptor, endpoint, socket, #include // for socket_base::reuse_address, socket_base, socket_base::max_listen_connections #include // for strand, make_strand, operator== #include // for bind_front_handler, error_code, flat_buffer, tcp_stream @@ -35,26 +32,30 @@ #include // for flat_buffer #include #include // for tcp_stream -#include // for read_async, request, response, verb, write_async +#include // for read_async, response, write_async #include // for error, error::end_of_stream #include // for field, field::content_type #include -#include // for message, response, request -#include // for request_parser, parser -#include // for status, status::not_found -#include // for string_body, basic_string_body, basic_string_body<>::value_type -#include // for verb, operator<<, verb::unknown +#include // for message, response, request +#include // for request_parser, parser 
+#include // for status, status::not_found +#include // for verb, operator<<, verb::unknown #include -#include // for CHECK and LOG +#include // for CHECK and LOG +#include // for basic_json, json_ref #include #include // IWYU pragma: keep #include +#include // for PyFuncWrapper +#include // for cast_from_json -#include // for exception +#include // for exception +#include #include // needed for glog #include // for runtime_error, length_error #include // indirectly used by pybind11 casting #include // for move +// IWYU pragma: no_include // loosely based on the following examples: // https://www.boost.org/doc/libs/1_74_0/libs/beast/example/http/server/async/http_server_async.cpp @@ -122,7 +123,7 @@ class Session : public std::enable_shared_from_this void handle_request(http::request&& request) { DLOG(INFO) << "Received request: " << request.method() << " : " << request.target(); - m_response = std::make_unique>(); + m_response = std::make_unique>(); bool valid_request = false; for (const auto& endpoint : m_endpoints) @@ -130,8 +131,16 @@ class Session : public std::enable_shared_from_this if (request.target() == endpoint.m_url && request.method() == endpoint.m_method) { valid_request = true; - std::string body{request.body()}; - auto parse_status = (*endpoint.m_parser)(body); + std::tuple parse_status; + if (endpoint.m_requet_handler != nullptr) + { + parse_status = (*endpoint.m_requet_handler)(m_stream.socket().remote_endpoint(), request); + } + else + { + std::string body{request.body()}; + parse_status = (*endpoint.m_parser)(body); + } m_response->result(std::get<0>(parse_status)); m_response->set(http::field::content_type, std::get<1>(parse_status)); @@ -333,19 +342,53 @@ HttpServer::~HttpServer() } } +utilities::json_t request_headers_to_json(const tcp_endpoint_t& tcp_endpoint, const request_t& request) +{ + morpheus::utilities::json_t headers{{"method", request.method_string()}, + {"endpoint", request.target()}, + {"remote_address", 
tcp_endpoint.address().to_string()}, + {"remote_port", tcp_endpoint.port()}}; + + for (const auto& field : request) + { + headers[field.name_string()] = field.value(); + } + + return headers; +} + /****** HttpEndpointInterfaceProxy *************************/ using mrc::pymrc::PyFuncWrapper; namespace py = pybind11; std::shared_ptr HttpEndpointInterfaceProxy::init(pybind11::function py_parse_fn, std::string url, - std::string method) + std::string method, + bool include_headers) { - auto wrapped_parse_fn = PyFuncWrapper(std::move(py_parse_fn)); - payload_parse_fn_t payload_parse_fn = [wrapped_parse_fn = std::move(wrapped_parse_fn)](const std::string& payload) { + auto wrapped_parse_fn = PyFuncWrapper(std::move(py_parse_fn)); + request_handler_fn_t request_handler_fn = [include_headers, wrapped_parse_fn = std::move(wrapped_parse_fn)]( + const tcp_endpoint_t& tcp_endpoint, const request_t& request) { + std::string body{request.body()}; + std::unique_ptr headers{nullptr}; + if (include_headers) + { + headers = std::make_unique(std::move(request_headers_to_json(tcp_endpoint, request))); + } + py::gil_scoped_acquire gil; - auto py_payload = py::str(payload); - auto py_result = wrapped_parse_fn.operator()(py_payload); + auto py_payload = py::str(body); + pybind11::tuple py_result; + if (include_headers) + { + py::dict py_headers = mrc::pymrc::cast_from_json(*headers); + py_result = wrapped_parse_fn.operator()(py_payload, py_headers); + } + else + { + py_result = wrapped_parse_fn.operator()(py_payload); + } + on_complete_cb_fn_t cb_fn{nullptr}; if (!py_result[3].is_none()) { @@ -372,7 +415,7 @@ std::shared_ptr HttpEndpointInterfaceProxy::init(pybind11::functio std::move(cb_fn)); }; - return std::make_shared(std::move(payload_parse_fn), url, method); + return std::make_shared(std::move(request_handler_fn), std::move(url), method); } /****** HttpServerInterfaceProxy *************************/ @@ -425,11 +468,21 @@ void HttpServerInterfaceProxy::exit(HttpServer& self, 
self.stop(); } -HttpEndpoint::HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string url, std::string method) : - m_parser{std::make_shared(std::move(payload_parse_fn))}, +HttpEndpoint::HttpEndpoint(std::shared_ptr&& request_handler_fn, + std::shared_ptr&& payload_parse_fn, + std::string&& url, + const std::string& method) : + m_requet_handler{std::move(request_handler_fn)}, + m_parser{std::move(payload_parse_fn)}, m_url{std::move(url)}, m_method{http::string_to_verb(method)} { + DCHECK(m_requet_handler != nullptr || m_parser != nullptr) + << "Either request_handler_fn or payload_parse_fn must be provided"; + + DCHECK(m_requet_handler == nullptr || m_parser == nullptr) + << "Only one of request_handler_fn or payload_parse_fn can be provided"; + if (m_method == http::verb::unknown) { throw std::runtime_error("Invalid method: " + method); @@ -441,6 +494,16 @@ HttpEndpoint::HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string url, } } +HttpEndpoint::HttpEndpoint(request_handler_fn_t request_handler_fn, std::string&& url, const std::string& method) : + HttpEndpoint{ + std::move(std::make_shared(std::move(request_handler_fn))), nullptr, std::move(url), method} +{} + +HttpEndpoint::HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string&& url, const std::string& method) : + HttpEndpoint{ + nullptr, std::move(std::make_shared(std::move(payload_parse_fn))), std::move(url), method} +{} + Listener::Listener(net::io_context& io_context, const std::string& bind_address, unsigned short port, diff --git a/python/morpheus/morpheus/_lib/stages/__init__.pyi b/python/morpheus/morpheus/_lib/stages/__init__.pyi index 12a274c601..a052ad9d87 100644 --- a/python/morpheus/morpheus/_lib/stages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/stages/__init__.pyi @@ -24,7 +24,8 @@ __all__ = [ "FilterDetectionsControlMessageStage", "FilterDetectionsMultiMessageStage", "FilterSource", - "HttpServerSourceStage", + "HttpServerControlMessageSourceStage", + 
"HttpServerMessageMetaSourceStage", "InferenceClientStageCM", "InferenceClientStageMM", "KafkaSourceStage", @@ -71,7 +72,10 @@ class FilterDetectionsControlMessageStage(mrc.core.segment.SegmentObject): class FilterDetectionsMultiMessageStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... pass -class HttpServerSourceStage(mrc.core.segment.SegmentObject): +class HttpServerControlMessageSourceStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', live_endpoint: str = '/live', ready_endpoint: str = '/ready', method: str = 'POST', live_method: str = 'GET', ready_method: str = 'GET', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0, task_type: object = None, task_payload: object = None) -> None: ... + pass +class HttpServerMessageMetaSourceStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', live_endpoint: str = '/live', ready_endpoint: str = '/ready', method: str = 'POST', live_method: str = 'GET', ready_method: str = 'GET', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0) -> None: ... 
pass class InferenceClientStageCM(mrc.core.segment.SegmentObject): diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index dc6537fa7d..420aacdbc5 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -325,11 +325,11 @@ PYBIND11_MODULE(stages, _module) py::arg("stride"), py::arg("column")); - py::class_, + py::class_>, mrc::segment::ObjectProperties, - std::shared_ptr>>( - _module, "HttpServerSourceStage", py::multiple_inheritance()) - .def(py::init<>(&HttpServerSourceStageInterfaceProxy::init), + std::shared_ptr>>>( + _module, "HttpServerMessageMetaSourceStage", py::multiple_inheritance()) + .def(py::init<>(&HttpServerSourceStageInterfaceProxy::init_meta), py::arg("builder"), py::arg("name"), py::arg("bind_address") = "127.0.0.1", @@ -350,6 +350,33 @@ PYBIND11_MODULE(stages, _module) py::arg("lines") = false, py::arg("stop_after") = 0); + py::class_>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( + _module, "HttpServerControlMessageSourceStage", py::multiple_inheritance()) + .def(py::init<>(&HttpServerSourceStageInterfaceProxy::init_cm), + py::arg("builder"), + py::arg("name"), + py::arg("bind_address") = "127.0.0.1", + py::arg("port") = 8080, + py::arg("endpoint") = "/message", + py::arg("live_endpoint") = "/live", + py::arg("ready_endpoint") = "/ready", + py::arg("method") = "POST", + py::arg("live_method") = "GET", + py::arg("ready_method") = "GET", + py::arg("accept_status") = 201u, + py::arg("sleep_time") = 0.1f, + py::arg("queue_timeout") = 5, + py::arg("max_queue_size") = 1024, + py::arg("num_server_threads") = 1, + py::arg("max_payload_size") = DefaultMaxPayloadSize, + py::arg("request_timeout") = 30, + py::arg("lines") = false, + py::arg("stop_after") = 0, + py::arg("task_type") = py::none(), + py::arg("task_payload") = py::none()); + py::class_, mrc::segment::ObjectProperties, std::shared_ptr>>( diff --git 
a/python/morpheus/morpheus/_lib/tests/messages/test_control_message.cpp b/python/morpheus/morpheus/_lib/tests/messages/test_control_message.cpp index 642660fcdc..6094ddd67d 100644 --- a/python/morpheus/morpheus/_lib/tests/messages/test_control_message.cpp +++ b/python/morpheus/morpheus/_lib/tests/messages/test_control_message.cpp @@ -63,6 +63,12 @@ TEST_F(TestControlMessage, InitializationTest) auto msg_two = ControlMessage(config); ASSERT_EQ(msg_two.has_task("load"), true); + + auto data_payload = create_mock_msg_meta({"col1", "col2", "col3"}, {"int32", "float32", "string"}, 5); + + auto msg_three = ControlMessage(data_payload, config); + ASSERT_EQ(msg_three.has_task("load"), true); + ASSERT_EQ(msg_three.payload(), data_payload); } TEST_F(TestControlMessage, SetAndGetMetadata) diff --git a/python/morpheus/morpheus/pipeline/__init__.py b/python/morpheus/morpheus/pipeline/__init__.py index 169bddafe1..587f2df920 100644 --- a/python/morpheus/morpheus/pipeline/__init__.py +++ b/python/morpheus/morpheus/pipeline/__init__.py @@ -17,7 +17,7 @@ # Note: The pipeline module is unique in that we re-export all of the classes and functions from the submodules. To # avoid circular imports, we must import the classes in a specific order. And in each submodule, we should never import -# the from pipeline submodules. Instead, we should import from the parent module as a namespace packag and then use the +# the from pipeline submodules. Instead, we should import from the parent module as a namespace package and then use the # fully qualified name to access the classes. 
For example, in morpheus/pipeline/stage.py: # Do not do this: # ``` @@ -44,5 +44,6 @@ from morpheus.pipeline.multi_message_stage import MultiMessageStage from morpheus.pipeline.source_stage import SourceStage from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.configurable_output_source import ConfigurableOutputSource from morpheus.pipeline.pipeline import Pipeline from morpheus.pipeline.linear_pipeline import LinearPipeline diff --git a/python/morpheus/morpheus/pipeline/configurable_output_source.py b/python/morpheus/morpheus/pipeline/configurable_output_source.py new file mode 100644 index 0000000000..66dfbc7d33 --- /dev/null +++ b/python/morpheus/morpheus/pipeline/configurable_output_source.py @@ -0,0 +1,71 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Base class for single output source stages which support both `MessageMeta` and `ControlMessage` as output types.""" +from enum import Enum + +import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import +from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta + + +class SupportedMessageTypes(Enum): + """Supported output message types""" + MESSAGE_META = "MessageMeta" + CONTROL_MESSAGE = "ControlMessage" + + +class ConfigurableOutputSource(_pipeline.SingleOutputSource): + """ + Base class for single output source stages which support both `MessageMeta` and `ControlMessage` as output types. + + Parameters + ---------- + config : `morpheus.config.Config` + Pipeline configuration instance. + message_type : `SupportedMessageTypes` + The type of message to emit. + task_type : str, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_payload` must also be specified. + task_payload : dict, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_type` must also be specified. 
+ """ + + def __init__(self, + config: Config, + message_type: SupportedMessageTypes = SupportedMessageTypes.MESSAGE_META, + task_type: str = None, + task_payload: dict = None): + super().__init__(config) + + self._message_type = message_type + self._task_type = task_type + self._task_payload = task_payload + + if (self._message_type is SupportedMessageTypes.CONTROL_MESSAGE): + if ((self._task_type is None) != (self._task_payload is None)): + raise ValueError("Both `task_type` and `task_payload` must be specified if either is specified.") + elif (self._message_type is SupportedMessageTypes.MESSAGE_META): + if (self._task_type is not None or self._task_payload is not None): + raise ValueError("Cannot specify `task_type` or `task_payload` for non-control messages.") + else: + raise ValueError(f"Invalid message type: {self._message_type}") + + def compute_schema(self, schema: _pipeline.StageSchema): + if (self._message_type is SupportedMessageTypes.CONTROL_MESSAGE): + schema.output_schema.set_type(ControlMessage) + else: + schema.output_schema.set_type(MessageMeta) diff --git a/python/morpheus/morpheus/stages/input/http_client_source_stage.py b/python/morpheus/morpheus/stages/input/http_client_source_stage.py index 4a101e0992..bc055f5c30 100644 --- a/python/morpheus/morpheus/stages/input/http_client_source_stage.py +++ b/python/morpheus/morpheus/stages/input/http_client_source_stage.py @@ -25,17 +25,18 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta +from morpheus.pipeline.configurable_output_source import ConfigurableOutputSource +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes from morpheus.pipeline.preallocator_mixin import PreallocatorMixin -from morpheus.pipeline.single_output_source import SingleOutputSource -from morpheus.pipeline.stage_schema import StageSchema from morpheus.utils import http_utils 
logger = logging.getLogger(__name__) @register_stage("from-http-client", ignore_args=["query_params", "headers", "**request_kwargs"]) -class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource): +class HttpClientSourceStage(PreallocatorMixin, ConfigurableOutputSource): """ Source stage that polls a remote HTTP server for incoming data. @@ -83,6 +84,14 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource): payload_to_df_fn : callable, default None A callable that takes the HTTP payload bytes as the first argument and the `lines` parameter is passed in as the second argument and returns a cudf.DataFrame. If unset cudf.read_json is used. + message_type : `SupportedMessageTypes`, case_sensitive = False + The type of message to emit. + task_type : str, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_payload` must also be specified. + task_payload : dict, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_type` must also be specified. **request_kwargs : dict Additional arguments to pass to the `requests.request` function. 
""" @@ -102,8 +111,11 @@ def __init__(self, lines: bool = False, stop_after: int = 0, payload_to_df_fn: typing.Callable[[bytes, bool], cudf.DataFrame] = None, + message_type: SupportedMessageTypes = SupportedMessageTypes.MESSAGE_META, + task_type: str = None, + task_payload: dict = None, **request_kwargs): - super().__init__(config) + super().__init__(config, message_type=message_type, task_type=task_type, task_payload=task_payload) self._url = http_utils.prepare_url(url) if callable(query_params): @@ -151,9 +163,6 @@ def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ implementation""" return False - def compute_schema(self, schema: StageSchema): - schema.output_schema.set_type(MessageMeta) - def _parse_response(self, response: requests.Response) -> typing.Union[cudf.DataFrame, None]: """ Returns a DataFrame parsed from the response payload. If the response payload is empty, then `None` is returned. @@ -190,18 +199,29 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: request_args['params'] = self._query_params_fn() (http_session, - df) = http_utils.request_with_retry(request_args, - requests_session=http_session, - max_retries=self._max_retries, - sleep_time=self._error_sleep_time, - respect_retry_after_header=self._respect_retry_after_header, - accept_status_codes=self._accept_status_codes, - on_success_fn=self._parse_response) - + response) = http_utils.request_with_retry(request_args, + requests_session=http_session, + max_retries=self._max_retries, + sleep_time=self._error_sleep_time, + respect_retry_after_header=self._respect_retry_after_header, + accept_status_codes=self._accept_status_codes) + + df = self._parse_response(response) # Even if we didn't receive any errors, the server may not have had any data for us. 
if df is not None and len(df): num_rows = len(df) - yield MessageMeta(df) + msg_meta = MessageMeta(df) + if self._message_type is SupportedMessageTypes.CONTROL_MESSAGE: + http_fields = request_args.copy() + http_fields.update(response.headers) + + out_msg = ControlMessage(msg_meta, {"metadata": {"http_fields": http_fields}}) + if self._task_type is not None: + out_msg.add_task(self._task_type, self._task_payload) + else: + out_msg = msg_meta + + yield out_msg num_records_emitted += num_rows time.sleep(self._sleep_time) diff --git a/python/morpheus/morpheus/stages/input/http_server_source_stage.py b/python/morpheus/morpheus/stages/input/http_server_source_stage.py index ed8d99612f..885f6c0fa3 100644 --- a/python/morpheus/morpheus/stages/input/http_server_source_stage.py +++ b/python/morpheus/morpheus/stages/input/http_server_source_stage.py @@ -26,23 +26,28 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta +from morpheus.pipeline.configurable_output_source import ConfigurableOutputSource +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes from morpheus.pipeline.preallocator_mixin import PreallocatorMixin -from morpheus.pipeline.single_output_source import SingleOutputSource -from morpheus.pipeline.stage_schema import StageSchema from morpheus.utils.http_utils import HTTPMethod from morpheus.utils.http_utils import HttpParseResponse from morpheus.utils.http_utils import MimeTypes from morpheus.utils.producer_consumer_queue import Closed +if typing.TYPE_CHECKING: + from morpheus.common import FiberQueue + from morpheus.common import HttpServer + logger = logging.getLogger(__name__) SUPPORTED_METHODS = (HTTPMethod.POST, HTTPMethod.PUT) HEALTH_SUPPORTED_METHODS = (HTTPMethod.GET, HTTPMethod.POST) -@register_stage("from-http") -class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource): 
+@register_stage("from-http", ignore_args=["task_type", "task_payload"]) +class HttpServerSourceStage(PreallocatorMixin, ConfigurableOutputSource): """ Source stage that starts an HTTP server and listens for incoming requests on a specified endpoint. @@ -74,14 +79,22 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource): request_timeout_secs : int, default 30 The maximum amount of time in seconds for any given request. lines : bool, default False - If False, the HTTP server will expect each request to be a JSON array of objects. If True, the HTTP server will - expect each request to be a JSON object per line. + If `False`, the HTTP server will expect each request to be a JSON array of objects. If `True`, the HTTP server + will expect each request to be a JSON object per line. stop_after : int, default 0 Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` payload_to_df_fn : callable, default None A callable that takes the HTTP payload string as the first argument and the `lines` parameter is passed in as the second argument and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is disabled, and the Python impl is used. + message_type : `SupportedMessageTypes`, case_sensitive = False + The type of message to emit. + task_type : str, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_payload` must also be specified. + task_payload : dict, default = None + If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` + is set to `CONTROL_MESSAGE`. If not `None`, `task_type` must also be specified. 
""" def __init__(self, @@ -103,8 +116,11 @@ def __init__(self, request_timeout_secs: int = 30, lines: bool = False, stop_after: int = 0, - payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None): - super().__init__(config) + payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None, + message_type: SupportedMessageTypes = SupportedMessageTypes.MESSAGE_META, + task_type: str = None, + task_payload: dict = None): + super().__init__(config, message_type=message_type, task_type=task_type, task_payload=task_payload) self._bind_address = bind_address self._port = port self._endpoint = endpoint @@ -123,10 +139,11 @@ def __init__(self, self._lines = lines self._stop_after = stop_after self._payload_to_df_fn = payload_to_df_fn - self._http_server = None + + self._http_server: "HttpServer" = None # These are only used when C++ mode is disabled - self._queue = None + self._queue: "FiberQueue" = None self._queue_size = 0 self._processing = False self._records_emitted = 0 @@ -146,10 +163,7 @@ def supports_cpp_node(self) -> bool: """Indicates whether this stage supports C++ nodes.""" return True - def compute_schema(self, schema: StageSchema): - schema.output_schema.set_type(MessageMeta) - - def _parse_payload(self, payload: str) -> HttpParseResponse: + def _parse_payload(self, payload: str, headers: dict = None) -> HttpParseResponse: try: if self._payload_to_df_fn is not None: df = self._payload_to_df_fn(payload, self._lines) @@ -165,7 +179,7 @@ def _parse_payload(self, payload: str) -> HttpParseResponse: body=err_msg) try: - self._queue.put(df, block=True, timeout=self._queue_timeout) + self._queue.put((df, headers), block=True, timeout=self._queue_timeout) self._queue_size += 1 return HttpParseResponse(status_code=self._accept_status.value, content_type=MimeTypes.TEXT.value, body="") @@ -195,7 +209,7 @@ def _liveliness_check(self, _: str) -> HttpParseResponse: content_type=MimeTypes.TEXT.value, body=err_msg) - return 
HttpParseResponse(status_code=self._accept_status.value, content_type=MimeTypes.TEXT.value, body="") + return HttpParseResponse(status_code=HTTPStatus.OK.value, content_type=MimeTypes.TEXT.value, body="") def _readiness_check(self, _: str) -> HttpParseResponse: if not self._http_server.is_running(): @@ -206,7 +220,7 @@ def _readiness_check(self, _: str) -> HttpParseResponse: body=err_msg) if self._queue_size < self._max_queue_size: - return HttpParseResponse(status_code=self._accept_status.value, content_type=MimeTypes.TEXT.value, body="") + return HttpParseResponse(status_code=HTTPStatus.OK.value, content_type=MimeTypes.TEXT.value, body="") err_msg = "HTTP payload queue is full or unavailable to accept new values" logger.error(err_msg) @@ -214,14 +228,20 @@ def _readiness_check(self, _: str) -> HttpParseResponse: content_type=MimeTypes.TEXT.value, body=err_msg) - def _generate_frames(self) -> typing.Iterator[MessageMeta]: + def _generate_frames(self) -> typing.Iterator[ControlMessage | MessageMeta]: from morpheus.common import FiberQueue from morpheus.common import HttpEndpoint from morpheus.common import HttpServer - msg = HttpEndpoint(self._parse_payload, self._endpoint, self._method.name) - live = HttpEndpoint(self._liveliness_check, self._live_endpoint, self._live_method.name) - ready = HttpEndpoint(self._readiness_check, self._ready_endpoint, self._ready_method.name) + msg = HttpEndpoint(self._parse_payload, + self._endpoint, + self._method.name, + include_headers=(self._message_type is SupportedMessageTypes.CONTROL_MESSAGE)) + live = HttpEndpoint(self._liveliness_check, self._live_endpoint, self._live_method.name, include_headers=False) + ready = HttpEndpoint(self._readiness_check, + self._ready_endpoint, + self._ready_method.name, + include_headers=False) with (FiberQueue(self._max_queue_size) as self._queue, HttpServer(endpoints=[msg, live, ready], bind_address=self._bind_address, @@ -236,9 +256,10 @@ def _generate_frames(self) -> 
typing.Iterator[MessageMeta]: # Read as many messages as we can from the queue if it's empty check to see if we should be shutting # down. It is important that any messages we received that are in the queue are processed before we # shutdown since we already returned an OK response to the client. - df = None + df: cudf.DataFrame = None + headers: dict = None try: - df = self._queue.get() + (df, headers) = self._queue.get() self._queue_size -= 1 except queue.Empty: if (not self._http_server.is_running()): @@ -252,7 +273,15 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: if df is not None: num_records = len(df) - yield MessageMeta(df) + msg_meta = MessageMeta(df) + if self._message_type is SupportedMessageTypes.CONTROL_MESSAGE: + out_msg = ControlMessage(msg_meta, {"metadata": {"http_fields": headers}}) + if self._task_type is not None: + out_msg.add_task(self._task_type, self._task_payload) + else: + out_msg = msg_meta + + yield out_msg self._records_emitted += num_records if self._stop_after > 0 and self._records_emitted >= self._stop_after: @@ -260,22 +289,34 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: if self._build_cpp_node() and self._payload_to_df_fn is None: + http_server_kwargs = { + "bind_address": self._bind_address, + "port": self._port, + "endpoint": self._endpoint, + "method": self._method.value, + "live_endpoint": self._live_endpoint, + "live_method": self._live_method.value, + "ready_endpoint": self._ready_endpoint, + "ready_method": self._ready_method.value, + "accept_status": self._accept_status.value, + "sleep_time": self._sleep_time, + "queue_timeout": self._queue_timeout, + "max_queue_size": self._max_queue_size, + "num_server_threads": self._num_server_threads, + "max_payload_size": self._max_payload_size_bytes, + "request_timeout": self._request_timeout_secs, + "lines": self._lines, + "stop_after": self._stop_after + } + import 
morpheus._lib.stages as _stages - node = _stages.HttpServerSourceStage(builder, - self.unique_name, - bind_address=self._bind_address, - port=self._port, - endpoint=self._endpoint, - method=self._method.value, - accept_status=self._accept_status.value, - sleep_time=self._sleep_time, - queue_timeout=self._queue_timeout, - max_queue_size=self._max_queue_size, - num_server_threads=self._num_server_threads, - max_payload_size=self._max_payload_size_bytes, - request_timeout=self._request_timeout_secs, - lines=self._lines, - stop_after=self._stop_after) + if self._message_type is SupportedMessageTypes.CONTROL_MESSAGE: + http_server_kwargs.update({"task_type": self._task_type, "task_payload": self._task_payload}) + server_class = _stages.HttpServerControlMessageSourceStage + else: + server_class = _stages.HttpServerMessageMetaSourceStage + + node = server_class(builder, self.unique_name, **http_server_kwargs) else: node = builder.make_source(self.unique_name, self._generate_frames()) diff --git a/tests/common/test_http_server.py b/tests/common/test_http_server.py index 238ad51e5c..f82e122dcd 100644 --- a/tests/common/test_http_server.py +++ b/tests/common/test_http_server.py @@ -40,8 +40,9 @@ def make_parse_fn(status: HTTPStatus = HTTPStatus.OK, @pytest.mark.parametrize("endpoints", [("/t1", "/t2", "/t3"), ("test/", "123/", "a1d/"), ("/a", "/a/b", "/a/b/c/d")]) @pytest.mark.parametrize("port", [8088, 9090]) @pytest.mark.parametrize("method", ["GET", "POST", "PUT"]) -@pytest.mark.parametrize("use_callback", [True, False]) -@pytest.mark.parametrize("use_context_mgr", [True, False]) +@pytest.mark.parametrize("include_headers", [True, False], ids=["with_headers", "without_headers"]) +@pytest.mark.parametrize("use_callback", [True, False], ids=["with_callback", "without_callback"]) +@pytest.mark.parametrize("use_context_mgr", [True, False], ids=["with_context_mgr", "without_context_mgr"]) @pytest.mark.parametrize("num_threads", [1, 2, min(8, os.cpu_count())]) 
@pytest.mark.parametrize("status,content_type,content", [(HTTPStatus.OK, MimeTypes.TEXT.value, "OK"), @@ -54,6 +55,7 @@ def test_simple_request(port: int, status: HTTPStatus, content_type: str, content: str, + include_headers: bool, use_callback: bool, use_context_mgr: bool, num_threads: int): @@ -74,16 +76,38 @@ def test_simple_request(port: int, server = None - def check_server(url) -> None: + def check_server(url: str, endpoint: str) -> None: assert server.is_running() - response = requests.request(method=method, url=url, data=payload, timeout=5.0) + response = requests.request(method=method, url=url, data=payload, timeout=5.0, headers={"unit": "test"}) assert response.status_code == status.value assert response.headers["Content-Type"] == content_type assert response.text == content - parse_fn.assert_called_once_with(payload) + if include_headers: + expected_endpoint = endpoint + if not expected_endpoint.startswith("/"): + expected_endpoint = f"/{expected_endpoint}" + + # Subset of headers that we want to check for + expected_headers = { + 'Host': f'127.0.0.1:{port}', + 'endpoint': expected_endpoint, + 'method': method, + 'remote_address': '127.0.0.1', + 'unit': 'test' + } + parse_fn.assert_called_once() + assert parse_fn.call_args[0][0] == payload + + actual_headers = parse_fn.call_args[0][1] + for (key, value) in expected_headers.items(): + assert actual_headers[key] == value + + else: + parse_fn.assert_called_once_with(payload) + parse_fn.reset_mock() if use_callback: @@ -104,21 +128,22 @@ def check_server(url) -> None: for endpoint in endpoints: urls.append(make_url(port, endpoint)) - http_endpoints.append(HttpEndpoint(py_parse_fn=parse_fn, url=endpoint, method=method)) + http_endpoints.append( + HttpEndpoint(py_parse_fn=parse_fn, url=endpoint, method=method, include_headers=include_headers)) if use_context_mgr: with HttpServer(endpoints=http_endpoints, port=port, num_threads=num_threads) as server: assert server.is_running() - for url in urls: - 
check_server(url) + for (i, url) in enumerate(urls): + check_server(url, endpoints[i]) else: server = HttpServer(endpoints=http_endpoints, port=port, num_threads=num_threads) assert not server.is_running() server.start() - for url in urls: - check_server(url) + for (i, url) in enumerate(urls): + check_server(url, endpoints[i]) server.stop() diff --git a/tests/messages/test_control_message.py b/tests/messages/test_control_message.py index 85f2aa344f..f70f4f5f75 100644 --- a/tests/messages/test_control_message.py +++ b/tests/messages/test_control_message.py @@ -29,10 +29,31 @@ # pylint: disable=unsubscriptable-object -@pytest.mark.usefixtures("config_only_cpp") -def test_control_message_init(): - messages.ControlMessage() # noqa: F841 - messages.ControlMessage({"test": "test"}) # noqa: F841 +def _verify_metadata(msg: messages.ControlMessage, metadata: dict): + assert msg.get_metadata() == metadata + for (key, value) in metadata.items(): + assert msg.get_metadata(key) == value + + +def test_control_message_init(dataset: DatasetManager): + # Explicitly performing copies of the metadata, config and the dataframe, to ensure that the original data is not + # being modified in place in some way.
+ msg = messages.ControlMessage() + assert msg.get_metadata() == {} # pylint: disable=use-implicit-booleaness-not-comparison + assert msg.payload() is None + + metadata = {"test_key": "test_value"} + cm_config = {"metadata": metadata.copy()} + + msg = messages.ControlMessage(cm_config.copy()) + _verify_metadata(msg, metadata) + + payload = messages.MessageMeta(dataset["filter_probs.csv"]) + + msg_w_payload = messages.ControlMessage(payload, cm_config.copy()) + _verify_metadata(msg_w_payload, metadata) + + dataset.assert_df_equal(msg_w_payload.payload().df, dataset["filter_probs.csv"]) @pytest.mark.usefixtures("config_only_cpp") diff --git a/tests/pipeline/test_configurable_output_source.py b/tests/pipeline/test_configurable_output_source.py new file mode 100644 index 0000000000..3668405fb7 --- /dev/null +++ b/tests/pipeline/test_configurable_output_source.py @@ -0,0 +1,84 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import mrc +import pytest + +from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.pipeline.configurable_output_source import ConfigurableOutputSource +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes +from morpheus.pipeline.stage_schema import StageSchema + + +class ConfigurableOutputSourceImpl(ConfigurableOutputSource): + """ + Concrete implementation of ConfigurableOutputSource for testing purposes. + """ + + @property + def name(self) -> str: + return "from-unit-test" + + def supports_cpp_node(self) -> bool: + return False + + def _build_source(self, _: mrc.Builder) -> mrc.SegmentObject: + raise NotImplementedError("Not implemented in test") + + +@pytest.mark.parametrize("message_type", SupportedMessageTypes) +def test_compute_schema(config: Config, message_type: SupportedMessageTypes): + source = ConfigurableOutputSourceImpl(config=config, message_type=message_type) + + if message_type == SupportedMessageTypes.MESSAGE_META: + expected_message_class = MessageMeta + else: + expected_message_class = ControlMessage + + schema = StageSchema(source) + source.compute_schema(schema) + + assert len(schema.output_schemas) == 1 + + port_schema = schema.output_schemas[0] + assert port_schema.get_type() is expected_message_class + + +def test_constructor_error_task_with_message_meta(config: Config): + with pytest.raises(ValueError): + ConfigurableOutputSourceImpl(config=config, + message_type=SupportedMessageTypes.MESSAGE_META, + task_type="test", + task_payload={"why": "setting task only valid for ControlMessage output"}) + + +def test_constructor_error_task_type_without_task_payload(config: Config): + with pytest.raises(ValueError): + ConfigurableOutputSourceImpl(config=config, + message_type=SupportedMessageTypes.CONTROL_MESSAGE, + task_type="setting task_type requires setting task_payload") + + +def 
test_constructor_error_task_payload_without_task_type(config: Config): + with pytest.raises(ValueError): + ConfigurableOutputSourceImpl(config=config, + message_type=SupportedMessageTypes.MESSAGE_META, + task_payload={"why": "setting task_payload requires setting task_type"}) + + +def test_constructor_error_invalid_type(config: Config): + with pytest.raises(ValueError): + ConfigurableOutputSourceImpl(config=config, message_type="invalid message type") diff --git a/tests/stages/test_http_client_source_stage_pipe.py b/tests/stages/test_http_client_source_stage_pipe.py index 26bc1f911e..dd11445cdd 100644 --- a/tests/stages/test_http_client_source_stage_pipe.py +++ b/tests/stages/test_http_client_source_stage_pipe.py @@ -21,7 +21,10 @@ from _utils import assert_results from _utils.dataset_manager import DatasetManager from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes from morpheus.stages.input.http_client_source_stage import HttpClientSourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage @@ -30,11 +33,21 @@ @pytest.mark.use_cudf @pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"]) @pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"]) +@pytest.mark.parametrize("message_type, task_type, task_payload", + [(SupportedMessageTypes.MESSAGE_META, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, "test", { + "pay": "load" + })], + ids=["message_meta", "control_message_no_task", "control_message_with_task"]) def test_http_client_source_stage_pipe(config: Config, dataset: DatasetManager, mock_rest_server: str, lines: bool, - use_payload_to_df_fn: bool): + use_payload_to_df_fn: bool, + message_type: 
SupportedMessageTypes, + task_type: str | None, + task_payload: dict | None): """ Test the HttpClientSourceStage against a mock REST server which will return JSON data which can be deserialized into a DataFrame. @@ -78,11 +91,42 @@ def payload_to_df_fn(payload, lines_arg): max_retries=1, lines=lines, stop_after=num_records, - payload_to_df_fn=payload_to_df_fn)) + payload_to_df_fn=payload_to_df_fn, + message_type=message_type, + task_type=task_type, + task_payload=task_payload)) comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df)) pipe.run() - assert_results(comp_stage.get_results()) + assert_results(comp_stage.get_results(clear=False)) + + messages = comp_stage.get_messages() + assert len(messages) == 1 + + recv_msg = messages[0] + if message_type == SupportedMessageTypes.MESSAGE_META: + assert isinstance(recv_msg, MessageMeta) + else: + assert isinstance(recv_msg, ControlMessage) + if task_type is not None: + expected_tasks = {task_type: [task_payload]} + else: + expected_tasks = {} + + assert recv_msg.get_tasks() == expected_tasks + + if lines: + # This is the content type specified in `tests/mock_rest_server/mocks/api/v1/data-lines/GET.mock` + expected_content_type = "text/plain;charset=UTF-8" + else: + expected_content_type = "application/json" + + # Subset of headers that we want to check for + expected_headers = {'url': url, 'method': 'GET', 'Content-Type': expected_content_type} + + actual_headers = recv_msg.get_metadata()['http_fields'] + for (key, value) in expected_headers.items(): + assert actual_headers[key] == value @pytest.mark.slow diff --git a/tests/stages/test_http_server_source_stage.py b/tests/stages/test_http_server_source_stage.py index 01763d78db..0f425efe01 100644 --- a/tests/stages/test_http_server_source_stage.py +++ b/tests/stages/test_http_server_source_stage.py @@ -20,6 +20,7 @@ from io import StringIO from unittest import mock +import pandas as pd import pytest import requests import requests.adapters @@ -29,7 +30,9 
@@ from _utils.dataset_manager import DatasetManager from morpheus.config import Config from morpheus.io.serializers import df_to_stream_json +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes from morpheus.stages.input.http_server_source_stage import HttpServerSourceStage from morpheus.utils.http_utils import HTTPMethod from morpheus.utils.http_utils import MimeTypes @@ -37,7 +40,7 @@ class GetNext(threading.Thread): - def __init__(self, msg_queue: queue.Queue, generator: typing.Iterator[MessageMeta]): + def __init__(self, msg_queue: queue.Queue, generator: typing.Iterator[ControlMessage | MessageMeta]): threading.Thread.__init__(self) self._generator = generator self._msg_queue = msg_queue @@ -59,9 +62,22 @@ def join(self, timeout=None): @pytest.mark.slow @pytest.mark.use_python +@pytest.mark.parametrize("message_type, task_type, task_payload", + [(SupportedMessageTypes.MESSAGE_META, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, "test", { + "pay": "load" + })], + ids=["message_meta", "control_message_no_task", "control_message_with_task"]) @pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"]) @pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"]) -def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: bool, use_payload_to_df_fn: bool): +def test_generate_frames(config: Config, + dataset_pandas: DatasetManager, + lines: bool, + use_payload_to_df_fn: bool, + message_type: SupportedMessageTypes, + task_type: str | None, + task_payload: dict | None): # The _generate_frames() method is only used when C++ mode is disabled endpoint = '/test' port = 8088 @@ -69,7 +85,7 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: accept_status = HTTPStatus.OK url = 
make_url(port, endpoint) - df = dataset_pandas['filter_probs.csv'] + df: pd.DataFrame = dataset_pandas['filter_probs.csv'] if lines: content_type = MimeTypes.TEXT.value @@ -93,7 +109,10 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: method=method, accept_status=accept_status, lines=lines, - payload_to_df_fn=payload_to_df_fn) + payload_to_df_fn=payload_to_df_fn, + message_type=message_type, + task_type=task_type, + task_payload=task_payload) generate_frames = stage._generate_frames() msg_queue = queue.SimpleQueue() @@ -120,7 +139,9 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: data=payload, timeout=10, allow_redirects=False, - headers={"Content-Type": content_type}) + headers={ + "Content-Type": content_type, "unit": "test" + }) result_msg = msg_queue.get(timeout=5.0) get_next_thread.join() @@ -135,7 +156,36 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: else: expected_df = df - dataset_pandas.assert_compare_df(expected_df, result_msg.df) + if message_type == SupportedMessageTypes.CONTROL_MESSAGE: + expected_class = ControlMessage + actual_df = result_msg.payload().df + else: + expected_class = MessageMeta + actual_df = result_msg.df + + assert isinstance(result_msg, expected_class) + dataset_pandas.assert_compare_df(expected_df, actual_df) + + if message_type == SupportedMessageTypes.CONTROL_MESSAGE: + if task_type is not None: + expected_tasks = {task_type: [task_payload]} + else: + expected_tasks = {} + + assert result_msg.get_tasks() == expected_tasks + + # Subset of headers that we want to check for + expected_headers = { + 'Host': f'127.0.0.1:{port}', + 'endpoint': endpoint, + 'method': method.value, + 'remote_address': '127.0.0.1', + 'unit': 'test' + } + + actual_headers = result_msg.get_metadata()['http_fields'] + for (key, value) in expected_headers.items(): + assert actual_headers[key] == value @pytest.mark.parametrize("invalid_method", 
[HTTPMethod.GET, HTTPMethod.PATCH]) diff --git a/tests/stages/test_http_server_source_stage_pipe.py b/tests/stages/test_http_server_source_stage_pipe.py index d00297a7dd..e84d762be0 100644 --- a/tests/stages/test_http_server_source_stage_pipe.py +++ b/tests/stages/test_http_server_source_stage_pipe.py @@ -25,7 +25,10 @@ from _utils.dataset_manager import DatasetManager from morpheus.config import Config from morpheus.io.serializers import df_to_stream_json +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.configurable_output_source import SupportedMessageTypes from morpheus.pipeline.pipeline import PipelineState from morpheus.stages.input.http_server_source_stage import HttpServerSourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage @@ -61,7 +64,7 @@ async def make_request(pipe: LinearPipeline, 'timeout': 5.0, 'allow_redirects': False, 'headers': { - "Content-Type": content_type + "Content-Type": content_type, "unit": "test" } }, accept_status_codes=[accept_status], @@ -81,8 +84,20 @@ async def run_pipe_and_request(pipe: LinearPipeline, @pytest.mark.slow -@pytest.mark.parametrize("lines", [False, True]) -def test_http_server_source_stage_pipe(config: Config, dataset_cudf: DatasetManager, lines: bool): +@pytest.mark.parametrize("message_type, task_type, task_payload", + [(SupportedMessageTypes.MESSAGE_META, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, None, None), + (SupportedMessageTypes.CONTROL_MESSAGE, "test", { + "pay": "load" + })], + ids=["message_meta", "control_message_no_task", "control_message_with_task"]) +@pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"]) +def test_http_server_source_stage_pipe(config: Config, + dataset_cudf: DatasetManager, + lines: bool, + message_type: SupportedMessageTypes, + task_type: str | None, + task_payload: dict | None): endpoint = '/test' port = 8088 
method = HTTPMethod.POST @@ -109,16 +124,47 @@ def test_http_server_source_stage_pipe(config: Config, dataset_cudf: DatasetMana method=method, accept_status=accept_status, lines=lines, - stop_after=num_records)) + stop_after=num_records, + message_type=message_type, + task_type=task_type, + task_payload=task_payload)) comp_stage = pipe.add_stage(CompareDataFrameStage(config, df)) response_queue = queue.SimpleQueue() asyncio.run(run_pipe_and_request(pipe, response_queue, method, accept_status, url, payload, content_type)) - assert_results(comp_stage.get_results()) + assert_results(comp_stage.get_results(clear=False)) response = response_queue.get_nowait() assert response.status_code == accept_status.value assert response.headers["Content-Type"] == MimeTypes.TEXT.value assert response.text == "" + + messages = comp_stage.get_messages() + assert len(messages) == 1 + + recv_msg = messages[0] + if message_type == SupportedMessageTypes.MESSAGE_META: + assert isinstance(recv_msg, MessageMeta) + else: + assert isinstance(recv_msg, ControlMessage) + if task_type is not None: + expected_tasks = {task_type: [task_payload]} + else: + expected_tasks = {} + + assert recv_msg.get_tasks() == expected_tasks + + # Subset of headers that we want to check for + expected_headers = { + 'Host': f'127.0.0.1:{port}', + 'endpoint': endpoint, + 'method': method.value, + 'remote_address': '127.0.0.1', + 'unit': 'test' + } + + actual_headers = recv_msg.get_metadata()['http_fields'] + for (key, value) in expected_headers.items(): + assert actual_headers[key] == value