Skip to content

Commit

Permalink
CXXCBC-448: Expose hooks for fork() scenarious (#536)
Browse files Browse the repository at this point in the history
* CXXCBC-448: Expose hooks for fork() scenarious

In order to gracefully survive fork() syscall, the library should be
able to move itself into stable and predictable state. In particular
ASIO should not have any pending work, and all IO thread should be
stopped. After the fork() the parent process retains control over all
file descriptors, but the child have to reconnect them (ASIO does it in
its hooks), and also the SDK should restart all the IO workers.

At the moment, the only worker thread that the library controls is the
one that in charge of the transactions cleanup, but later once the
public API will hide ASIO context, it will become in charge of the
restarting IO threads.

In case of the wrappers, that use core API directly, they must be able
to stop all IO threads before fork(), wait for work completion, then
perform the syscall and restart IO threads and contexts in both child
and parent later. See example in test/test_integration_examples.cxx

* do not run tests for servers without collections support

* disable test for capella as it cannot load sample buckets
  • Loading branch information
avsej committed Mar 11, 2024
1 parent a3f0bcb commit 933e87e
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 15 deletions.
16 changes: 16 additions & 0 deletions core/impl/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
});
}

void notify_fork(fork_event event)
{
if (transactions_) {
transactions_->notify_fork(event);
}
}

void close(core::utils::movable_function<void()> handler)
{
if (transactions_) {
Expand Down Expand Up @@ -412,6 +419,15 @@ cluster::connect(asio::io_context& io,
});
}

auto
cluster::notify_fork(fork_event event) -> void
{
if (!impl_) {
return;
}
return impl_->notify_fork(event);
}

auto
cluster::close() const -> void
{
Expand Down
10 changes: 7 additions & 3 deletions core/transactions.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

#include <couchbase/transactions.hxx>

#include "couchbase/transactions/transaction_options.hxx"
#include "couchbase/transactions/transaction_result.hxx"
#include "couchbase/transactions/transactions_config.hxx"
#include <couchbase/fork_event.hxx>
#include <couchbase/transactions/transaction_options.hxx>
#include <couchbase/transactions/transaction_result.hxx>
#include <couchbase/transactions/transactions_config.hxx>

#include "transactions/async_attempt_context.hxx"
#include "transactions/attempt_context.hxx"
#include "transactions/exceptions.hxx"
Expand Down Expand Up @@ -222,6 +224,8 @@ class transactions : public couchbase::transactions::transactions
ctx.rollback();
}

void notify_fork(fork_event event);

/**
* @brief Shut down the transactions object
*
Expand Down
2 changes: 2 additions & 0 deletions core/transactions/internal/transactions_cleanup.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class transactions_cleanup
const atr_cleanup_stats force_cleanup_atr(const core::document_id& atr_id, std::vector<transactions_cleanup_attempt>& results);
const client_record_details get_active_clients(const couchbase::transactions::transaction_keyspace& keyspace, const std::string& uuid);
void remove_client_record_from_all_buckets(const std::string& uuid);
void start();
void stop();
void close();

private:
Expand Down
10 changes: 10 additions & 0 deletions core/transactions/transactions.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ transactions::run(async_logic&& code, txn_complete_callback&& cb)
return run(config, std::move(code), std::move(cb));
}

void
transactions::notify_fork(fork_event event)
{
if (event == fork_event::prepare) {
cleanup_->stop();
} else {
cleanup_->start();
}
}

void
transactions::close()
{
Expand Down
36 changes: 24 additions & 12 deletions core/transactions/transactions_cleanup.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,8 @@ transactions_cleanup::transactions_cleanup(core::cluster cluster, const couchbas
: cluster_(std::move(cluster))
, config_(config)
, client_uuid_(uid_generator::next())
, running_(config.cleanup_config.cleanup_client_attempts || config.cleanup_config.cleanup_lost_attempts)
{
if (config.cleanup_config.cleanup_client_attempts) {
cleanup_thr_ = std::thread(std::bind(&transactions_cleanup::attempts_loop, this));
}
if (config_.metadata_collection) {
add_collection(
{ config_.metadata_collection->bucket, config_.metadata_collection->scope, config_.metadata_collection->collection });
}
for (auto& k : config_.cleanup_config.collections) {
add_collection(k);
}
start();
}

static std::uint64_t
Expand Down Expand Up @@ -554,7 +544,23 @@ transactions_cleanup::add_collection(couchbase::transactions::transaction_keyspa
}

void
transactions_cleanup::close()
transactions_cleanup::start()
{
running_ = config_.cleanup_config.cleanup_client_attempts || config_.cleanup_config.cleanup_lost_attempts;
if (config_.cleanup_config.cleanup_client_attempts) {
cleanup_thr_ = std::thread(std::bind(&transactions_cleanup::attempts_loop, this));
}
if (config_.metadata_collection) {
add_collection(
{ config_.metadata_collection->bucket, config_.metadata_collection->scope, config_.metadata_collection->collection });
}
for (const auto& k : config_.cleanup_config.collections) {
add_collection(k);
}
}

void
transactions_cleanup::stop()
{
{
std::unique_lock<std::mutex> lock(mutex_);
Expand All @@ -571,6 +577,12 @@ transactions_cleanup::close()
t.join();
}
}
}

void
transactions_cleanup::close()
{
stop();
CB_LOST_ATTEMPT_CLEANUP_LOG_DEBUG("all lost attempt cleanup threads closed");
remove_client_record_from_all_buckets(client_uuid_);
}
Expand Down
3 changes: 3 additions & 0 deletions couchbase/cluster.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <couchbase/bucket_manager.hxx>
#include <couchbase/cluster_options.hxx>
#include <couchbase/diagnostics_options.hxx>
#include <couchbase/fork_event.hxx>
#include <couchbase/ping_options.hxx>
#include <couchbase/query_index_manager.hxx>
#include <couchbase/query_options.hxx>
Expand Down Expand Up @@ -95,6 +96,8 @@ class cluster
auto operator=(const cluster& other) -> cluster& = default;
auto operator=(cluster&& other) -> cluster& = default;

void notify_fork(fork_event event);

void close() const;

/**
Expand Down
39 changes: 39 additions & 0 deletions couchbase/fork_event.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace couchbase
{
enum class fork_event {
/**
* Notify the cluster that the process is about to fork.
*/
prepare,

/**
* Notify the context that the process has forked and is the parent.
*/
parent,

/**
* Notify the context that the process has forked and is the child.
*/
child,
};

} // namespace couchbase
Loading

0 comments on commit 933e87e

Please sign in to comment.