Skip to content

Commit

Permalink
Merge pull request #7 from useshortcut/mark/sc-170835/fix-mcrouter-in…
Browse files Browse the repository at this point in the history
…-latest-ami-v2

Update Mcrouter to work with latest Linux Amazon 2
  • Loading branch information
virgofx authored Oct 1, 2021
2 parents a9db3dd + 61b3519 commit 4179985
Show file tree
Hide file tree
Showing 128 changed files with 2,350 additions and 1,127 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ name: build
on:
push:
branches:
- master
- main
- github_action
pull_request:
branches:
- master
- main

jobs:
build:
Expand Down
7 changes: 7 additions & 0 deletions mcrouter/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@ Makefile.in
/configure
/depcomp
/install-sh
/lib/carbon/gen-cpp2/
/lib/network/CaretSerializedMessage.h
/lib/network/McAsciiParser-gen.cpp
/lib/network/gen-cpp2/*
/lib/network/gen/gen-cpp2/
/libtool
/m4/libtool.m4
/m4/ltoptions.m4
/m4/ltsugar.m4
/m4/ltversion.m4
/m4/lt~obsolete.m4
/mcrouter_sr_deps.h
/missing
/RouterRegistry.h
/scripts/.*-done
/scripts/all
/ServerOnRequest.h
/stamp-h1
/ThriftAcceptor.h
/lib/gtest*/*
/tools/mcpiper/mcpiper
*.o
Expand Down
27 changes: 19 additions & 8 deletions mcrouter/CarbonRouterClient-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,14 @@ template <class RouterInfo>
template <class Request>
typename std::enable_if<
ListContains<typename RouterInfo::RoutableRequests, Request>::value,
uint64_t>::type
std::pair<uint64_t, uint64_t>>::type
CarbonRouterClient<RouterInfo>::findAffinitizedProxyIdx(
const Request& req) const {
assert(mode_ == ThreadMode::AffinitizedRemoteThread);

// Create a traverser
uint64_t hash = 0;
uint64_t hint = 0;
RouteHandleTraverser<typename RouterInfo::RouteHandleIf> t(
/* start */ nullptr,
/* end */ nullptr,
Expand All @@ -214,10 +215,14 @@ CarbonRouterClient<RouterInfo>::findAffinitizedProxyIdx(
}
return false;
},
[&hash](const HostInfoPtr& host, const RequestClass& requestClass) {
[&hash, &hint](
const HostWithShard& hostWithShard,
const RequestClass& requestClass) {
auto& host = hostWithShard.first;
if (!requestClass.is(RequestClass::kShadow) && host &&
host->location().getTWTaskID()) {
hash = *host->location().getTWTaskID();
hint = RoutingHintEncoder::encodeRoutingHint(hostWithShard);
return true;
}
return false;
Expand All @@ -229,18 +234,18 @@ CarbonRouterClient<RouterInfo>::findAffinitizedProxyIdx(
config.second.proxyRoute().traverse(req, t);
}

return hash % proxies_.size();
return {hash % proxies_.size(), hint};
}

template <class RouterInfo>
template <class Request>
typename std::enable_if<
!ListContains<typename RouterInfo::RoutableRequests, Request>::value,
uint64_t>::type
std::pair<uint64_t, uint64_t>>::type
CarbonRouterClient<RouterInfo>::findAffinitizedProxyIdx(
const Request& /* unused */) const {
assert(mode_ == ThreadMode::AffinitizedRemoteThread);
return 0;
return {0, 0};
}

template <class RouterInfo>
Expand All @@ -251,8 +256,8 @@ bool CarbonRouterClient<RouterInfo>::send(
F&& callback,
folly::StringPiece ipAddr) {
using IterReference = typename std::iterator_traits<InputIt>::reference;
using Request = typename std::decay<decltype(
detail::unwrapRequest(std::declval<IterReference>()))>::type;
using Request = typename std::decay<decltype(detail::unwrapRequest(
std::declval<IterReference>()))>::type;

auto makeNextPreq = [this, ipAddr, &callback, &begin](bool inBatch) {
auto proxyRequestContext = makeProxyRequestContext(
Expand Down Expand Up @@ -346,8 +351,10 @@ CarbonRouterClient<RouterInfo>::makeProxyRequestContext(
folly::StringPiece ipAddr,
bool inBatch) {
Proxy<RouterInfo>* proxy = proxies_[proxyIdx_];
uint64_t routingHint = 0;
if (mode_ == ThreadMode::AffinitizedRemoteThread) {
size_t idx = findAffinitizedProxyIdx(req);
auto [idx, hint] = findAffinitizedProxyIdx(req);
routingHint = hint;
proxy = proxies_[idx];
if (inBatch) {
proxiesToNotify_[idx] = true;
Expand All @@ -369,6 +376,10 @@ CarbonRouterClient<RouterInfo>::makeProxyRequestContext(
}
});

if (routingHint) {
proxyRequestContext->setRoutingHint(routingHint);
}

proxyRequestContext->setRequester(self_);
if (!ipAddr.empty()) {
proxyRequestContext->setUserIpAddress(ipAddr);
Expand Down
4 changes: 2 additions & 2 deletions mcrouter/CarbonRouterClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@ class CarbonRouterClient : public CarbonRouterClientBase {
template <class Request>
typename std::enable_if<
ListContains<typename RouterInfo::RoutableRequests, Request>::value,
uint64_t>::type
std::pair<uint64_t, uint64_t>>::type
findAffinitizedProxyIdx(const Request& req) const;

template <class Request>
typename std::enable_if<
!ListContains<typename RouterInfo::RoutableRequests, Request>::value,
uint64_t>::type
std::pair<uint64_t, uint64_t>>::type
findAffinitizedProxyIdx(const Request& req) const;

/**
Expand Down
106 changes: 105 additions & 1 deletion mcrouter/CarbonRouterInstance-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ CarbonRouterInstance<RouterInfo>::spinUp() {
bool configuringFromDisk = false;
{
std::lock_guard<std::mutex> lg(configReconfigLock_);

auto builder = createConfigBuilder();
if (builder.hasError()) {
std::string initialError = std::move(builder.error());
Expand Down Expand Up @@ -375,6 +374,21 @@ void CarbonRouterInstance<RouterInfo>::subscribeToConfigUpdate() {
{
std::lock_guard<std::mutex> lg(configReconfigLock_);

if (opts_.enable_partial_reconfigure) {
try {
if (reconfigurePartially()) {
configuredFromDisk_.store(false, std::memory_order_relaxed);
return;
}
} catch (const std::exception& e) {
MC_LOG_FAILURE(
opts(),
failure::Category::kInvalidConfig,
"Error on partial reconfiguring: {}",
e.what());
}
}

auto builder = createConfigBuilder();
if (builder) {
success = reconfigure(builder.value());
Expand Down Expand Up @@ -480,6 +494,93 @@ bool CarbonRouterInstance<RouterInfo>::reconfigure(
return result.hasValue();
}

template <class RouterInfo>
bool CarbonRouterInstance<RouterInfo>::reconfigurePartially() {
auto partialUpdates = configApi_->releasePartialUpdatesLocked();
VLOG(2) << "receive partial updates:" << partialUpdates.size();
if (partialUpdates.empty()) {
return false;
}
partialReconfigAttempt_.store(
partialReconfigAttempt_.load(std::memory_order_relaxed) +
partialUpdates.size(),
std::memory_order_relaxed);

// Use the first proxy's config, as it's same in all proxies.
// Also there is no contention for holding the read lock as write lock is
// only obtained by proxy_config_swap() from config thread (same thread
// invoking this function)
{
auto proxyConfig = getProxy(0)->getConfigLocked();
// Make sure partial config is allow for all updates
for (const auto& update : partialUpdates) {
if (!proxyConfig.second.allowPartialConfig(update.tierName)) {
VLOG(1) << folly::sformat(
"tier {} not allow partial reconfigure", update.tierName);
return false;
}
}
}

auto& partialConfigs = getProxy(0)->getConfigUnsafe()->getPartialConfigs();
for (size_t i = 0; i < opts_.num_proxies; i++) {
for (const auto& update : partialUpdates) {
auto& tierPartialConfigs = partialConfigs.at(update.tierName).second;
if (i == 0) {
VLOG(1) << folly::sformat(
"partial update: tier={}, oldApString={}, newApString={}, oldFailureDomain={}, newFailureDomain={}",
update.tierName,
update.oldApString,
update.newApString,
update.oldFailureDomain,
update.newFailureDomain);
}
for (const auto& [apAttr, poolList] : tierPartialConfigs) {
auto oldAp = createAccessPoint(
update.oldApString, update.oldFailureDomain, *this, *apAttr);
auto newAp =
std::const_pointer_cast<const AccessPoint>(createAccessPoint(
update.newApString, update.newFailureDomain, *this, *apAttr));
if (UNLIKELY(oldAp->getProtocol() != newAp->getProtocol())) {
return false;
}
auto replacedAp = getProxy(i)->replaceAP(*oldAp, newAp);
if (!replacedAp) {
VLOG(2) << folly::sformat(
"could not replace AP for tier={}, proxy={}, protocol={}",
update.tierName,
i,
mc_protocol_to_string(oldAp->getProtocol()));
return false;
}
auto proxyConfig = getProxy(i)->getConfigLocked();
for (const auto& pool : poolList) {
if (!proxyConfig.second.updateAccessPoints(pool, replacedAp, newAp)) {
VLOG(2) << folly::sformat(
"could not update AccessPoints for tier={}, pool={}, proxy={}, protocol={}",
update.tierName,
pool,
i,
mc_protocol_to_string(oldAp->getProtocol()));
return false;
}
}
}
}
}
int numUpdates = partialUpdates.size();
if (!configApi_->updatePartialConfigSource(std::move(partialUpdates))) {
return false;
}

VLOG_IF(0, !opts_.constantly_reload_configs)
<< "Partially reconfigured " << opts_.num_proxies << " proxies";
partialReconfigSuccess_.store(
partialReconfigSuccess_.load(std::memory_order_relaxed) + numUpdates,
std::memory_order_relaxed);
return true;
}

template <class RouterInfo>
folly::Expected<folly::Unit, std::string>
CarbonRouterInstance<RouterInfo>::configure(const ProxyConfigBuilder& builder) {
Expand Down Expand Up @@ -517,6 +618,9 @@ CarbonRouterInstance<RouterInfo>::createConfigBuilder() {
/* mark config attempt before, so that
successful config is always >= last config attempt. */
lastConfigAttempt_.store(time(nullptr), std::memory_order_relaxed);
configFullAttempt_.store(
configFullAttempt_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
configApi_->trackConfigSources();
std::string config;
std::string path;
Expand Down
3 changes: 2 additions & 1 deletion mcrouter/CarbonRouterInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class CarbonRouterInstance
*/
void shutdown() noexcept;

ProxyBase* getProxyBase(size_t index) const override;
ProxyBase* getProxyBase(size_t index) const override final;

/**
* @return nullptr if index is >= opts.num_proxies,
Expand Down Expand Up @@ -232,6 +232,7 @@ class CarbonRouterInstance
NB file-based configuration is synchronous
but server-based configuration is asynchronous */
bool reconfigure(const ProxyConfigBuilder& builder);
bool reconfigurePartially();
/** Create the ProxyConfigBuilder used to reconfigure.
Returns error reason if constructor fails. **/
folly::Expected<ProxyConfigBuilder, std::string> createConfigBuilder();
Expand Down
15 changes: 15 additions & 0 deletions mcrouter/CarbonRouterInstanceBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ class CarbonRouterInstanceBase {
return configuredFromDisk_.load(std::memory_order_relaxed);
}

size_t partialReconfigAttempt() const {
return partialReconfigAttempt_.load(std::memory_order_relaxed);
}

size_t partialReconfigSuccess() const {
return partialReconfigSuccess_.load(std::memory_order_relaxed);
}

size_t configFullAttempt() const {
return configFullAttempt_.load(std::memory_order_relaxed);
}

bool isRxmitReconnectionDisabled() const {
return disableRxmitReconnection_;
}
Expand Down Expand Up @@ -243,6 +255,9 @@ class CarbonRouterInstanceBase {
std::atomic<time_t> lastConfigAttempt_{0};
std::atomic<size_t> configFailures_{0};
std::atomic<bool> configuredFromDisk_{false};
std::atomic<size_t> partialReconfigAttempt_{0};
std::atomic<size_t> partialReconfigSuccess_{0};
std::atomic<size_t> configFullAttempt_{0};

// Stores whether we should reconnect after hitting rxmit threshold
std::atomic<bool> disableRxmitReconnection_{false};
Expand Down
15 changes: 13 additions & 2 deletions mcrouter/ConfigApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ bool ConfigApi::readFromBackupFile(
return false;
}

LOG(INFO) << "Reading config source " << sourcePrefix << name
<< " from backup.";
VLOG(1) << "Reading config source " << sourcePrefix << name
<< " from backup.";
return folly::readFile(filePath.c_str(), contents);
}

Expand All @@ -543,6 +543,17 @@ bool ConfigApi::shouldReadFromBackupFiles() const {
return readFromBackupFiles_;
}

std::vector<ConfigApi::PartialUpdate> ConfigApi::releasePartialUpdatesLocked() {
return {};
}

bool ConfigApi::updatePartialConfigSource(
std::vector<ConfigApi::PartialUpdate> /*updates*/) {
return true;
}

void ConfigApi::addPartialUpdateForTest(PartialUpdate&) {}

} // namespace mcrouter
} // namespace memcache
} // namespace facebook
21 changes: 21 additions & 0 deletions mcrouter/ConfigApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ConfigApi : public ConfigApiIf {
public:
typedef std::function<void()> Callback;
typedef CallbackPool<>::CallbackHandle CallbackHandle;
struct PartialUpdate;

static const char* const kFilePrefix;

Expand Down Expand Up @@ -108,11 +109,20 @@ class ConfigApi : public ConfigApiIf {
return true;
}

virtual std::string getFailureDomainStr(uint32_t /* unused */) const {
return "";
}

/**
* Stops observing for file changes
*/
virtual void stopObserving(pid_t pid) noexcept;

virtual std::vector<PartialUpdate> releasePartialUpdatesLocked();

virtual bool updatePartialConfigSource(std::vector<PartialUpdate> updates);
virtual void addPartialUpdateForTest(PartialUpdate& update);

~ConfigApi() override;

/**
Expand All @@ -128,6 +138,17 @@ class ConfigApi : public ConfigApiIf {
*/
void enableReadingFromBackupFiles();

struct PartialUpdate {
std::string tierName;
std::string oldApString;
std::string newApString;
uint32_t oldFailureDomain;
uint32_t newFailureDomain;
int64_t version;
std::string hostname;
int64_t serviceId;
};

protected:
const McrouterOptions& opts_;
CallbackPool<> callbacks_;
Expand Down
Loading

0 comments on commit 4179985

Please sign in to comment.