Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for ControlMessage as an output type for HttpServerSourceStage and HttpClientSourceStage #1834

Open
wants to merge 56 commits into
base: branch-24.10
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
271d31f
Add constructor that takes a message meta and a config
dagardner-nv Aug 2, 2024
34f267c
Add test for new constructor overload
dagardner-nv Aug 2, 2024
e2d87ee
WIP
dagardner-nv Aug 2, 2024
73c51b9
Add tests for new control message constructor
dagardner-nv Aug 2, 2024
a07d88f
WIP
dagardner-nv Aug 3, 2024
e53758c
WIP
dagardner-nv Aug 3, 2024
be2824e
Rename cm_task_t prior to making it shared
dagardner-nv Aug 6, 2024
5aefa9c
Move control_message_task_t
dagardner-nv Aug 6, 2024
61a4563
WIP
dagardner-nv Aug 6, 2024
d06bbd3
The live and ready endpoints should always return a 200 on success, t…
dagardner-nv Aug 6, 2024
4c5d693
Add a comment explaining the parser/queue split
dagardner-nv Aug 6, 2024
39e5201
Move the string variables
dagardner-nv Aug 6, 2024
bff6482
For now ignore control message related cli flags
dagardner-nv Aug 6, 2024
a0535e1
Alternately support a request parser
dagardner-nv Aug 6, 2024
f839023
WIP
dagardner-nv Aug 6, 2024
b4f8057
WIP
dagardner-nv Aug 6, 2024
c13dfa3
WIP
dagardner-nv Aug 6, 2024
ebd200b
Sore http fields in the control message metadata
dagardner-nv Aug 6, 2024
22c144e
Fix setting of control message metadata
dagardner-nv Aug 6, 2024
fd90053
Fix construction of ControlMessage
dagardner-nv Aug 7, 2024
0e051cc
Move header definitions for interface proxy above method impls
dagardner-nv Aug 7, 2024
6181c1f
Add an optional include_headers bool arg to HttpEndpoint, when false …
dagardner-nv Aug 7, 2024
a6bf5f6
Fix type-hints for the fiber queue
dagardner-nv Aug 7, 2024
b96710d
Set headers in control message metadata in python impl
dagardner-nv Aug 7, 2024
36bf306
Fix docstrings
dagardner-nv Aug 7, 2024
f9d2dbf
Fill in docstrings
dagardner-nv Aug 7, 2024
2736486
Remove out of date todo comment [no ci]
dagardner-nv Aug 7, 2024
453481e
Add docstrings
dagardner-nv Aug 7, 2024
af4aae3
Remove out of date comment [no ci]
dagardner-nv Aug 7, 2024
4365344
Lint fixes [no ci]
dagardner-nv Aug 7, 2024
d874850
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 7, 2024
60fd42d
Docstring fix, and fix for removed variable [no ci]
dagardner-nv Aug 7, 2024
1f61dc8
Switch message_type to an enum, allowing this to be set from the cli
dagardner-nv Aug 7, 2024
75dc5ff
Cleanups [no ci]
dagardner-nv Aug 7, 2024
849e2c7
Add test for new include_headers argument to HttpEndpoint [no ci]
dagardner-nv Aug 7, 2024
4ef152f
Fix return type hint for _generate_frames [no ci]
dagardner-nv Aug 7, 2024
0d50179
Cleanups [no ci]
dagardner-nv Aug 7, 2024
714663f
Fix test
dagardner-nv Aug 8, 2024
bb71b94
Test to ensure we add tasks to control messages [no ci]
dagardner-nv Aug 8, 2024
9d52555
update tests for control message output
dagardner-nv Aug 8, 2024
28b9e55
First pass at supporting control message as an output type [no ci]
dagardner-nv Aug 8, 2024
9d6f5da
Create a new base class ConfigurableOutputSource which contains code …
dagardner-nv Aug 8, 2024
6bbcb24
Remove unused import [no ci]
dagardner-nv Aug 8, 2024
58d58f3
WIP [no ci]
dagardner-nv Aug 8, 2024
c3cfd03
Fix circular import issue
dagardner-nv Aug 8, 2024
072fc20
Specify ids for test params to make it clearer which parameter is bei…
dagardner-nv Aug 8, 2024
fccc48e
Formatting [no ci]
dagardner-nv Aug 8, 2024
e62ed19
Update test for control message support
dagardner-nv Aug 8, 2024
bed4761
Create tests for ConfigurableOutputSource
dagardner-nv Aug 8, 2024
c4cf203
Remove the 'case_sensitive = False' bit as this is specific to the CLI
dagardner-nv Aug 8, 2024
9c1241c
Add test for invalid type error
dagardner-nv Aug 8, 2024
1434213
Fix docstrings
dagardner-nv Aug 8, 2024
ab99f15
Add type hint for the http server
dagardner-nv Aug 8, 2024
51ea300
Merge branch 'branch-24.10' into david-http-source-cm-1811
dagardner-nv Aug 13, 2024
3170636
Mark tests _utils as known first party
dagardner-nv Aug 13, 2024
c079820
Merge branch 'branch-24.10' into david-http-source-cm-1811
dagardner-nv Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/morpheus/morpheus/_lib/common/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class FilterSource():
__members__: dict # value = {'Auto': <FilterSource.Auto: 0>, 'TENSOR': <FilterSource.TENSOR: 1>, 'DATAFRAME': <FilterSource.DATAFRAME: 2>}
pass
class HttpEndpoint():
def __init__(self, py_parse_fn: function, url: str, method: str) -> None: ...
def __init__(self, py_parse_fn: function, url: str, method: str, include_headers: bool = False) -> None: ...
pass
class HttpServer():
def __enter__(self) -> HttpServer: ...
Expand Down
6 changes: 5 additions & 1 deletion python/morpheus/morpheus/_lib/common/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ PYBIND11_MODULE(common, _module)
.value("DATAFRAME", FilterSource::DATAFRAME);

py::class_<HttpEndpoint, std::shared_ptr<HttpEndpoint>>(_module, "HttpEndpoint")
.def(py::init<>(&HttpEndpointInterfaceProxy::init), py::arg("py_parse_fn"), py::arg("url"), py::arg("method"));
.def(py::init<>(&HttpEndpointInterfaceProxy::init),
py::arg("py_parse_fn"),
py::arg("url"),
py::arg("method"),
py::arg("include_headers") = false);

py::class_<HttpServer, std::shared_ptr<HttpServer>>(_module, "HttpServer")
.def(py::init<>(&HttpServerInterfaceProxy::init),
Expand Down
131 changes: 9 additions & 122 deletions python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,128 +39,6 @@ enum class MORPHEUS_EXPORT ControlMessageType
TRAINING
};

// class PayloadManager
// {
// public:
// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// TensorObject& get_tensor(const std::string& name)
// {
// return m_tensors->get_tensor(name);
// }

// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return const TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// const TensorObject& get_tensor(const std::string& name) const
// {
// return m_tensors->get_tensor(name);
// }

// /**
// * @brief Set the tensor object identified by `name`
// *
// * @param name
// * @param tensor
// * @throws std::length_error If the number of rows in `tensor` does not match `count`.
// */
// void set_tensor(const std::string& name, TensorObject&& tensor)
// {
// m_tensors->set_tensor(name, std::move(tensor));
// }

// /**
// * @brief Get a reference to the internal tensors map
// *
// * @return const TensorMap&
// */
// const TensorMap& get_tensors() const
// {
// return m_tensors->get_tensors();
// }

// /**
// * @brief Set the tensors object
// *
// * @param tensors
// * @throws std::length_error If the number of rows in the `tensors` do not match `count`.
// */
// void set_tensors(TensorMap&& tensors)
// {
// m_tensors->set_tensors(std::move(tensors));
// }

// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// TensorObject& get_column(const std::string& name)
// {
// return m_tensors->get_tensor(name);
// }

// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return const TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// const TensorObject& get_column(const std::string& name) const
// {
// return m_tensors->get_tensor(name);
// }

// /**
// * @brief Set the tensor object identified by `name`
// *
// * @param name
// * @param tensor
// * @throws std::length_error If the number of rows in `tensor` does not match `count`.
// */
// void set_column(const std::string& name, TensorObject&& tensor)
// {
// m_tensors->set_tensor(name, std::move(tensor));
// }

// /**
// * @brief Get a reference to the internal tensors map
// *
// * @return const TensorMap&
// */
// TableInfo get_columns() const
// {
// return m_df->get_info();
// }

// /**
// * @brief Set the tensors object
// *
// * @param tensors
// * @throws std::length_error If the number of rows in the `tensors` do not match `count`.
// */
// void set_columns(TableInfo&& tensors)
// {
// m_tensors->set_tensors(std::move(tensors));
// }

// private:
// std::shared_ptr<MessageMeta> m_df;
// std::shared_ptr<TensorMemory> m_tensors;
// };

class MORPHEUS_EXPORT TensorMemory;

// System-clock for better compatibility with pybind11/chrono
Expand All @@ -178,6 +56,7 @@ class MORPHEUS_EXPORT ControlMessage
public:
ControlMessage();
explicit ControlMessage(const morpheus::utilities::json_t& config);
explicit ControlMessage(const std::shared_ptr<MessageMeta>& payload, const morpheus::utilities::json_t& config);

ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload

Expand Down Expand Up @@ -387,6 +266,14 @@ struct MORPHEUS_EXPORT ControlMessageProxy
*/
static std::shared_ptr<ControlMessage> create(pybind11::dict& config);

/**
* @brief Creates a new ControlMessage instance from a Python MessageMeta and a configuration dictionary.
* @param meta Python instance of MessageMeta
* @param config A pybind11::dict representing the configuration for the ControlMessage.
* @return A shared_ptr to a newly created ControlMessage instance.
*/
static std::shared_ptr<ControlMessage> create(const pybind11::object& meta, pybind11::dict& config);

/**
* @brief Creates a new ControlMessage instance as a copy of an existing one.
* @param other A shared_ptr to another ControlMessage instance to copy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "morpheus/messages/meta.hpp"
#include "morpheus/messages/multi.hpp"
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/json_types.hpp" // for control_message_task_t
#include "morpheus/utilities/python_util.hpp" // for show_warning_message
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR

Expand Down Expand Up @@ -50,18 +51,16 @@ namespace morpheus {
* @file
*/

using cm_task_t = std::pair<std::string, nlohmann::json>;

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
control_message_task_t* task,
std::shared_ptr<MultiMessage>& windowed_message);

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
control_message_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message);

/****** DeserializationStage********************************/
Expand All @@ -83,8 +82,8 @@ class MORPHEUS_EXPORT DeserializeStage
* @param task Optional task to be added to all outgoing `ControlMessage`s, ignored when `OutputT` is `MultiMessage`
*/
DeserializeStage(TensorIndex batch_size,
bool ensure_sliceable_index = true,
std::unique_ptr<cm_task_t> task = nullptr) :
bool ensure_sliceable_index = true,
std::unique_ptr<control_message_task_t> task = nullptr) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_batch_size(batch_size),
m_ensure_sliceable_index(ensure_sliceable_index),
Expand All @@ -95,7 +94,7 @@ class MORPHEUS_EXPORT DeserializeStage

TensorIndex m_batch_size;
bool m_ensure_sliceable_index{true};
std::unique_ptr<cm_task_t> m_task{nullptr};
std::unique_ptr<control_message_task_t> m_task{nullptr};
};

/****** DeserializationStageInterfaceProxy******************/
Expand Down
Loading
Loading