Skip to content

Commit

Permalink
Define a type alias for the state change callback
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Aug 14, 2024
1 parent 6ad77b6 commit df9f552
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 32 deletions.
16 changes: 4 additions & 12 deletions cpp/mrc/include/mrc/pipeline/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "mrc/types.hpp"
#include "mrc/utils/macros.hpp"

#include <functional> // for function
Expand Down Expand Up @@ -52,21 +53,12 @@ class IExecutor

namespace mrc {

enum class State
{
Init = 0,
Run = 1,
Joined = 2,
Stop = 3,
Kill = 4
};

// For backwards compatibility, make utility implementation which holds onto a unique_ptr
class Executor : public pipeline::IExecutor
{
public:
Executor();
Executor(std::shared_ptr<Options> options, std::function<void(State)> state_change_cb = nullptr);
Executor(std::shared_ptr<Options> options, on_state_change_fn state_change_cb = nullptr);
~Executor() override;

void register_pipeline(std::shared_ptr<pipeline::IPipeline> pipeline) override;
Expand All @@ -79,9 +71,9 @@ class Executor : public pipeline::IExecutor
};

std::unique_ptr<pipeline::IExecutor> make_executor(std::shared_ptr<Options> options,
std::function<void(State)> state_change_cb = nullptr);
on_state_change_fn state_change_cb = nullptr);

std::unique_ptr<pipeline::IExecutor> make_executor(std::unique_ptr<pipeline::ISystem> system,
std::function<void(State)> state_change_cb = nullptr);
on_state_change_fn state_change_cb = nullptr);

} // namespace mrc
12 changes: 12 additions & 0 deletions cpp/mrc/include/mrc/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ using PortName = std::string;
using PortID = std::uint16_t;
using PortAddress = std::uint64_t; // id + rank + port

enum class State
{
Init = 0,
Run = 1,
Joined = 2,
Stop = 3,
Kill = 4
};

// Stage change callback passed to the executor
using on_state_change_fn = std::function<void(State)>;

// NOLINTEND(readability-identifier-naming)

} // namespace mrc
2 changes: 1 addition & 1 deletion cpp/mrc/src/internal/executor/executor_definition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static bool valid_pipeline(const pipeline::PipelineDefinition& pipeline)
}

ExecutorDefinition::ExecutorDefinition(std::unique_ptr<system::SystemDefinition> system,
std::function<void(State)> state_change_cb) :
on_state_change_fn state_change_cb) :
SystemProvider(std::move(system)),
Service("ExecutorDefinition"),
m_resources_manager(std::make_unique<resources::Manager>(*this)),
Expand Down
5 changes: 2 additions & 3 deletions cpp/mrc/src/internal/executor/executor_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ namespace mrc::executor {
class ExecutorDefinition : public pipeline::IExecutor, public Service, public system::SystemProvider
{
public:
ExecutorDefinition(std::unique_ptr<system::SystemDefinition> system,
std::function<void(State)> state_change_cb = nullptr);
ExecutorDefinition(std::unique_ptr<system::SystemDefinition> system, on_state_change_fn state_change_cb = nullptr);
~ExecutorDefinition() override;

static std::shared_ptr<ExecutorDefinition> unwrap(std::shared_ptr<IExecutor> object);
Expand All @@ -65,7 +64,7 @@ class ExecutorDefinition : public pipeline::IExecutor, public Service, public sy
void do_service_await_live() final;
void do_service_await_join() final;

std::function<void(State)> m_state_change_cb = nullptr;
on_state_change_fn m_state_change_cb = nullptr;
std::unique_ptr<resources::Manager> m_resources_manager;
std::unique_ptr<pipeline::Manager> m_pipeline_manager;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/mrc/src/internal/pipeline/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace mrc::pipeline {

Manager::Manager(std::shared_ptr<PipelineDefinition> pipeline,
resources::Manager& resources,
std::function<void(State)> state_change_cb) :
on_state_change_fn state_change_cb) :
Service("pipeline::Manager"),
m_pipeline(std::move(pipeline)),
m_resources(resources),
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/src/internal/pipeline/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Manager : public Service
public:
Manager(std::shared_ptr<PipelineDefinition> pipeline,
resources::Manager& resources,
std::function<void(State)> state_change_cb = nullptr);
on_state_change_fn state_change_cb = nullptr);
~Manager() override;

const PipelineDefinition& pipeline() const;
Expand All @@ -67,7 +67,7 @@ class Manager : public Service
void do_service_kill() final;
void do_service_await_join() final;

std::function<void(State)> m_state_change_cb = nullptr;
on_state_change_fn m_state_change_cb = nullptr;
resources::Manager& m_resources;
std::shared_ptr<PipelineDefinition> m_pipeline;
std::unique_ptr<node::WritableEntrypoint<ControlMessage>> m_update_channel;
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/src/internal/pipeline/pipeline_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class PipelineInstance final : public Service, public PipelineResources
public:
PipelineInstance(std::shared_ptr<const PipelineDefinition> definition,
resources::Manager& resources,
std::function<void(State)> state_change_cb = nullptr);
on_state_change_fn state_change_cb = nullptr);
~PipelineInstance() override;

// currently we are passing the instance back to the executor
Expand Down Expand Up @@ -89,7 +89,7 @@ class PipelineInstance final : public Service, public PipelineResources
bool m_joinable{false};
Promise<void> m_joinable_promise;
SharedFuture<void> m_joinable_future;
std::function<void(State)> m_state_change_cb = nullptr;
on_state_change_fn m_state_change_cb = nullptr;
};

} // namespace mrc::pipeline
2 changes: 1 addition & 1 deletion cpp/mrc/src/internal/segment/segment_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ SegmentInstance::SegmentInstance(std::shared_ptr<const SegmentDefinition> defini
SegmentRank rank,
pipeline::PipelineResources& resources,
std::size_t partition_id,
std::function<void(State)> state_change_cb) :
on_state_change_fn state_change_cb) :
Service("segment::SegmentInstance"),
m_name(definition->name()),
m_id(definition->id()),
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/src/internal/segment/segment_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SegmentInstance final : public Service
SegmentRank rank,
pipeline::PipelineResources& resources,
std::size_t partition_id,
std::function<void(State)> state_change_cb = nullptr);
on_state_change_fn state_change_cb = nullptr);
~SegmentInstance() override;

const std::string& name() const;
Expand Down Expand Up @@ -81,7 +81,7 @@ class SegmentInstance final : public Service
std::unique_ptr<BuilderDefinition> m_builder;
pipeline::PipelineResources& m_resources;
const std::size_t m_default_partition_id;
std::function<void(State)> m_state_change_cb = nullptr;
on_state_change_fn m_state_change_cb = nullptr;

std::map<std::string, std::unique_ptr<mrc::runnable::Runner>> m_runners;
std::map<std::string, std::unique_ptr<mrc::runnable::Runner>> m_egress_runners;
Expand Down
7 changes: 3 additions & 4 deletions cpp/mrc/src/public/pipeline/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace mrc {

Executor::Executor() : m_impl(make_executor(std::make_shared<Options>())) {}

Executor::Executor(std::shared_ptr<Options> options, std::function<void(State)> state_change_cb) :
Executor::Executor(std::shared_ptr<Options> options, on_state_change_fn state_change_cb) :
m_impl(make_executor(options, std::move(state_change_cb)))
{}

Expand All @@ -58,8 +58,7 @@ void Executor::join()
m_impl->join();
}

std::unique_ptr<pipeline::IExecutor> make_executor(std::shared_ptr<Options> options,
std::function<void(State)> state_change_cb)
std::unique_ptr<pipeline::IExecutor> make_executor(std::shared_ptr<Options> options, on_state_change_fn state_change_cb)
{
// Convert options to a system object first
auto system = mrc::make_system(std::move(options));
Expand All @@ -70,7 +69,7 @@ std::unique_ptr<pipeline::IExecutor> make_executor(std::shared_ptr<Options> opti
}

std::unique_ptr<pipeline::IExecutor> make_executor(std::unique_ptr<pipeline::ISystem> system,
std::function<void(State)> state_change_cb)
on_state_change_fn state_change_cb)
{
auto full_system = system::SystemDefinition::unwrap(std::move(system));

Expand Down
4 changes: 2 additions & 2 deletions python/mrc/_pymrc/include/pymrc/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Executor
public:
Executor();

Executor(std::shared_ptr<Options> options, std::function<void(State)> state_change_cb = nullptr);
Executor(std::shared_ptr<Options> options, on_state_change_fn state_change_cb = nullptr);
~Executor();

void register_pipeline(pymrc::Pipeline& pipeline);
Expand All @@ -74,7 +74,7 @@ class Executor
SharedFuture<void> m_join_future;

std::shared_ptr<pipeline::IExecutor> m_exec;
std::function<void(State)> m_state_change_cb = nullptr;
on_state_change_fn m_state_change_cb = nullptr;
};

class PyBoostFuture
Expand Down
2 changes: 1 addition & 1 deletion python/mrc/_pymrc/src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void Awaitable::next()
}

/** Executor impls -- move to own file **/
Executor::Executor(std::shared_ptr<Options> options, std::function<void(State)> state_change_cb) :
Executor::Executor(std::shared_ptr<Options> options, on_state_change_fn state_change_cb) :
m_state_change_cb(std::move(state_change_cb))
{
// Before creating the internal exec, set the signal mask so we can capture Ctrl+C
Expand Down
2 changes: 1 addition & 1 deletion python/mrc/core/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ PYBIND11_MODULE(executor, py_mod)
return exec;
}))
.def(py::init<>([](std::shared_ptr<mrc::Options> options, py::object state_change_cb) {
std::function<void(State)> wrapped_state_change_cb = nullptr;
on_state_change_fn wrapped_state_change_cb = nullptr;
if (!state_change_cb.is_none())
{
auto wrapped_cb = PyFuncWrapper(std::move(state_change_cb));
Expand Down

0 comments on commit df9f552

Please sign in to comment.