Skip to content

Commit

Permalink
Add shutdown support to memory arbitrator
Browse files Browse the repository at this point in the history
Summary:
There are a bunch of flaky mock arbitrator tests, caused by the background global arbitration,
which can continue to run after the test finishes.
In general we need the shutdown procedure to handle the shared arbitrator destruction properly.
This PR adds shutdown support to the shared arbitrator, which is invoked from the memory manager. This fix,
plus some test fixes, resolves all existing flakiness in the mock arbitrator tests at Meta.

Differential Revision: D64742451
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 22, 2024
1 parent bb3b7ca commit 1e58e15
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 30 deletions.
2 changes: 2 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
}

MemoryManager::~MemoryManager() {
arbitrator_->shutdown();

if (pools_.size() != 0) {
const auto errMsg = fmt::format(
"pools_.size() != 0 ({} vs {}). There are unexpected alive memory "
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class NoopArbitrator : public MemoryArbitrator {
return "NOOP";
}

// No-op: this override does nothing on shutdown.
void shutdown() override {}

void addPool(const std::shared_ptr<MemoryPool>& pool) override {
VELOX_CHECK_EQ(pool->capacity(), 0);
growPool(pool.get(), pool->maxCapacity(), 0);
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class MemoryArbitrator {

virtual ~MemoryArbitrator() = default;

/// Invoked by the memory manager to shut down the memory arbitrator so that it
/// stops serving new memory arbitration requests.
virtual void shutdown() = 0;

/// Invoked by the memory manager to add a newly created memory pool. The
/// memory arbitrator allocates the initial capacity for 'pool' and
/// dynamically adjusts its capacity based query memory needs through memory
Expand Down
72 changes: 43 additions & 29 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,31 @@ SharedArbitrator::SharedArbitrator(const Config& config)
<< participantConfig_.toString();
}

// Shuts down the arbitrator: marks it as shut down under 'stateLock_', stops
// global arbitration, destroys the memory reclaim executor and finally checks
// that no arbitration participants are left. A call made after 'shutdown_' has
// already been set returns early without repeating the teardown.
void SharedArbitrator::shutdown() {
{
std::lock_guard<std::mutex> l(stateLock_);
// NOTE: this check sits before the early return below, so it is evaluated
// on every call, including repeated ones.
VELOX_CHECK(globalArbitrationWaiters_.empty());
if (shutdown_) {
return;
}
shutdown_ = true;
}

// 'stateLock_' is released at this point: the global arbitration thread
// takes the same lock in its wait loop to observe 'shutdown_'.
shutdownGlobalArbitration();

// Log the executor's thread and queue counts before dropping it.
VELOX_MEM_LOG(INFO) << "Stopping memory reclaim executor '"
<< memoryReclaimExecutor_->getName() << "': threads: "
<< memoryReclaimExecutor_->numActiveThreads() << "/"
<< memoryReclaimExecutor_->numThreads()
<< ", task queue: "
<< memoryReclaimExecutor_->getTaskQueueSize();
// NOTE(review): resetting the pointer presumably waits for the executor's
// threads to finish - confirm against the executor type.
memoryReclaimExecutor_.reset();
VELOX_MEM_LOG(INFO) << "Memory reclaim executor stopped";

// All participants are expected to have been removed by now.
VELOX_CHECK_EQ(
participants_.size(), 0, "Unexpected alive participants on destruction");
}

void SharedArbitrator::setupGlobalArbitration() {
if (!globalArbitrationEnabled_) {
return;
Expand Down Expand Up @@ -291,14 +316,6 @@ void SharedArbitrator::shutdownGlobalArbitration() {

VELOX_CHECK(!globalArbitrationAbortCapacityLimits_.empty());
VELOX_CHECK_NOT_NULL(globalArbitrationController_);
{
std::lock_guard<std::mutex> l(stateLock_);
// We only expect stop global arbitration once during velox runtime
// shutdown.
VELOX_CHECK(!globalArbitrationStop_);
VELOX_CHECK(globalArbitrationWaiters_.empty());
globalArbitrationStop_ = true;
}

VELOX_MEM_LOG(INFO) << "Stopping global arbitration controller";
globalArbitrationThreadCv_.notify_one();
Expand All @@ -315,19 +332,7 @@ void SharedArbitrator::wakeupGlobalArbitrationThread() {
}

SharedArbitrator::~SharedArbitrator() {
shutdownGlobalArbitration();

VELOX_MEM_LOG(INFO) << "Stopping memory reclaim executor '"
<< memoryReclaimExecutor_->getName() << "': threads: "
<< memoryReclaimExecutor_->numActiveThreads() << "/"
<< memoryReclaimExecutor_->numThreads()
<< ", task queue: "
<< memoryReclaimExecutor_->getTaskQueueSize();
memoryReclaimExecutor_.reset();
VELOX_MEM_LOG(INFO) << "Memory reclaim executor stopped";

VELOX_CHECK_EQ(
participants_.size(), 0, "Unexpected alive participants on destruction");
shutdown();

if (freeNonReservedCapacity_ + freeReservedCapacity_ != capacity_) {
const std::string errMsg = fmt::format(
Expand Down Expand Up @@ -393,6 +398,8 @@ void SharedArbitrator::finishArbitration(ArbitrationOperation* op) {
}

void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {
checkRunning();

VELOX_CHECK_EQ(pool->capacity(), 0);

auto newParticipant = ArbitrationParticipant::create(
Expand Down Expand Up @@ -439,6 +446,8 @@ void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {
}

void SharedArbitrator::removePool(MemoryPool* pool) {
checkRunning();

VELOX_CHECK_EQ(pool->reservedBytes(), 0);
const uint64_t freedBytes = shrinkPool(pool, 0);
VELOX_CHECK_EQ(pool->capacity(), 0);
Expand Down Expand Up @@ -512,8 +521,8 @@ std::optional<ArbitrationCandidate> SharedArbitrator::findAbortCandidate(
candidateIdx = i;
continue;
}
// With the same capacity size bucket, we favor the old participant to let
// long running query proceed first.
// With the same capacity size bucket, we favor the old participant to
// let long running query proceed first.
if (candidates[candidateIdx].participant->id() <
candidates[i].participant->id()) {
candidateIdx = i;
Expand Down Expand Up @@ -608,6 +617,8 @@ uint64_t SharedArbitrator::allocateCapacityLocked(
uint64_t SharedArbitrator::shrinkCapacity(
MemoryPool* pool,
uint64_t /*unused*/) {
checkRunning();

VELOX_CHECK(pool->isRoot());
auto participant = getParticipant(pool->name());
VELOX_CHECK(participant.has_value());
Expand All @@ -618,6 +629,8 @@ uint64_t SharedArbitrator::shrinkCapacity(
uint64_t requestBytes,
bool allowSpill,
bool allowAbort) {
checkRunning();

const uint64_t targetBytes = requestBytes == 0 ? capacity_ : requestBytes;
ScopedMemoryArbitrationContext abitrationCtx{};
const uint64_t startTimeMs = getCurrentTimeMs();
Expand Down Expand Up @@ -663,6 +676,8 @@ ArbitrationOperation SharedArbitrator::createArbitrationOperation(
}

bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) {
checkRunning();

VELOX_CHECK(pool->isRoot());
auto op = createArbitrationOperation(pool, requestBytes);
ScopedArbitration scopedArbitration(this, &op);
Expand Down Expand Up @@ -711,8 +726,8 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
if (!globalArbitrationEnabled_ &&
op.participant()->reclaimableUsedCapacity() >=
participantConfig_.minReclaimBytes) {
// NOTE: if global memory arbitration is not enabled, we will try to reclaim
// from the participant itself before failing this operation.
// NOTE: if global memory arbitration is not enabled, we will try to
// reclaim from the participant itself before failing this operation.
reclaim(
op.participant(),
op.requestBytes(),
Expand Down Expand Up @@ -803,10 +818,9 @@ void SharedArbitrator::globalArbitrationMain() {
while (true) {
{
std::unique_lock l(stateLock_);
globalArbitrationThreadCv_.wait(l, [&] {
return globalArbitrationStop_ || !globalArbitrationWaiters_.empty();
});
if (globalArbitrationStop_) {
globalArbitrationThreadCv_.wait(
l, [&] { return shutdown_ || !globalArbitrationWaiters_.empty(); });
if (shutdown_) {
VELOX_CHECK(globalArbitrationWaiters_.empty());
break;
}
Expand Down
10 changes: 9 additions & 1 deletion velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {

static void unregisterFactory();

/// Shuts down the arbitrator. Once it has run, addPool(), removePool(),
/// growCapacity() and shrinkCapacity() fail their running check. Calling it
/// more than once is allowed.
void shutdown() override;

void addPool(const std::shared_ptr<MemoryPool>& pool) final;

void removePool(MemoryPool* pool) final;
Expand Down Expand Up @@ -283,6 +285,11 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const memory::ScopedMemoryArbitrationContext arbitrationCtx_{};
};

// Fails with "SharedArbitrator is not running" if the arbitrator has been
// shut down. 'shutdown_' is read while holding 'stateLock_'; the lock is
// released on return, so the result only reflects the state at the time of
// the check.
FOLLY_ALWAYS_INLINE void checkRunning() {
std::lock_guard<std::mutex> l(stateLock_);
VELOX_CHECK(!shutdown_, "SharedArbitrator is not running");
}

// Invoked to get the arbitration participant by 'name'. The function returns
// std::nullopt if the underlying query memory pool is destroyed.
std::optional<ScopedArbitrationParticipant> getParticipant(
Expand Down Expand Up @@ -553,10 +560,11 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// Lock used to protect the arbitrator internal state.
mutable std::mutex stateLock_;

// Set to true by shutdown(). Read by checkRunning() and by the global
// arbitration thread's wait loop, in both cases while holding 'stateLock_'.
bool shutdown_{false};

tsan_atomic<uint64_t> freeReservedCapacity_{0};
tsan_atomic<uint64_t> freeNonReservedCapacity_{0};

bool globalArbitrationStop_{false};
// Indicates if the global arbitration is currently running or not.
tsan_atomic<bool> globalArbitrationRunning_{false};

Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class TestArbitrator : public MemoryArbitrator {
.capacity = config.capacity,
.extraConfigs = config.extraConfigs}) {}

// No-op: this test arbitrator does nothing on shutdown.
void shutdown() override {}

// No-op: the pool argument is ignored.
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}

// No-op: the pool argument is ignored.
void removePool(MemoryPool* /*unused*/) override {}
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ class FakeTestArbitrator : public MemoryArbitrator {
return "USER";
}

// No-op: this fake arbitrator does nothing on shutdown.
void shutdown() override {}

// No-op: the pool argument is ignored.
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}

// No-op: the pool argument is ignored.
void removePool(MemoryPool* /*unused*/) override {}
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class FakeTestArbitrator : public MemoryArbitrator {
.extraConfigs = config.extraConfigs}),
injectAddPoolFailure_(injectAddPoolFailure) {}

// No-op: this fake arbitrator does nothing on shutdown.
void shutdown() override {}

// Fails the check with "Failed to add pool" when 'injectAddPoolFailure_' is
// set; otherwise does nothing. The pool argument is ignored either way.
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {
VELOX_CHECK(!injectAddPoolFailure_, "Failed to add pool");
}
Expand Down
85 changes: 85 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,91 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, arbitrationAbort) {
ASSERT_EQ(task3->capacity(), memoryCapacity / 4);
}

// Verifies that shutdown() can be called repeatedly and that, once the
// arbitrator has been shut down, addPool(), removePool(), growCapacity() and
// shrinkCapacity() all fail with "SharedArbitrator is not running".
TEST_F(MockSharedArbitrationTest, shutdown) {
uint64_t memoryCapacity = 256 * MB;
setupMemory(memoryCapacity);
arbitrator_->shutdown();
// A second shutdown is expected to be accepted as well.
arbitrator_->shutdown();
// Check APIs.
// NOTE: the running check is the first check performed by the external APIs,
// so the null and zero arguments below are rejected before they are used.
VELOX_ASSERT_THROW(
arbitrator_->addPool(nullptr), "SharedArbitrator is not running");
VELOX_ASSERT_THROW(
arbitrator_->removePool(nullptr), "SharedArbitrator is not running");
VELOX_ASSERT_THROW(
arbitrator_->growCapacity(nullptr, 0), "SharedArbitrator is not running");
VELOX_ASSERT_THROW(
arbitrator_->shrinkCapacity(nullptr, 0),
"SharedArbitrator is not running");

// The helper exposes the arbitrator's internal shutdown flag.
auto arbitratorHelper = test::SharedArbitratorTestHelper(arbitrator_);
ASSERT_TRUE(arbitratorHelper.shutdown());
}

// Verifies that shutdown() does not return while a global arbitration run is
// still in progress, and that it completes once that run is released.
TEST_F(MockSharedArbitrationTest, shutdownWait) {
uint64_t memoryCapacity = 256 * MB;
// NOTE(review): the trailing 'true, 2'000' presumably enables global
// arbitration with a 2s arbitration timeout, matching the "timeout"
// expectation further down - confirm against setupMemory().
setupMemory(
memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 2'000);
// Two tasks each take half of the capacity, so none is left free.
std::shared_ptr<MockTask> task1 = addTask(memoryCapacity);
auto* op1 = task1->addMemoryOp(true);
op1->allocate(memoryCapacity / 2);
ASSERT_EQ(task1->capacity(), memoryCapacity / 2);

std::shared_ptr<MockTask> task2 = addTask(memoryCapacity);
auto* op2 = task2->addMemoryOp(true);
op2->allocate(memoryCapacity / 2);
ASSERT_EQ(task2->capacity(), memoryCapacity / 2);

folly::EventCount globalArbitrationStarted;
std::atomic_bool globalArbitrationStartedFlag{false};
folly::EventCount globalArbitrationWait;
std::atomic_bool globalArbitrationWaitFlag{true};
// The test-value hook signals that the global arbitration run has started
// and then blocks its caller until 'globalArbitrationWaitFlag' is cleared.
SCOPED_TESTVALUE_SET(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration",
std::function<void(const SharedArbitrator*)>(
([&](const SharedArbitrator* arbitrator) {
test::SharedArbitratorTestHelper arbitratorHelper(
const_cast<SharedArbitrator*>(arbitrator));
ASSERT_EQ(arbitratorHelper.numGlobalArbitrationWaiters(), 1);
globalArbitrationStartedFlag = true;
globalArbitrationStarted.notifyAll();
globalArbitrationWait.await(
[&]() { return !globalArbitrationWaitFlag.load(); });
})));
// The extra allocation cannot be satisfied while the hook is blocked, so it
// is expected to fail with a timeout.
VELOX_ASSERT_THROW(op1->allocate(memoryCapacity / 4), "timeout");
globalArbitrationStarted.await(
[&]() { return globalArbitrationStartedFlag.load(); });

// Release both tasks before shutting the arbitrator down.
op2->freeAll();
task2.reset();
op1->freeAll();
task1.reset();

// NOTE(review): this const_cast looks redundant - the sibling 'shutdown'
// test passes 'arbitrator_' to the helper without a cast. Confirm and drop.
test::SharedArbitratorTestHelper arbitratorHelper(
const_cast<SharedArbitrator*>(arbitrator_));
ASSERT_FALSE(arbitratorHelper.shutdown());

// Run shutdown() on its own thread so the test can observe it blocking.
std::atomic_bool shutdownCompleted{false};
std::thread shutdownThread([&]() {
arbitrator_->shutdown();
shutdownCompleted = true;
});

// Give the shutdown thread time to reach the blocking point. The shutdown
// flag is expected to be set already even though shutdown() has not
// returned, because the global arbitration run is still held by the hook.
std::this_thread::sleep_for(std::chrono::seconds(2)); // NOLINT
ASSERT_FALSE(shutdownCompleted);
ASSERT_TRUE(arbitratorHelper.globalArbitrationRunning());
ASSERT_TRUE(arbitratorHelper.shutdown());

// Unblock the hook so the global arbitration run can finish.
globalArbitrationWaitFlag = false;
globalArbitrationWait.notifyAll();

arbitratorHelper.waitForGlobalArbitrationToFinish();
shutdownThread.join();
ASSERT_TRUE(shutdownCompleted);
ASSERT_TRUE(arbitratorHelper.shutdown());
}

TEST_F(MockSharedArbitrationTest, memoryPoolAbortCapacityLimit) {
const int64_t memoryCapacity = 256 << 20;

Expand Down
5 changes: 5 additions & 0 deletions velox/common/memory/tests/SharedArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class SharedArbitratorTestHelper {
return arbitrator_->globalArbitrationController_.get();
}

// Reports whether the wrapped arbitrator has been marked as shut down. The
// flag is read while holding the arbitrator's state lock.
bool shutdown() const {
  std::lock_guard<std::mutex> stateGuard(arbitrator_->stateLock_);
  const bool isShutdown = arbitrator_->shutdown_;
  return isShutdown;
}

private:
SharedArbitrator* const arbitrator_;
};
Expand Down

0 comments on commit 1e58e15

Please sign in to comment.