Skip to content

Commit

Permalink
Add TestScheduler to support testing time-based coroutines without …
Browse files Browse the repository at this point in the history
…waiting for timeouts (#453)

Adds a manually driven TestScheduler that can fast-forward through delayed coroutines.

Required for nv-morpheus/Morpheus#1548

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #453
  • Loading branch information
cwharris authored Mar 25, 2024
1 parent 9cf1ebc commit bd7955e
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ add_library(libmrc
src/public/coroutines/io_scheduler.cpp
src/public/coroutines/sync_wait.cpp
src/public/coroutines/task_container.cpp
src/public/coroutines/test_scheduler.cpp
src/public/coroutines/thread_local_context.cpp
src/public/coroutines/thread_pool.cpp
src/public/cuda/device_guard.cpp
Expand Down
105 changes: 105 additions & 0 deletions cpp/mrc/include/mrc/coroutines/test_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "mrc/coroutines/scheduler.hpp"
#include "mrc/coroutines/task.hpp"

#include <chrono>
#include <coroutine>
#include <queue>
#include <utility>
#include <vector>

#pragma once

namespace mrc::coroutines {

class TestScheduler : public Scheduler
{
private:
struct Operation
{
public:
Operation(TestScheduler* self, std::chrono::time_point<std::chrono::steady_clock> time);

static constexpr bool await_ready()
{
return false;

Check warning on line 41 in cpp/mrc/include/mrc/coroutines/test_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/coroutines/test_scheduler.hpp#L41

Added line #L41 was not covered by tests
}

void await_suspend(std::coroutine_handle<> handle);

void await_resume() {}

Check warning on line 46 in cpp/mrc/include/mrc/coroutines/test_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/coroutines/test_scheduler.hpp#L46

Added line #L46 was not covered by tests

private:
TestScheduler* m_self;
std::chrono::time_point<std::chrono::steady_clock> m_time;
};

using item_t = std::pair<std::coroutine_handle<>, std::chrono::time_point<std::chrono::steady_clock>>;
struct ItemCompare
{
bool operator()(item_t& lhs, item_t& rhs);
};

std::priority_queue<item_t, std::vector<item_t>, ItemCompare> m_queue;
std::chrono::time_point<std::chrono::steady_clock> m_time = std::chrono::steady_clock::now();

public:
/**
* @brief Enqueue's the coroutine handle to be resumed at the current logical time.
*/
void resume(std::coroutine_handle<> handle) noexcept override;

/**
* Suspends the current function and enqueue's it to be resumed at the current logical time.
*/
mrc::coroutines::Task<> yield() override;

/**
* Suspends the current function and enqueue's it to be resumed at the current logica time + the given duration.
*/
mrc::coroutines::Task<> yield_for(std::chrono::milliseconds time) override;

/**
* Suspends the current function and enqueue's it to be resumed at the given logical time.
*/
mrc::coroutines::Task<> yield_until(std::chrono::time_point<std::chrono::steady_clock> time) override;

/**
* Immediately resumes the next-in-queue coroutine handle.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_next();

/**
* Immediately resumes next-in-queue coroutines up to the current logical time + the given duration, in-order.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_for(std::chrono::milliseconds time);

/**
* Immediately resumes next-in-queue coroutines up to the given logical time.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_until(std::chrono::time_point<std::chrono::steady_clock> time);
};

} // namespace mrc::coroutines
102 changes: 102 additions & 0 deletions cpp/mrc/src/public/coroutines/test_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "mrc/coroutines/test_scheduler.hpp"

#include <compare>

namespace mrc::coroutines {

TestScheduler::Operation::Operation(TestScheduler* self, std::chrono::time_point<std::chrono::steady_clock> time) :
m_self(self),
m_time(time)

Check warning on line 26 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L24-L26

Added lines #L24 - L26 were not covered by tests
{}

bool TestScheduler::ItemCompare::operator()(item_t& lhs, item_t& rhs)

Check warning on line 29 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L29

Added line #L29 was not covered by tests
{
return lhs.second > rhs.second;

Check warning on line 31 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L31

Added line #L31 was not covered by tests
}

void TestScheduler::Operation::await_suspend(std::coroutine_handle<> handle)

Check warning on line 34 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L34

Added line #L34 was not covered by tests
{
m_self->m_queue.emplace(std::move(handle), m_time);

Check warning on line 36 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L36

Added line #L36 was not covered by tests
}

void TestScheduler::resume(std::coroutine_handle<> handle) noexcept

Check warning on line 39 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L39

Added line #L39 was not covered by tests
{
m_queue.emplace(std::move(handle), std::chrono::steady_clock::now());

Check warning on line 41 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L41

Added line #L41 was not covered by tests
}

mrc::coroutines::Task<> TestScheduler::yield()

Check warning on line 44 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L44

Added line #L44 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, m_time};
}

mrc::coroutines::Task<> TestScheduler::yield_for(std::chrono::milliseconds time)

Check warning on line 49 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L49

Added line #L49 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, m_time + time};
}

mrc::coroutines::Task<> TestScheduler::yield_until(std::chrono::time_point<std::chrono::steady_clock> time)

Check warning on line 54 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L54

Added line #L54 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, time};
}

bool TestScheduler::resume_next()

Check warning on line 59 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L59

Added line #L59 was not covered by tests
{
if (m_queue.empty())

Check warning on line 61 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L61

Added line #L61 was not covered by tests
{
return false;

Check warning on line 63 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L63

Added line #L63 was not covered by tests
}

auto handle = m_queue.top();

Check warning on line 66 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L66

Added line #L66 was not covered by tests

m_queue.pop();

Check warning on line 68 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L68

Added line #L68 was not covered by tests

m_time = handle.second;

Check warning on line 70 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L70

Added line #L70 was not covered by tests

handle.first.resume();

Check warning on line 72 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L72

Added line #L72 was not covered by tests

return true;

Check warning on line 74 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L74

Added line #L74 was not covered by tests
}

bool TestScheduler::resume_for(std::chrono::milliseconds time)

Check warning on line 77 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L77

Added line #L77 was not covered by tests
{
return resume_until(m_time + time);

Check warning on line 79 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L79

Added line #L79 was not covered by tests
}

bool TestScheduler::resume_until(std::chrono::time_point<std::chrono::steady_clock> time)

Check warning on line 82 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L82

Added line #L82 was not covered by tests
{
m_time = time;

Check warning on line 84 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L84

Added line #L84 was not covered by tests

while (not m_queue.empty())

Check warning on line 86 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L86

Added line #L86 was not covered by tests
{
if (m_queue.top().second <= m_time)

Check warning on line 88 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L88

Added line #L88 was not covered by tests
{
m_queue.top().first.resume();
m_queue.pop();

Check warning on line 91 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L90-L91

Added lines #L90 - L91 were not covered by tests
}
else
{
return true;

Check warning on line 95 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L95

Added line #L95 was not covered by tests
}
}

return false;

Check warning on line 99 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L99

Added line #L99 was not covered by tests
}

} // namespace mrc::coroutines

0 comments on commit bd7955e

Please sign in to comment.