Skip to content

Commit

Permalink
Remove mrc::internals namespace and cleanup class names (#328)
Browse files Browse the repository at this point in the history
This PR does the following:

- Removes the `mrc::internal` namespace entirely
- Standardizes the core interface classes `IBuilder`, `ISegment`, `IPipeline` and `IExecutor`
   - All public classes have a pure virtual interface class with the name `I<XXX>`. The implementation name is `<XXX>Definition` to follow the `SegmentDefinition`, `PipelineDefinition` classes.
- Renames classes and files with similar names to be unique
   - For example:
      - `segment::Instance` => `segment::SegmentInstance`
      - `pipeline::Instance` => `pipeline::PipelineInstance`
      - `src/internal/control_plane/resources.hpp` => `src/internal/control_plane/control_plane_resources.hpp`
      - `src/internal/data_plane/resources.hpp` => `src/internal/data_plane/data_plane_resources.hpp`

This PR has a significant number of breaking changes. Effort was taken to keep the external interface as similar as possible, but some breaking changes to the external API needed to be made:

- Many classes have been renamed:
   - All classes under `mrc/engine` have been moved to `mrc/pipeline`
      - Includes `IExecutor`, `IPipeline`, `IBuilder`, `ISystem`, and `IDefinition`
   - `segment::Definition` => `pipeline::SegmentDefinition`
   - `segment::Segment` => `Segment`
   - `segment::Builder` => `segment::IBuilder`
   - `pipeline::Resources` => `runnable::IRunnableResources`
   - `runnable::Engine` => `runnable::IEngine`
   - `runnable::Engines` => `runnable::IEngines`
- Classes have been removed:
   - `IResources`
- The entire `mrc::internals` namespace has been removed
   - This only impacts internal source files, but there were some instances in `include/mrc/engine` that referenced this namespace

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)

URL: #328
  • Loading branch information
mdemoret-nv authored Jun 26, 2023
1 parent fea298f commit 3d34d6c
Show file tree
Hide file tree
Showing 338 changed files with 4,106 additions and 3,965 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/github/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function update_conda_env() {

# Make sure we have the conda-merge package installed
if [[ -z "$(conda list | grep conda-merge)" ]]; then
rapids-mamba-retry install -n mrc -c conda-forge "conda-merge>=0.2"
rapids-mamba-retry install -q -n mrc -c conda-forge "conda-merge>=0.2"
fi

# Create a temp directory which we store the combined environment file in
Expand All @@ -91,7 +91,7 @@ function update_conda_env() {
conda run -n mrc --live-stream conda-merge ${CONDA_ENV_YML} ${CONDA_CLANG_ENV_YML} ${CONDA_CI_ENV_YML} > ${condatmpdir}/merged_env.yml

# Update the conda env with prune remove excess packages (in case one was removed from the env)
rapids-mamba-retry env update -n mrc -q --prune --file ${condatmpdir}/merged_env.yml
rapids-mamba-retry env update -n mrc --prune --file ${condatmpdir}/merged_env.yml

# Delete the temp directory
rm -rf ${condatmpdir}
Expand Down
54 changes: 24 additions & 30 deletions cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,34 @@ add_library(libmrc
src/internal/codable/codable_storage.cpp
src/internal/codable/decodable_storage_view.cpp
src/internal/codable/storage_view.cpp
src/internal/control_plane/client/connections_manager.cpp
src/internal/control_plane/client.cpp
src/internal/control_plane/client/connections_manager.cpp
src/internal/control_plane/client/instance.cpp
src/internal/control_plane/client/state_manager.cpp
src/internal/control_plane/client/subscription_service.cpp
src/internal/control_plane/resources.cpp
src/internal/control_plane/server/connection_manager.cpp
src/internal/control_plane/control_plane_resources.cpp
src/internal/control_plane/server.cpp
src/internal/control_plane/server/connection_manager.cpp
src/internal/control_plane/server/subscription_manager.cpp
src/internal/control_plane/server/tagged_issuer.cpp
src/internal/data_plane/callbacks.cpp
src/internal/data_plane/client.cpp
src/internal/data_plane/data_plane_resources.cpp
src/internal/data_plane/request.cpp
src/internal/data_plane/resources.cpp
src/internal/data_plane/server.cpp
src/internal/executor/executor.cpp
src/internal/executor/iexecutor.cpp
src/internal/executor/executor_definition.cpp
src/internal/grpc/progress_engine.cpp
src/internal/grpc/server.cpp
src/internal/memory/device_resources.cpp
src/internal/memory/host_resources.cpp
src/internal/memory/transient_pool.cpp
src/internal/network/resources.cpp
src/internal/network/network_resources.cpp
src/internal/pipeline/controller.cpp
src/internal/pipeline/instance.cpp
src/internal/pipeline/ipipeline.cpp
src/internal/pipeline/manager.cpp
src/internal/pipeline/pipeline.cpp
src/internal/pipeline/pipeline_definition.cpp
src/internal/pipeline/pipeline_instance.cpp
src/internal/pipeline/pipeline_resources.cpp
src/internal/pipeline/port_graph.cpp
src/internal/pipeline/resources.cpp
src/internal/pubsub/publisher_round_robin.cpp
src/internal/pubsub/publisher_service.cpp
src/internal/pubsub/subscriber_service.cpp
Expand All @@ -61,21 +59,19 @@ add_library(libmrc
src/internal/resources/manager.cpp
src/internal/resources/partition_resources_base.cpp
src/internal/resources/partition_resources.cpp
src/internal/runnable/engine.cpp
src/internal/runnable/engine_factory.cpp
src/internal/runnable/engine.cpp
src/internal/runnable/engines.cpp
src/internal/runnable/fiber_engine.cpp
src/internal/runnable/fiber_engines.cpp
src/internal/runnable/resources.cpp
src/internal/runnable/runnable_resources.cpp
src/internal/runnable/thread_engine.cpp
src/internal/runnable/thread_engines.cpp
src/internal/runtime/partition.cpp
src/internal/runtime/runtime.cpp
src/internal/segment/builder.cpp
src/internal/segment/definition.cpp
src/internal/segment/ibuilder.cpp
src/internal/segment/idefinition.cpp
src/internal/segment/instance.cpp
src/internal/segment/builder_definition.cpp
src/internal/segment/segment_definition.cpp
src/internal/segment/segment_instance.cpp
src/internal/service.cpp
src/internal/system/device_info.cpp
src/internal/system/device_partition.cpp
Expand All @@ -84,39 +80,36 @@ add_library(libmrc
src/internal/system/fiber_pool.cpp
src/internal/system/fiber_task_queue.cpp
src/internal/system/gpu_info.cpp
src/internal/system/host_partition.cpp
src/internal/system/host_partition_provider.cpp
src/internal/system/iresources.cpp
src/internal/system/isystem.cpp
src/internal/system/partition.cpp
src/internal/system/host_partition.cpp
src/internal/system/partition_provider.cpp
src/internal/system/partition.cpp
src/internal/system/partitions.cpp
src/internal/system/resources.cpp
src/internal/system/system.cpp
src/internal/system/system_provider.cpp
src/internal/system/thread.cpp
src/internal/system/system.cpp
src/internal/system/thread_pool.cpp
src/internal/system/thread.cpp
src/internal/system/threading_resources.cpp
src/internal/system/topology.cpp
src/internal/ucx/context.cpp
src/internal/ucx/endpoint.cpp
src/internal/ucx/memory_block.cpp
src/internal/ucx/receive_manager.cpp
src/internal/ucx/resources.cpp
src/internal/ucx/ucx_resources.cpp
src/internal/ucx/worker.cpp
src/internal/utils/collision_detector.cpp
src/internal/utils/exception_guard.cpp
src/internal/utils/parse_config.cpp
src/internal/utils/parse_ints.cpp
src/internal/utils/shared_resource_bit_map.cpp
src/public/benchmarking/tracer.cpp
src/public/benchmarking/trace_statistics.cpp
src/public/benchmarking/tracer.cpp
src/public/benchmarking/util.cpp
src/public/channel/channel.cpp
src/public/codable/encoded_object.cpp
src/public/codable/memory.cpp
src/public/core/addresses.cpp
src/public/core/bitmap.cpp
src/public/core/executor.cpp
src/public/core/fiber_pool.cpp
src/public/core/logging.cpp
src/public/core/thread.cpp
Expand Down Expand Up @@ -145,15 +138,16 @@ add_library(libmrc
src/public/options/resources.cpp
src/public/options/services.cpp
src/public/options/topology.cpp
src/public/pipeline/executor.cpp
src/public/pipeline/pipeline.cpp
src/public/pipeline/segment.cpp
src/public/pipeline/system.cpp
src/public/runnable/context.cpp
src/public/runnable/launcher.cpp
src/public/runnable/runnable.cpp
src/public/runnable/runner.cpp
src/public/runnable/types.cpp
src/public/runtime/remote_descriptor.cpp
src/public/segment/builder.cpp
src/public/segment/definition.cpp
src/public/utils/bytes_to_string.cpp
src/public/utils/thread_utils.cpp
src/public/utils/type_utils.cpp
Expand Down
11 changes: 5 additions & 6 deletions cpp/mrc/benchmarks/bench_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
#include "mrc/benchmarking/segment_watcher.hpp"
#include "mrc/benchmarking/tracer.hpp"
#include "mrc/benchmarking/util.hpp"
#include "mrc/core/executor.hpp"
#include "mrc/engine/pipeline/ipipeline.hpp"
#include "mrc/node/rx_node.hpp"
#include "mrc/node/rx_sink.hpp"
#include "mrc/node/rx_source.hpp"
#include "mrc/pipeline/executor.hpp"
#include "mrc/pipeline/pipeline.hpp"
#include "mrc/segment/builder.hpp" // IWYU pragma: keep
#include "mrc/segment/object.hpp" // IWYU pragma: keep
Expand Down Expand Up @@ -190,7 +189,7 @@ class SimpleEmitReceiveFixture : public benchmark::Fixture
void SetUp(const ::benchmark::State& state) override
{
TimeUtil::estimate_steady_clock_delay();
auto init = [this](segment::Builder& segment) {
auto init = [this](segment::IBuilder& segment) {
std::string src_name = "nsrc";
std::string int_name = "n1";
std::string sink_name = "nsink";
Expand All @@ -215,7 +214,7 @@ class SimpleEmitReceiveFixture : public benchmark::Fixture
segment.make_edge(internal, sink);
};

auto pipeline = pipeline::make_pipeline();
auto pipeline = mrc::make_pipeline();
auto segment = pipeline->make_segment("bench_segment", init);

std::shared_ptr<Executor> executor = std::make_shared<Executor>();
Expand Down Expand Up @@ -250,7 +249,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture
void SetUp(const ::benchmark::State& state) override
{
TimeUtil::estimate_steady_clock_delay();
auto init = [this](segment::Builder& segment) {
auto init = [this](segment::IBuilder& segment) {
std::string src_name = "nsrc";
std::string sink_name = "nsink";

Expand Down Expand Up @@ -284,7 +283,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture
segment.make_edge(last_node, sink);
};

auto pipeline = pipeline::make_pipeline();
auto pipeline = mrc::make_pipeline();
auto segment = pipeline->make_segment("bench_segment", init);

std::shared_ptr<Executor> executor = std::make_shared<Executor>();
Expand Down
13 changes: 7 additions & 6 deletions cpp/mrc/include/mrc/benchmarking/segment_watcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "mrc/benchmarking/trace_statistics.hpp"
#include "mrc/benchmarking/tracer.hpp"
#include "mrc/core/executor.hpp"
#include "mrc/pipeline/executor.hpp"

#include <boost/fiber/barrier.hpp>
#include <boost/fiber/condition_variable.hpp>
Expand Down Expand Up @@ -48,8 +48,8 @@ class SegmentWatcher
using time_pt_t = std::chrono::time_point<std::chrono::steady_clock>;

~SegmentWatcher() = default;
SegmentWatcher(std::shared_ptr<Executor> executor);
SegmentWatcher(std::shared_ptr<Executor> executor, std::function<void(TracerTypeT&)> payload_init);
SegmentWatcher(std::shared_ptr<pipeline::IExecutor> executor);
SegmentWatcher(std::shared_ptr<pipeline::IExecutor> executor, std::function<void(TracerTypeT&)> payload_init);

[[nodiscard]] bool tracing() const;

Expand Down Expand Up @@ -142,7 +142,7 @@ class SegmentWatcher
std::atomic<bool> m_segment_started{false};
std::atomic<bool> m_latency_cycle_ready{true};

std::shared_ptr<Executor> m_executor;
std::shared_ptr<pipeline::IExecutor> m_executor;

std::mutex m_mutex;
boost::fibers::condition_variable_any m_cond_wake;
Expand Down Expand Up @@ -290,11 +290,12 @@ decltype(auto) SegmentWatcher<TracerTypeT>::create_rx_tracer_source(const std::s
}

template <typename TracerTypeT>
SegmentWatcher<TracerTypeT>::SegmentWatcher(std::shared_ptr<Executor> executor) : m_executor(std::move(executor))
SegmentWatcher<TracerTypeT>::SegmentWatcher(std::shared_ptr<pipeline::IExecutor> executor) :
m_executor(std::move(executor))
{}

template <typename TracerTypeT>
SegmentWatcher<TracerTypeT>::SegmentWatcher(std::shared_ptr<Executor> executor,
SegmentWatcher<TracerTypeT>::SegmentWatcher(std::shared_ptr<pipeline::IExecutor> executor,
std::function<void(TracerTypeT&)> payload_init) :
m_executor(std::move(executor)),
m_payload_init(payload_init)
Expand Down
40 changes: 0 additions & 40 deletions cpp/mrc/include/mrc/core/executor.hpp

This file was deleted.

9 changes: 8 additions & 1 deletion cpp/mrc/include/mrc/core/task_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include "mrc/core/fiber_meta_data.hpp"
#include "mrc/core/utils.hpp"
#include "mrc/types.hpp"

#include <boost/fiber/all.hpp>
Expand Down Expand Up @@ -70,8 +71,14 @@ class FiberTaskQueue

// track detached fibers - main fiber will wait on all detached fibers to finish
packaged_task<void()> wrapped_task([this, t = std::move(task)]() mutable {
auto decrement_detached = Unwinder::create([this]() {
--m_detached;
});

// Call the task
t();
--m_detached;

// Detached will get automatically decremented even if there is an exception
});
++m_detached;

Expand Down
31 changes: 17 additions & 14 deletions cpp/mrc/include/mrc/core/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

#include "mrc/utils/macros.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <exception>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

namespace mrc {

Expand Down Expand Up @@ -54,47 +57,47 @@ std::set<KeyT> extract_keys(const std::map<KeyT, ValT>& stdmap)
}

// RAII will execute a function when destroyed.
template <typename FunctionT>
class Unwinder
{
public:
explicit Unwinder(std::function<void()> unwind_fn) : m_unwind_fn(std::move(unwind_fn)) {}

~Unwinder()
{
if (!!m_function)
if (!!m_unwind_fn)
{
try
{
(*m_function)();
m_unwind_fn();
} catch (...)
{
LOG(ERROR) << "Fatal error during unwinder function";
std::terminate();
}
}
}

explicit Unwinder(FunctionT* function_arg) : m_function(function_arg) {}

void detach()
{
m_function = nullptr;
m_unwind_fn = nullptr;
}

Unwinder() = delete;
Unwinder(const Unwinder&) = delete;
Unwinder& operator=(const Unwinder&) = delete;

static Unwinder create(std::function<void()> unwind_fn)
{
return Unwinder(std::move(unwind_fn));
}

private:
FunctionT* m_function;
std::function<void()> m_unwind_fn;
};

#define MRC_UNWIND(var_name, function) MRC_UNWIND_EXPLICIT(uw_func_##var_name, var_name, function)

#define MRC_UNWIND_AUTO(function) \
MRC_UNWIND_EXPLICIT(MRC_UNIQUE_VAR_NAME(uw_func_), MRC_UNIQUE_VAR_NAME(un_obj_), function)
#define MRC_UNWIND(unwinder_name, function) mrc::Unwinder unwinder_name(function);

#define MRC_UNWIND_EXPLICIT(function_name, unwinder_name, function) \
auto function_name = (function); \
mrc::Unwinder<decltype(function_name)> unwinder_name(std::addressof(function_name))
#define MRC_UNWIND_AUTO(function) MRC_UNWIND(MRC_UNIQUE_VAR_NAME(__un_obj_), function)

template <typename T>
std::pair<std::set<T>, std::set<T>> set_compare(const std::set<T>& cur_set, const std::set<T>& new_set)
Expand Down
Loading

0 comments on commit 3d34d6c

Please sign in to comment.