Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map_async and flat_map_async Operators for Python Async Generators #392

Open
wants to merge 24 commits into
base: branch-23.11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions cpp/mrc/include/mrc/runnable/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

#pragma once

#include <glog/logging.h>
#include "mrc/types.hpp" // for Future

#include <cstddef>
#include <exception>
#include <sstream>
#include <string>
#include <glog/logging.h> // for CHECK, COMPACT_GOOGLE_LOG_FATAL, LogMessag...

#include <cstddef> // for size_t
#include <exception> // for exception_ptr
#include <functional> // for function
#include <sstream> // for stringstream
#include <string> // for allocator, string

namespace mrc::runnable {

class IEngine;
class Runner;
enum class EngineType;

Expand All @@ -41,7 +45,7 @@ class Context
{
public:
Context() = delete;
Context(std::size_t rank, std::size_t size);
Context(const Runner& runner, IEngine& engine, std::size_t rank, std::size_t size);
virtual ~Context() = default;

EngineType execution_context() const;
Expand All @@ -54,6 +58,8 @@ class Context
void barrier();
void yield();

Future<void> launch_fiber(std::function<void()> task);

const std::string& info() const;

template <typename ContextT>
Expand All @@ -69,7 +75,7 @@ class Context
void set_exception(std::exception_ptr exception_ptr);

protected:
void init(const Runner& runner);
void start();
bool status() const;
void finish();
virtual void init_info(std::stringstream& ss);
Expand All @@ -79,7 +85,8 @@ class Context
std::size_t m_size;
std::string m_info{"Uninitialized Context"};
std::exception_ptr m_exception_ptr{nullptr};
const Runner* m_runner{nullptr};
const Runner& m_runner;
IEngine& m_engine;

virtual void do_lock() = 0;
virtual void do_unlock() = 0;
Expand Down
10 changes: 2 additions & 8 deletions cpp/mrc/include/mrc/runnable/detail/type_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,10 @@
return std::make_pair(self<ctx_thread>{}, self<T>{});
}

static auto unwrap_context(l2_concept c, Context& t)
{
return std::make_pair(self<Context>{}, self<Context>{});
}

template <typename T>
static error unwrap_context(error e, T& t)
static auto unwrap_context(l2_concept c, T& t)
{
static_assert(invalid_concept<T>::error, "object is not a Context");
return {};
return std::make_pair(self<T>{}, self<T>{});

Check warning on line 91 in cpp/mrc/include/mrc/runnable/detail/type_traits.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/runnable/detail/type_traits.hpp#L91

Added line #L91 was not covered by tests
}

template <typename T>
Expand Down
2 changes: 2 additions & 0 deletions cpp/mrc/include/mrc/runnable/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "mrc/core/fiber_meta_data.hpp"
#include "mrc/core/fiber_pool.hpp"
#include "mrc/core/task_queue.hpp"
#include "mrc/runnable/context.hpp"
#include "mrc/runnable/launch_options.hpp"
#include "mrc/types.hpp"

Expand Down Expand Up @@ -54,6 +55,7 @@ class IEngine
virtual Future<void> launch_task(std::function<void()> task) = 0;

friend Runner;
friend Context;
};

/**
Expand Down
31 changes: 19 additions & 12 deletions cpp/mrc/include/mrc/runnable/launch_control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ class LaunchControl final
// engines are out way of running some task on the specified backend
std::shared_ptr<IEngines> engines = build_engines(options);

// create runner
auto runner = runnable::make_runner(std::move(runnable));

// make contexts
std::vector<std::shared_ptr<Context>> contexts;
if constexpr (is_fiber_runnable_v<RunnableT>)
Expand All @@ -113,6 +116,7 @@ class LaunchControl final
"ThreadEngine";

contexts = make_contexts<FiberContext<ContextWrapperT<context_t>>>(
*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
Expand All @@ -123,6 +127,7 @@ class LaunchControl final
"to be run on a "
"FiberEngine";
contexts = make_contexts<ThreadContext<ContextWrapperT<context_t>>>(
*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
Expand All @@ -132,12 +137,14 @@ class LaunchControl final
if (backend == EngineType::Fiber)
{
contexts = make_contexts<FiberContext<ContextWrapperT<context_t>>>(
*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
else if (backend == EngineType::Thread)
{
contexts = make_contexts<ThreadContext<ContextWrapperT<context_t>>>(
*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
Expand All @@ -147,9 +154,6 @@ class LaunchControl final
}
}

// create runner
auto runner = runnable::make_runner(std::move(runnable));

// construct the launcher
return std::make_unique<Launcher>(std::move(runner), std::move(contexts), std::move(engines));
}
Expand Down Expand Up @@ -204,6 +208,9 @@ class LaunchControl final
// engines are out way of running some task on the specified backend
std::shared_ptr<IEngines> engines = build_engines(options);

// create runner
auto runner = runnable::make_runner(std::move(runnable));

// make contexts
std::vector<std::shared_ptr<Context>> contexts;
if constexpr (is_fiber_runnable_v<RunnableT>)
Expand All @@ -212,27 +219,29 @@ class LaunchControl final
"FiberRunnable to "
"be run on a "
"ThreadEngine";
contexts = make_contexts<context_t>(*engines, std::forward<ContextArgsT>(context_args)...);
contexts = make_contexts<context_t>(*runner, *engines, std::forward<ContextArgsT>(context_args)...);
}
else if constexpr (is_thread_context_v<RunnableT>)
{
CHECK(get_engine_factory(options.engine_factory_name).backend() == EngineType::Thread) << "Requested "
"ThreadRunnable "
"to be run on a "
"FiberEngine";
contexts = make_contexts<context_t>(*engines, std::forward<ContextArgsT>(context_args)...);
contexts = make_contexts<context_t>(*runner, *engines, std::forward<ContextArgsT>(context_args)...);
}
else
{
auto backend = get_engine_factory(options.engine_factory_name).backend();
if (backend == EngineType::Fiber)
{
contexts = make_contexts<FiberContext<context_t>>(*engines,
contexts = make_contexts<FiberContext<context_t>>(*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
else if (backend == EngineType::Thread)
{
contexts = make_contexts<ThreadContext<context_t>>(*engines,
contexts = make_contexts<ThreadContext<context_t>>(*runner,
*engines,
std::forward<ContextArgsT>(context_args)...);
}
else
Expand All @@ -241,9 +250,6 @@ class LaunchControl final
}
}

// create runner
auto runner = runnable::make_runner(std::move(runnable));

// construct the launcher
return std::make_unique<Launcher>(std::move(runner), std::move(contexts), std::move(engines));
}
Expand Down Expand Up @@ -325,14 +331,15 @@ class LaunchControl final
* @return auto
*/
template <typename WrappedContextT, typename... ArgsT>
auto make_contexts(const IEngines& engines, ArgsT&&... args)
auto make_contexts(const Runner& runner, const IEngines& engines, ArgsT&&... args)
{
const auto size = engines.size();
std::vector<std::shared_ptr<Context>> contexts;
auto resources = std::make_shared<typename WrappedContextT::resource_t>(size);
for (std::size_t i = 0; i < size; ++i)
{
contexts.push_back(std::make_shared<WrappedContextT>(resources, i, size, args...));
contexts.push_back(
std::make_shared<WrappedContextT>(resources, runner, *engines.launchers()[i], i, size, args...));
}
return std::move(contexts);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/mrc/include/mrc/runnable/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class SpecializedRunner : public Runner
auto resources = std::make_shared<typename WrappedContextT::resource_t>(size);
for (std::size_t i = 0; i < size; ++i)
{
contexts.push_back(std::make_shared<WrappedContextT>(resources, i, size, std::forward<ArgsT>(args)...));
contexts.push_back(std::make_shared<WrappedContextT>(resources, i, std::forward<ArgsT>(args)...));
}
return std::move(contexts);
}
Expand Down
9 changes: 9 additions & 0 deletions cpp/mrc/include/mrc/segment/builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ class IBuilder
typename... ArgsT>
auto make_node(std::string name, ArgsT&&... ops);

template <typename NodeTypeT, typename... ArgsT>
auto make_node_explicit(std::string name, ArgsT&&... ops);

/**
* Creates and returns an instance of a node component with the specified type, name and arguments.
* @tparam SinkTypeT The sink type of the node component to be created.
Expand Down Expand Up @@ -436,6 +439,12 @@ auto IBuilder::make_node(std::string name, ArgsT&&... ops)
return construct_object<NodeTypeT<SinkTypeT, SourceTypeT>>(name, std::forward<ArgsT>(ops)...);
}

template <typename NodeTypeT, typename... ArgsT>
auto IBuilder::make_node_explicit(std::string name, ArgsT&&... ops)
{
return construct_object<NodeTypeT>(name, std::forward<ArgsT>(ops)...);
}

template <typename SinkTypeT, typename SourceTypeT, template <class, class> class NodeTypeT, typename... ArgsT>
auto IBuilder::make_node_component(std::string name, ArgsT&&... ops)
{
Expand Down
9 changes: 7 additions & 2 deletions cpp/mrc/include/mrc/segment/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ class Context : public ContextT
{
public:
template <typename... ArgsT>
Context(std::size_t rank, std::size_t size, std::string name, ArgsT&&... args) :
ContextT(std::move(rank), std::move(size), std::forward<ArgsT>(args)...),
Context(const mrc::runnable::Runner& runner,
mrc::runnable::IEngine& engine,
std::size_t rank,
std::size_t size,
std::string name,
ArgsT&&... args) :
ContextT(runner, engine, std::move(rank), std::move(size), std::forward<ArgsT>(args)...),
m_name(std::move(name))
{
static_assert(std::is_base_of_v<runnable::Context, ContextT>, "ContextT must derive from Context");
Expand Down
13 changes: 3 additions & 10 deletions cpp/mrc/src/internal/runnable/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@

#include "internal/runnable/engine.hpp"

#include "mrc/types.hpp"
#include "mrc/types.hpp" // for Future

#include <glog/logging.h>

#include <mutex>
#include <ostream>
#include <utility>
#include <mutex> // for mutex, lock_guard
#include <utility> // for move

namespace mrc::runnable {

Future<void> Engine::launch_task(std::function<void()> task)
{
std::lock_guard<decltype(m_mutex)> lock(m_mutex);
if (m_launched)
{
LOG(FATAL) << "detected attempted reuse of a runnable::Engine; this is a fatal error";
}
m_launched = true;
return do_launch_task(std::move(task));
}
Expand Down
46 changes: 33 additions & 13 deletions cpp/mrc/src/public/runnable/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

#include "mrc/runnable/context.hpp"

#include "mrc/runnable/runner.hpp"
#include "mrc/runnable/runner.hpp" // for Runner

#include <boost/fiber/fss.hpp>
#include <glog/logging.h>
#include <boost/fiber/fss.hpp> // for fiber_specific_ptr
#include <boost/fiber/future/async.hpp> // for async
#include <glog/logging.h> // for COMPACT_GOOGLE_LOG_FATAL

#include <cstddef>
#include <exception>
#include <sstream>
#include <string>
#include <utility>
#include <cstddef> // for size_t
#include <exception> // for exception_ptr, current_excep...
#include <sstream> // for operator<<, basic_ostream
#include <string> // for char_traits, operator<<, string
#include <utility> // for move

namespace mrc::runnable {

Expand All @@ -47,7 +48,12 @@

} // namespace

Context::Context(std::size_t rank, std::size_t size) : m_rank(rank), m_size(size) {}
Context::Context(const Runner& runner, IEngine& engine, std::size_t rank, std::size_t size) :
m_runner(runner),
m_engine(engine),
m_rank(rank),
m_size(size)
{}

EngineType Context::execution_context() const
{
Expand Down Expand Up @@ -93,7 +99,23 @@
do_yield();
}

void Context::init(const Runner& runner)
Future<void> Context::launch_fiber(std::function<void()> task)
{
return boost::fibers::async([this, task]() {
auto& fiber_local = FiberLocalContext::get();
fiber_local.reset(new FiberLocalContext());
fiber_local->m_context = this;
try
{
task();
} catch (...)

Check warning on line 111 in cpp/mrc/src/public/runnable/context.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/runnable/context.cpp#L111

Added line #L111 was not covered by tests
{
set_exception(std::current_exception());

Check warning on line 113 in cpp/mrc/src/public/runnable/context.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/runnable/context.cpp#L113

Added line #L113 was not covered by tests
}
});
}

void Context::start()
{
auto& fiber_local = FiberLocalContext::get();
fiber_local.reset(new FiberLocalContext());
Expand All @@ -102,8 +124,6 @@
std::stringstream ss;
this->init_info(ss);
m_info = ss.str();

m_runner = &runner;
}

void Context::finish()
Expand All @@ -127,7 +147,7 @@
if (m_exception_ptr == nullptr)
{
m_exception_ptr = std::move(std::current_exception());
m_runner->kill();
m_runner.kill();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/mrc/src/public/runnable/runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void Runner::enqueue(std::shared_ptr<IEngines> launcher, std::vector<std::shared
auto engine = instance.m_engine;

auto f = engine->launch_task([this, context, &instance] {
context->init(*this);
context->start();
update_state(context->rank(), State::Running);
instance.m_live_promise.set_value();
m_runnable->main(*context);
Expand Down
Loading
Loading