Skip to content

Commit

Permalink
MB-48716: Make expiry pager config change update task waketime
Browse files Browse the repository at this point in the history
Recent changes for MB-41403 which ensured the ExpiredItemPager task is
not recreated for every config change.

However, in doing so, changes to the task sleep time configuration were
no longer immediately applied; instead the task had to sleep until it's
existing wake time, to then snooze for the newly configured period.

Resolve this by calling into ExecutorPool to ensure the wake time used
by the pool is directly updated when the config changes.

Change-Id: I0620e9884549da631a419064403f75a753835886
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/163008
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
  • Loading branch information
jameseh96 authored and daverigby committed Oct 11, 2021
1 parent 584d3a1 commit 1fbba81
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 30 deletions.
46 changes: 38 additions & 8 deletions engines/ep/src/item_pager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,29 +272,52 @@ ExpiredItemPager::ExpiredItemPager(EventuallyPersistentEngine* e,
auto cfg = config.wlock();
cfg->sleepTime = std::chrono::seconds(stime);
cfg->initialRunTime = taskTime;
updateWakeTimeFromCfg(*cfg);
snooze(calculateWakeTimeFromCfg(*cfg).count());
}

void ExpiredItemPager::updateSleepTime(std::chrono::seconds sleepTime) {
auto cfg = config.wlock();
cfg->sleepTime = sleepTime;
updateWakeTimeFromCfg(*cfg);
ExecutorPool::get()->snooze(getId(),
calculateWakeTimeFromCfg(*cfg).count());
}
void ExpiredItemPager::updateInitialRunTime(ssize_t initialRunTime) {
auto cfg = config.wlock();
cfg->initialRunTime = initialRunTime;
updateWakeTimeFromCfg(*cfg);
ExecutorPool::get()->snooze(getId(),
calculateWakeTimeFromCfg(*cfg).count());
}

std::chrono::seconds ExpiredItemPager::getSleepTime() const {
return config.rlock()->sleepTime;
}

bool ExpiredItemPager::enable() {
auto cfg = config.wlock();
if (cfg->enabled) {
return false;
}
cfg->enabled = true;
snooze(calculateWakeTimeFromCfg(*cfg).count());
ExecutorPool::get()->schedule(shared_from_this());
return true;
}

bool ExpiredItemPager::disable() {
auto cfg = config.wlock();
if (!cfg->enabled) {
return false;
}
cfg->enabled = false;
ExecutorPool::get()->cancel(getId());
return true;
}

bool ExpiredItemPager::isEnabled() const {
return config.rlock()->enabled;
}

void ExpiredItemPager::updateWakeTimeFromCfg(
std::chrono::seconds ExpiredItemPager::calculateWakeTimeFromCfg(
const ExpiredItemPager::Config& cfg) {
auto initialSleep = double(cfg.sleepTime.count());
if (cfg.initialRunTime != -1) {
Expand Down Expand Up @@ -325,8 +348,9 @@ void ExpiredItemPager::updateWakeTimeFromCfg(

initialSleep = difftime(mktime(&timeTarget), mktime(&timeNow));
}
snooze(initialSleep);
updateExpPagerTime(initialSleep);
using namespace std::chrono;
return duration_cast<seconds>(duration<double>(initialSleep));
}

bool ExpiredItemPager::run() {
Expand Down Expand Up @@ -373,9 +397,15 @@ bool ExpiredItemPager::run() {
maxExpectedDurationForVisitorTask);
}
}
auto sleepTime = config.rlock()->sleepTime;
snooze(sleepTime.count());
updateExpPagerTime(sleepTime.count());
{
// hold the lock while calling snooze - avoids a config change updating
// the sleep time immediately after we read it, then this snooze
// here overwriting the wake time with the old value
auto cfg = config.rlock();
auto sleepTime = cfg->sleepTime.count();
snooze(sleepTime);
updateExpPagerTime(sleepTime);
}

return true;
}
Expand Down
25 changes: 16 additions & 9 deletions engines/ep/src/item_pager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <folly/Synchronized.h>
#include <memcached/types.h> // for ssize_t
#include <chrono>
#include <memory>

typedef std::pair<int64_t, int64_t> row_range_t;

Expand Down Expand Up @@ -134,7 +135,8 @@ class ItemPager : public GlobalTask {
* Dispatcher job responsible for purging expired items from
* memory and disk.
*/
class ExpiredItemPager : public GlobalTask {
class ExpiredItemPager : public GlobalTask,
public std::enable_shared_from_this<ExpiredItemPager> {
public:

/**
Expand Down Expand Up @@ -167,14 +169,20 @@ class ExpiredItemPager : public GlobalTask {
std::chrono::seconds getSleepTime() const;

/**
* Lock the config for writing, and return a handle.
* Enable and schedule the expiry pager task.
*
* Used when other operations need synchronising with config changes
* (e.g., scheduling or cancelling the task)
* @return true if the task was scheduled, false if it was already
* enabled
*/
auto wlockConfig() {
return config.wlock();
}
bool enable();

/**
* Disable and cancel the expiry pager task.
*
* @return true if the task was cancelled, false if it was already
* disabled
*/
bool disable();

bool isEnabled() const;

Expand All @@ -196,8 +204,7 @@ class ExpiredItemPager : public GlobalTask {
ssize_t initialRunTime = -1;
bool enabled = false;
};

void updateWakeTimeFromCfg(const Config& cfg);
std::chrono::seconds calculateWakeTimeFromCfg(const Config& cfg);

folly::Synchronized<Config> config;
/**
Expand Down
15 changes: 2 additions & 13 deletions engines/ep/src/kv_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2120,26 +2120,15 @@ void KVBucket::setExpiryPagerTasktime(ssize_t val) {
}

void KVBucket::enableExpiryPager() {
// hold the config handle while scheduling the task to avoid
// racing with another thread cancelling it.
auto cfg = expiryPagerTask->wlockConfig();
if (cfg->enabled) {
if (!expiryPagerTask->enable()) {
EP_LOG_DEBUG_RAW("Expiry Pager already enabled!");
return;
}
cfg->enabled = true;
ExecutorPool::get()->cancel(expiryPagerTask->getId());
ExecutorPool::get()->schedule(expiryPagerTask);
}

void KVBucket::disableExpiryPager() {
auto cfg = expiryPagerTask->wlockConfig();
if (!cfg->enabled) {
if (!expiryPagerTask->disable()) {
EP_LOG_DEBUG_RAW("Expiry Pager already disabled!");
return;
}
cfg->enabled = false;
ExecutorPool::get()->cancel(expiryPagerTask->getId());
}

void KVBucket::wakeUpExpiryPager() {
Expand Down
27 changes: 27 additions & 0 deletions engines/ep/tests/module_tests/kv_bucket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,33 @@ TEST_F(KVBucketTest, DataRaceInDoWorkerStat) {
pool->cancel(task->getId());
}

TEST_F(KVBucketTest, ExpiryConfigChangeWakesTask) {
// schedule the expiry pager task.
store->enableExpiryPager();
// check that the task has a longer runtime to start with
ASSERT_GT(store->getExpiryPagerSleeptime(), 100);

// check the task has not run yet.
auto& epstats = engine->getEpStats();
ASSERT_EQ(0, epstats.expiryPagerRuns);

// try to change the config to get the task to run asap
store->setExpiryPagerSleeptime(0);

using namespace std::chrono;
using namespace std::chrono_literals;
auto deadline = steady_clock::now() + 5s;

while (epstats.expiryPagerRuns == 0 &&
std::chrono::steady_clock::now() < deadline) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

// check that the task has run before our deadline - it wouldn't have
// if the config change did not wake the task through the pool.
EXPECT_GT(epstats.expiryPagerRuns, 0);
}

void KVBucketTest::storeAndDeleteItem(Vbid vbid,
const DocKey& key,
std::string value) {
Expand Down

0 comments on commit 1fbba81

Please sign in to comment.