Merge pull request #471 from dagardner-nv/branch-24.06-merge-24.03
Forward-merge branch-24.03 into branch-24.06
dagardner-nv committed Apr 11, 2024
2 parents 0701b13 + 2308e23 commit 41afd96
Showing 10 changed files with 219 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
@@ -70,7 +70,7 @@ jobs:

checks:
needs: [prepare]
if: ${{ ! fromJSON(needs.prepare.outputs.has_skip_ci_label) }}
if: ${{ !fromJSON(needs.prepare.outputs.has_skip_ci_label) && fromJSON(needs.prepare.outputs.is_pr )}}
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
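This tightens the gate on the `checks` job: in addition to requiring that the skip-ci label is absent, the condition now also requires the triggering event to be a pull request (`is_pr`), so non-PR events (presumably direct pushes to the release branches) no longer run these checks.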
38 changes: 38 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,41 @@
# MRC 24.03.00 (7 Apr 2024)

## 🚨 Breaking Changes

- Update cast_from_pyobject to throw on unsupported types rather than returning null ([#451](https://github.com/nv-morpheus/MRC/pull/451)) [@dagardner-nv](https://github.com/dagardner-nv)
- RAPIDS 24.02 Upgrade ([#433](https://github.com/nv-morpheus/MRC/pull/433)) [@cwharris](https://github.com/cwharris)

## 🐛 Bug Fixes

- Update CR year ([#460](https://github.com/nv-morpheus/MRC/pull/460)) [@dagardner-nv](https://github.com/dagardner-nv)
- Removing the INFO log when creating an AsyncioRunnable ([#456](https://github.com/nv-morpheus/MRC/pull/456)) [@mdemoret-nv](https://github.com/mdemoret-nv)
- Update cast_from_pyobject to throw on unsupported types rather than returning null ([#451](https://github.com/nv-morpheus/MRC/pull/451)) [@dagardner-nv](https://github.com/dagardner-nv)
- Adopt updated builds of CI runners ([#442](https://github.com/nv-morpheus/MRC/pull/442)) [@dagardner-nv](https://github.com/dagardner-nv)
- Update Conda channels to prioritize `conda-forge` over `nvidia` ([#436](https://github.com/nv-morpheus/MRC/pull/436)) [@cwharris](https://github.com/cwharris)
- Remove redundant copy of libmrc_pymrc.so ([#429](https://github.com/nv-morpheus/MRC/pull/429)) [@dagardner-nv](https://github.com/dagardner-nv)
- Unifying cmake exports name across all Morpheus repos ([#427](https://github.com/nv-morpheus/MRC/pull/427)) [@mdemoret-nv](https://github.com/mdemoret-nv)
- Updating the workspace settings to remove deprecated python options ([#425](https://github.com/nv-morpheus/MRC/pull/425)) [@mdemoret-nv](https://github.com/mdemoret-nv)
- Use `dependencies.yaml` to generate environment files ([#416](https://github.com/nv-morpheus/MRC/pull/416)) [@cwharris](https://github.com/cwharris)

## 📖 Documentation

- Update minimum requirements ([#467](https://github.com/nv-morpheus/MRC/pull/467)) [@dagardner-nv](https://github.com/dagardner-nv)

## 🚀 New Features

- Add maximum simultaneous tasks support to `TaskContainer` ([#464](https://github.com/nv-morpheus/MRC/pull/464)) [@cwharris](https://github.com/cwharris)
- Add `TestScheduler` to support testing time-based coroutines without waiting for timeouts ([#453](https://github.com/nv-morpheus/MRC/pull/453)) [@cwharris](https://github.com/cwharris)
- Adding RoundRobinRouter node type for distributing values to downstream nodes ([#449](https://github.com/nv-morpheus/MRC/pull/449)) [@mdemoret-nv](https://github.com/mdemoret-nv)
- Add IoScheduler to enable epoll-based Task scheduling ([#448](https://github.com/nv-morpheus/MRC/pull/448)) [@cwharris](https://github.com/cwharris)
- Update ops-bot.yaml ([#446](https://github.com/nv-morpheus/MRC/pull/446)) [@AyodeAwe](https://github.com/AyodeAwe)
- RAPIDS 24.02 Upgrade ([#433](https://github.com/nv-morpheus/MRC/pull/433)) [@cwharris](https://github.com/cwharris)

## 🛠️ Improvements

- Update MRC to use CCCL instead of libcudacxx ([#444](https://github.com/nv-morpheus/MRC/pull/444)) [@cwharris](https://github.com/cwharris)
- Optionally skip the CI pipeline if the PR contains the skip-ci label ([#426](https://github.com/nv-morpheus/MRC/pull/426)) [@dagardner-nv](https://github.com/dagardner-nv)
- Add flake8, yapf, and isort pre-commit hooks. ([#420](https://github.com/nv-morpheus/MRC/pull/420)) [@cwharris](https://github.com/cwharris)

# MRC 23.11.00 (30 Nov 2023)

## 🐛 Bug Fixes
4 changes: 2 additions & 2 deletions README.md
@@ -38,8 +38,8 @@ MRC includes both Python and C++ bindings and supports installation via [conda](

### Prerequisites

- Pascal architecture (Compute capability 6.0) or better
- NVIDIA driver `450.80.02` or higher
- Volta architecture (Compute capability 7.0) or better
- [CUDA 12.1](https://developer.nvidia.com/cuda-12-1-0-download-archive)
- [conda or miniconda](https://conda.io/projects/conda/en/latest/user-guide/install/linux.html)
- If using Docker:
- [Docker](https://docs.docker.com/get-docker/)
1 change: 0 additions & 1 deletion cpp/mrc/benchmarks/bench_baselines.cpp
@@ -19,7 +19,6 @@
#include "mrc/benchmarking/util.hpp"

#include <benchmark/benchmark.h>
#include <nlohmann/json.hpp>
#include <rxcpp/rx.hpp>

#include <chrono>
34 changes: 16 additions & 18 deletions cpp/mrc/include/mrc/coroutines/task_container.hpp
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,15 +37,14 @@
*/

#pragma once

#include "mrc/coroutines/task.hpp"

#include <atomic>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <vector>

namespace mrc::coroutines {
@@ -60,7 +59,7 @@ class TaskContainer
* @param e Tasks started in the container are scheduled onto this executor. For tasks created
* from a coro::io_scheduler, this would usually be that coro::io_scheduler instance.
*/
TaskContainer(std::shared_ptr<Scheduler> e);
TaskContainer(std::shared_ptr<Scheduler> e, std::size_t max_concurrent_tasks = 0);

TaskContainer(const TaskContainer&) = delete;
TaskContainer(TaskContainer&&) = delete;
@@ -93,30 +92,20 @@ class TaskContainer
*/
auto garbage_collect() -> std::size_t;

/**
* @return The number of tasks that are awaiting deletion.
*/
auto delete_task_size() const -> std::size_t;

/**
* @return True if there are no tasks awaiting deletion.
*/
auto delete_tasks_empty() const -> bool;

/**
* @return The number of active tasks in the container.
*/
auto size() const -> std::size_t;
auto size() -> std::size_t;

/**
* @return True if there are no active tasks in the container.
*/
auto empty() const -> bool;
auto empty() -> bool;

/**
* @return The capacity of this task manager before it will need to grow in size.
*/
auto capacity() const -> std::size_t;
auto capacity() -> std::size_t;

/**
* Will continue to garbage collect and yield until all tasks are complete. This method can be
@@ -138,6 +127,11 @@
*/
auto gc_internal() -> std::size_t;

/**
* Starts the next task in the queue if one is available and the maximum number of concurrent tasks has not yet been reached.
*/
void try_start_next_task(std::unique_lock<std::mutex> lock);

/**
* Encapsulate the user's task in a cleanup task which marks itself for deletion upon
* completion. Simply co_await the user's task until it completes and then mark the given
@@ -156,7 +150,7 @@
/// thread pools for indeterminate lifetime requests.
std::mutex m_mutex{};
/// The number of alive tasks.
std::atomic<std::size_t> m_size{};
std::size_t m_size{};
/// Maintains the lifetime of the tasks until they are completed.
std::list<std::optional<Task<void>>> m_tasks{};
/// The set of tasks that have completed and need to be deleted.
@@ -166,6 +160,10 @@
std::shared_ptr<Scheduler> m_scheduler_lifetime{nullptr};
/// This is used internally since io_scheduler cannot pass itself in as a shared_ptr.
Scheduler* m_scheduler{nullptr};
/// tasks to be processed in order of start
std::queue<decltype(m_tasks.end())> m_next_tasks;
/// maximum number of tasks to be run simultaneously
std::size_t m_max_concurrent_tasks;

friend Scheduler;
};
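The new `max_concurrent_tasks` parameter caps how many tasks run simultaneously; the default of `0` preserves the old start-immediately behavior. Below is a hedged usage sketch, not code from this commit: it assumes `TestScheduler` is default-constructible, that `Scheduler::yield()` suspends the awaiter until the scheduler resumes it, and that `start()`'s `GarbageCollectPolicy` parameter has a default argument.

```cpp
// Sketch only: cap a TaskContainer at two in-flight tasks.
#include "mrc/coroutines/task.hpp"
#include "mrc/coroutines/task_container.hpp"
#include "mrc/coroutines/test_scheduler.hpp"

#include <memory>

using namespace mrc::coroutines;

Task<void> do_work(std::shared_ptr<TestScheduler> sched)
{
    co_await sched->yield();  // suspend once so several tasks are in flight at the same time
    co_return;
}

int main()
{
    auto sched = std::make_shared<TestScheduler>();  // assumption: default-constructible

    // With max_concurrent_tasks = 2, the third start() call is queued in
    // m_next_tasks and begins only after one of the first two finishes.
    TaskContainer tasks{sched, /*max_concurrent_tasks=*/2};

    tasks.start(do_work(sched));
    tasks.start(do_work(sched));
    tasks.start(do_work(sched));  // deferred until a slot frees up

    while (sched->resume_next()) {}  // drive the suspended tasks to completion

    tasks.garbage_collect();
    return 0;
}
```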
7 changes: 7 additions & 0 deletions cpp/mrc/include/mrc/coroutines/test_scheduler.hpp
@@ -80,6 +80,13 @@ class TestScheduler
*/
mrc::coroutines::Task<> yield_until(std::chrono::time_point<std::chrono::steady_clock> time) override;

/**
* Returns the time according to the scheduler. Time may be progressed by resume_next, resume_for, and resume_until.
*
* @return the current time according to the scheduler.
*/
std::chrono::time_point<std::chrono::steady_clock> time();

/**
* Immediately resumes the next-in-queue coroutine handle.
*
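The new `time()` accessor lets tests assert against the scheduler's virtual clock. A minimal sketch of the intended pattern, assuming `TestScheduler` is default-constructible and that `Task<>` exposes `resume()` (as `task_container.cpp` relies on):

```cpp
#include "mrc/coroutines/task.hpp"
#include "mrc/coroutines/test_scheduler.hpp"

#include <cassert>
#include <chrono>
#include <memory>

using namespace std::chrono_literals;

mrc::coroutines::Task<> wait_five_seconds(std::shared_ptr<mrc::coroutines::TestScheduler> sched)
{
    // Suspends until the scheduler's virtual clock reaches start + 5s.
    co_await sched->yield_until(sched->time() + 5s);
}

int main()
{
    auto sched = std::make_shared<mrc::coroutines::TestScheduler>();
    auto start = sched->time();

    auto task = wait_five_seconds(sched);
    task.resume();  // runs until the first suspension point

    assert(sched->resume_next());         // resumes the waiter, advancing the virtual clock
    assert(sched->time() - start >= 5s);  // five "seconds" passed with no wall-clock waiting
    return 0;
}
```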
102 changes: 61 additions & 41 deletions cpp/mrc/src/public/coroutines/task_container.cpp
@@ -30,9 +30,10 @@

namespace mrc::coroutines {

TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e) :
TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e, std::size_t max_concurrent_tasks) :
m_scheduler_lifetime(std::move(e)),
m_scheduler(m_scheduler_lifetime.get())
m_scheduler(m_scheduler_lifetime.get()),
m_max_concurrent_tasks(max_concurrent_tasks)
{
if (m_scheduler_lifetime == nullptr)
{
@@ -43,17 +44,17 @@ TaskContainer::TaskContainer(std::shared_ptr<Scheduler> e) :
TaskContainer::~TaskContainer()
{
// This will hang the current thread... but if tasks are not complete, that's also pretty bad.
while (!this->empty())
while (not empty())
{
this->garbage_collect();
garbage_collect();
}
}

auto TaskContainer::start(Task<void>&& user_task, GarbageCollectPolicy cleanup) -> void
{
m_size.fetch_add(1, std::memory_order::relaxed);
auto lock = std::unique_lock(m_mutex);

std::scoped_lock lk{m_mutex};
m_size += 1;

if (cleanup == GarbageCollectPolicy::yes)
{
@@ -64,48 +65,42 @@ auto TaskContainer::start(Task<void>&& user_task, GarbageCollectPolicy cleanup)
auto pos = m_tasks.emplace(m_tasks.end(), std::nullopt);
auto task = make_cleanup_task(std::move(user_task), pos);
*pos = std::move(task);
m_next_tasks.push(pos);

// Start executing from the cleanup task to schedule the user's task onto the thread pool.
pos->value().resume();
auto current_task_count = m_size - m_next_tasks.size();

if (m_max_concurrent_tasks == 0 or current_task_count < m_max_concurrent_tasks)
{
try_start_next_task(std::move(lock));
}
}

auto TaskContainer::garbage_collect() -> std::size_t
{
std::scoped_lock lk{m_mutex};
auto lock = std::scoped_lock(m_mutex);
return gc_internal();
}

auto TaskContainer::delete_task_size() const -> std::size_t
{
std::atomic_thread_fence(std::memory_order::acquire);
return m_tasks_to_delete.size();
}

auto TaskContainer::delete_tasks_empty() const -> bool
auto TaskContainer::size() -> std::size_t
{
std::atomic_thread_fence(std::memory_order::acquire);
return m_tasks_to_delete.empty();
auto lock = std::scoped_lock(m_mutex);
return m_size;
}

auto TaskContainer::size() const -> std::size_t
{
return m_size.load(std::memory_order::relaxed);
}

auto TaskContainer::empty() const -> bool
auto TaskContainer::empty() -> bool
{
return size() == 0;
}

auto TaskContainer::capacity() const -> std::size_t
auto TaskContainer::capacity() -> std::size_t
{
std::atomic_thread_fence(std::memory_order::acquire);
auto lock = std::scoped_lock(m_mutex);
return m_tasks.size();
}

auto TaskContainer::garbage_collect_and_yield_until_empty() -> Task<void>
{
while (!empty())
while (not empty())
{
garbage_collect();
co_await m_scheduler->yield();
@@ -115,22 +110,44 @@ auto TaskContainer::garbage_collect_and_yield_until_empty() -> Task<void>
TaskContainer::TaskContainer(Scheduler& e) : m_scheduler(&e) {}
auto TaskContainer::gc_internal() -> std::size_t
{
std::size_t deleted{0};
if (!m_tasks_to_delete.empty())
if (m_tasks_to_delete.empty())
{
return 0;
}

std::size_t delete_count = m_tasks_to_delete.size();

for (const auto& pos : m_tasks_to_delete)
{
for (const auto& pos : m_tasks_to_delete)
// Destroy the cleanup task and the user task.
if (pos->has_value())
{
// Destroy the cleanup task and the user task.
if (pos->has_value())
{
pos->value().destroy();
}
m_tasks.erase(pos);
pos->value().destroy();
}
deleted = m_tasks_to_delete.size();
m_tasks_to_delete.clear();

m_tasks.erase(pos);
}

m_tasks_to_delete.clear();

return delete_count;
}

void TaskContainer::try_start_next_task(std::unique_lock<std::mutex> lock)
{
if (m_next_tasks.empty())
{
// no tasks to process
return;
}
return deleted;

auto pos = m_next_tasks.front();
m_next_tasks.pop();

// release the lock before starting the task
lock.unlock();

pos->value().resume();
}

auto TaskContainer::make_cleanup_task(Task<void> user_task, task_position_t pos) -> Task<void>
@@ -155,11 +172,14 @@ auto TaskContainer::make_cleanup_task(Task<void> user_task, task_position_t pos) -> Task<void>
LOG(ERROR) << "coro::task_container user_task had an unhandled exception, not derived from std::exception.\n";
}

std::scoped_lock lk{m_mutex};
auto lock = std::unique_lock(m_mutex);
m_tasks_to_delete.push_back(pos);
// This has to be done within scope lock to make sure this coroutine task completes before the
// task container object destructs -- if it was waiting on .empty() to become true.
m_size.fetch_sub(1, std::memory_order::relaxed);
m_size -= 1;

try_start_next_task(std::move(lock));

co_return;
}

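Two details of the new flow are worth noting. `try_start_next_task` takes the `std::unique_lock` by value and explicitly releases it before calling `resume()`, so the task body never runs while `m_mutex` is held; a task that synchronously starts another task on the same container therefore cannot deadlock. And because every accessor now locks `m_mutex` directly, `m_size` no longer needs to be atomic, which is why the header change above swaps `std::atomic<std::size_t>` for a plain `std::size_t`. Deferred tasks start in FIFO order, since `m_next_tasks` is a `std::queue`.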
13 changes: 13 additions & 0 deletions cpp/mrc/src/public/coroutines/test_scheduler.cpp
@@ -17,6 +17,7 @@

#include "mrc/coroutines/test_scheduler.hpp"

#include <chrono>
#include <compare>

namespace mrc::coroutines {
@@ -56,8 +57,15 @@ mrc::coroutines::Task<> TestScheduler::yield_until(std::chrono::time_point<std::
co_return co_await TestScheduler::Operation{this, time};
}

std::chrono::time_point<std::chrono::steady_clock> TestScheduler::time()
{
return m_time;
}

bool TestScheduler::resume_next()
{
using namespace std::chrono_literals;

if (m_queue.empty())
{
return false;
@@ -69,6 +77,11 @@ bool TestScheduler::resume_next()

m_time = handle.second;

if (not m_queue.empty())
{
m_time = m_queue.top().second;
}

handle.first.resume();

return true;
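One subtlety in the updated `resume_next()`: when more work remains queued, `m_time` is advanced past the popped entry's wake-up time to the next pending entry's time *before* the handle resumes, so a coroutine that calls `time()` while resuming observes the next scheduled wake-up rather than its own; only the final `resume_next()` leaves the clock at the resumed handle's own scheduled time.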
