diff --git a/.github/workflows/ci-verification.yml b/.github/workflows/ci-verification.yml index 13836fbdbe08..392095a2ac8a 100644 --- a/.github/workflows/ci-verification.yml +++ b/.github/workflows/ci-verification.yml @@ -233,3 +233,25 @@ jobs: name: tlc-trace-validation-consensus path: | tla/traces/* + + model-checking-self-healing-open: + name: Model Checking - Self-Healing Open + runs-on: [self-hosted, 1ES.Pool=gha-vmss-d16av5-ci] + container: + image: mcr.microsoft.com/azurelinux/base/core:3.0 + options: --user root --publish-all --cap-add NET_ADMIN --cap-add NET_RAW --cap-add SYS_PTRACE + + steps: + - name: "Checkout dependencies" + shell: bash + run: | + gpg --import /etc/pki/rpm-gpg/MICROSOFT-RPM-GPG-KEY + tdnf -y update + tdnf -y install ca-certificates git + + - uses: actions/checkout@v5 + - name: Install Stateright dependencies + run: | + tdnf install -y cargo + + - run: cd tla/disaster-recovery && cargo run check diff --git a/CMakeLists.txt b/CMakeLists.txt index c1f5cd20be64..c1ce8bd78873 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -380,6 +380,7 @@ endif() set(CCF_IMPL_SOURCE ${CCF_DIR}/src/enclave/main.cpp ${CCF_DIR}/src/enclave/thread_local.cpp ${CCF_DIR}/src/node/quote.cpp ${CCF_DIR}/src/node/uvm_endorsements.cpp + ${CCF_DIR}/src/node/self_healing_open_impl.cpp ) add_ccf_static_library( @@ -688,11 +689,20 @@ if(BUILD_TESTS) add_unit_test( frontend_test ${CMAKE_CURRENT_SOURCE_DIR}/src/node/rpc/test/frontend_test.cpp - ${CCF_DIR}/src/node/quote.cpp ${CCF_DIR}/src/node/uvm_endorsements.cpp + ${CCF_DIR}/src/node/quote.cpp + ${CCF_DIR}/src/node/uvm_endorsements.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/node/self_healing_open_impl.cpp ) target_link_libraries( - frontend_test PRIVATE ${CMAKE_THREAD_LIBS_INIT} http_parser ccf_js - ccf_endpoints ccfcrypto ccf_kv + frontend_test + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + http_parser + ccf_js + ccf_endpoints + ccfcrypto + ccf_kv + uv + curl ) add_unit_test( @@ -718,11 +728,20 @@ if(BUILD_TESTS) add_unit_test( node_frontend_test ${CMAKE_CURRENT_SOURCE_DIR}/src/node/rpc/test/node_frontend_test.cpp - ${CCF_DIR}/src/node/quote.cpp ${CCF_DIR}/src/node/uvm_endorsements.cpp + ${CCF_DIR}/src/node/quote.cpp + ${CCF_DIR}/src/node/uvm_endorsements.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/node/self_healing_open_impl.cpp ) target_link_libraries( - node_frontend_test PRIVATE ${CMAKE_THREAD_LIBS_INIT} http_parser ccf_js - ccf_endpoints ccfcrypto ccf_kv + node_frontend_test + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + http_parser + ccf_js + ccf_endpoints + ccfcrypto + ccf_kv + uv + curl ) add_unit_test( @@ -1190,15 +1209,16 @@ if(BUILD_TESTS) 10000 --use-jwt ) - add_test_bin( - curl_test ${CMAKE_CURRENT_SOURCE_DIR}/src/http/test/curl_test.cpp - ) - target_link_libraries(curl_test PRIVATE curl uv http_parser) if(LONG_TESTS) add_e2e_test( NAME e2e_curl PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_curl.py ) + + add_test_bin( + curl_test ${CMAKE_CURRENT_SOURCE_DIR}/src/http/test/curl_test.cpp + ) + target_link_libraries(curl_test PRIVATE curl uv http_parser) endif() endif() diff --git a/doc/audit/builtin_maps.rst b/doc/audit/builtin_maps.rst index 82bb7fdf3be7..0de6abf707cb 100644 --- a/doc/audit/builtin_maps.rst +++ b/doc/audit/builtin_maps.rst @@ -571,4 +571,51 @@ While the contents themselves are encrypted, the table is public so as to be acc **Value** The mechanism by which the ledger secret was recovered. .. doxygenenum:: ccf::RecoveryType - :project: CCF \ No newline at end of file + :project: CCF + +``self_healing_open.nodes`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Key** Intrinsic node ID: A string which is unique to a particular node role within a cluster. Currently its IP and port. + +**Value** + +.. doxygenstruct:: SelfHealingOpenNodeInfo + :project: CCF + :members: + +``self_healing_open.gossip`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Key** Intrinsic node ID of the source of the gossip message. + +**Value** + +.. doxygenstruct:: ccf::self_healing_open::GossipRequest + :project: CCF + :members: + +``self_healing_open.chosen_node`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Value** The intrinsic node ID of the chosen node. This will either be the node this node voted for, or the node that is has received an `IAmOpen` message from. + +``self_healing_open.votes`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Key** Intrinsic node ID of the node which has voted for this node to be opened. + +``selfhealingopen.sm_state`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Value** State machine state of the self-healing open protocol. + +``selfhealingopen.timeout_sm_state`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Value** Timeout state machine state of the self-healing open protocol. Ticks based on `failover_timeout` and advances `selfhealingopen.sm_state` if it falls behind. + +``selfhealingopen.failover_open`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Value** Boolean flag indicating whether the latest self-healing-open recovery opened using a failover timeout. \ No newline at end of file diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index 34b1bd4383e6..f686f3a79cc9 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -428,6 +428,28 @@ "previous_sealed_ledger_secret_location": { "type": ["string"], "description": "Path to the sealed ledger secret folder, the ledger secrets for the recovered service will be unsealed from here instead of reconstructed from recovery shares." + }, + "self_healing_open": { + "type": "object", + "properties": { + "addresses": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of addresses (host:port) of the cluster that should open via self-healing-open" + }, + "retry_timeout": { + "type": "string", + "default": "100ms", + "description": "Interval (time string) at which the node re-sends self-healing-open messages. This should be significantly less than 'failover_timeout'" + }, + "failover_timeout": { + "type": "string", + "default": "2000ms", + "description": "Interval (time string) after which the node forcibly advances to the next phase of the self-healing-open protocol" + } + } } }, "required": ["previous_service_identity_file"], diff --git a/doc/operations/recovery.rst b/doc/operations/recovery.rst index db257b724ebb..dacb54d72b98 100644 --- a/doc/operations/recovery.rst +++ b/doc/operations/recovery.rst @@ -145,6 +145,94 @@ Which of these two paths is taken is noted in the `public:ccf.internal.last_reco ... $ /opt/ccf/bin/js_generic --config /path/to/config/file +Self-Healing-Open recovery +-------------------------- + +In environments with limited orchestration or limited operator access, it is desirable to allow an automated disaster recovery without operator intervention. +At a high level, Self-Healing-Open recovery allows recovering replicas to discover which node has the most up-to-date ledger and automatically recover the network using that ledger. + +There are two paths, a election path, and a very-high-availablity failover path. +The election path ensures that if: all nodes restart and have full network connectivity, a majority of nodes' on-disk ledger contains every committed transaction, and no timeouts trigger; then there will be only one recovered network, then all committed transaction will be persisted. +However, the election path can become stuck, in which case the failover path is designed to ensure progress. + +In the election path, nodes first gossip with each other, learning of the ledgers of other nodes. +Once they have heard from every node they vote for the node with the best ledger. +If a node receives votes from a majority of nodes, it invokes `transition-to-open` and notifies the other nodes to restart and join it. +This path is illustrated below, and is guaranteed to succeed if all nodes can communicate and no timeouts trigger. + +.. mermaid:: + + sequenceDiagram + participant N1 + participant N2 + participant N3 + + Note over N1, N3: Gossip + + N1 ->> N2: Gossip(Tx=1) + N1 ->> N3: Gossip(Tx=1) + N2 ->> N3: Gossip(Tx=2) + N3 ->> N2: Gossip(Tx=3) + + Note over N1, N3: Vote + N2 ->> N3: Vote + N3 ->> N3: Vote + + Note over N1, N3: Open/Join + N3 ->> N1: IAmOpen + N3 ->> N2: IAmOpen + + Note over N1, N2: Restart + + Note over N3: Transition-to-open + + Note over N3: Local unsealing + + Note over N3: Open + + N1 ->> N3: Join + N2 ->> N3: Join + +In the failover path, each phase has a timeout to skip to the next phase if a failure has occurred. +For example, the election path requires all nodes to communicate to advance from the gossip phase to the vote phase. +However, if any node fails to recover, the election path is stuck. +In this case, after a timeout, nodes will advance to the vote phase regardless of whether they have heard from all nodes, and vote for the best ledger they have heard of at that point. + +Unfortunately, this can lead to multiple forks of the service if different nodes cannot communicate with each other and timeout. +Hence, we recommend setting the timeout substantially higher than the highest expected recovery time, to minimise the chance of this happening. +To audit if timeouts were used to open the service, the `public:ccf.gov.selfhealingopen.failover_open` table tracks this. + +This failover path is illustrated below. + +.. mermaid:: + + sequenceDiagram + participant N1 + participant N2 + participant N3 + + Note over N1, N3: Gossip + + N2 ->> N3: Gossip(Tx=2) + N3 ->> N2: Gossip(Tx=3) + + Note over N1: Timeout + Note over N3: Timeout + + Note over N1, N3: Vote + + N1 ->> N1: Vote + N3 ->> N3: Vote + N2 ->> N3: Vote + + Note over N1, N3: Open/Join + + Note over N1: Transition-to-open + Note over N3: Transition-to-open + + +If the network fails during reconfiguration, each node will use its latest known configuration to recover. Since reconfiguration requires votes from a majority of nodes, the latest configuration should recover using the election path, however nodes in the previous configuration may recover using the election path. + Notes ----- diff --git a/include/ccf/node/startup_config.h b/include/ccf/node/startup_config.h index bd2684459d85..48e725b005fb 100644 --- a/include/ccf/node/startup_config.h +++ b/include/ccf/node/startup_config.h @@ -102,6 +102,14 @@ namespace ccf Snapshots snapshots = {}; }; + struct SelfHealingOpenConfig + { + std::vector addresses; + ccf::ds::TimeString retry_timeout = {"100ms"}; + ccf::ds::TimeString failover_timeout = {"2000ms"}; + bool operator==(const SelfHealingOpenConfig&) const = default; + }; + struct StartupConfig : CCFConfig { StartupConfig() = default; @@ -146,6 +154,7 @@ namespace ccf std::nullopt; std::optional previous_sealed_ledger_secret_location = std::nullopt; + std::optional self_healing_open = std::nullopt; }; Recover recover = {}; }; diff --git a/include/ccf/service/tables/self_healing_open.h b/include/ccf/service/tables/self_healing_open.h new file mode 100644 index 000000000000..e7e112af9ccd --- /dev/null +++ b/include/ccf/service/tables/self_healing_open.h @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/ds/enum_formatter.h" +#include "ccf/ds/json.h" +#include "ccf/ds/quote_info.h" +#include "ccf/service/map.h" + +using IntrinsicIdentifier = std::string; + +struct SelfHealingOpenNodeInfo +{ + ccf::QuoteInfo quote_info; + std::string published_network_address; + std::vector cert_der; + std::string service_identity; + IntrinsicIdentifier intrinsic_id; +}; + +DECLARE_JSON_TYPE(SelfHealingOpenNodeInfo); +DECLARE_JSON_REQUIRED_FIELDS( + SelfHealingOpenNodeInfo, + quote_info, + published_network_address, + cert_der, + service_identity, + intrinsic_id); + +enum class SelfHealingOpenSM +{ + GOSSIPING = 0, + VOTING, + OPENING, // by chosen node + JOINING, // by all other replicas + OPEN, +}; + +DECLARE_JSON_ENUM( + SelfHealingOpenSM, + {{SelfHealingOpenSM::GOSSIPING, "Gossiping"}, + {SelfHealingOpenSM::VOTING, "Voting"}, + {SelfHealingOpenSM::OPENING, "Opening"}, + {SelfHealingOpenSM::JOINING, "Joining"}, + {SelfHealingOpenSM::OPEN, "Open"}}); + +namespace ccf +{ + using SelfHealingOpenNodeInfoMap = + ServiceMap; + using SelfHealingOpenGossips = + ServiceMap; + using SelfHealingOpenChosenNode = ServiceValue; + using SelfHealingOpenVotes = ServiceSet; + using SelfHealingOpenSMState = ServiceValue; + using SelfHealingOpenTimeoutSMState = ServiceValue; + using SelfHealingOpenFailoverFlag = ServiceValue; + + namespace Tables + { + static constexpr auto SELF_HEALING_OPEN_NODES = + "public:ccf.gov.selfhealingopen.nodes"; + static constexpr auto SELF_HEALING_OPEN_GOSSIPS = + "public:ccf.gov.selfhealingopen.gossip"; + static constexpr auto SELF_HEALING_OPEN_CHOSEN_NODE = + "public:ccf.gov.selfhealingopen.chosen_node"; + static constexpr auto SELF_HEALING_OPEN_VOTES = + "public:ccf.gov.selfhealingopen.votes"; + static constexpr auto SELF_HEALING_OPEN_SM_STATE = + "public:ccf.gov.selfhealingopen.sm_state"; + static constexpr auto SELF_HEALING_OPEN_TIMEOUT_SM_STATE = + "public:ccf.gov.selfhealingopen.timeout_sm_state"; + static constexpr auto SELF_HEALING_OPEN_FAILOVER_FLAG = + "public:ccf.gov.selfhealingopen.failover_open"; + } +} diff --git a/src/common/configuration.h b/src/common/configuration.h index bf65485bdf12..d303a3076fa0 100644 --- a/src/common/configuration.h +++ b/src/common/configuration.h @@ -113,6 +113,11 @@ namespace ccf node_to_node_message_limit, historical_cache_soft_limit); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(SelfHealingOpenConfig); + DECLARE_JSON_REQUIRED_FIELDS(SelfHealingOpenConfig, addresses); + DECLARE_JSON_OPTIONAL_FIELDS( + SelfHealingOpenConfig, retry_timeout, failover_timeout); + DECLARE_JSON_TYPE(StartupConfig::Start); DECLARE_JSON_REQUIRED_FIELDS( StartupConfig::Start, members, constitution, service_configuration); @@ -125,11 +130,13 @@ namespace ccf service_cert, follow_redirect); - DECLARE_JSON_TYPE(StartupConfig::Recover); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(StartupConfig::Recover); DECLARE_JSON_REQUIRED_FIELDS( + StartupConfig::Recover, previous_service_identity); + DECLARE_JSON_OPTIONAL_FIELDS( StartupConfig::Recover, - previous_service_identity, - previous_sealed_ledger_secret_location); + previous_sealed_ledger_secret_location, + self_healing_open); DECLARE_JSON_TYPE_WITH_BASE(StartupConfig, CCFConfig); DECLARE_JSON_REQUIRED_FIELDS( diff --git a/src/crypto/csr.h b/src/crypto/csr.h index 73cc2789f533..f9123196de82 100644 --- a/src/crypto/csr.h +++ b/src/crypto/csr.h @@ -13,7 +13,7 @@ namespace ccf::crypto * @param signing_request CSR to extract the public key from * @return extracted public key */ - Pem public_key_pem_from_csr(const Pem& signing_request) + inline Pem public_key_pem_from_csr(const Pem& signing_request) { X509* icrt = NULL; OpenSSL::Unique_BIO mem(signing_request); diff --git a/src/enclave/interface.h b/src/enclave/interface.h index 06404edac925..bc2f58fdc3b8 100644 --- a/src/enclave/interface.h +++ b/src/enclave/interface.h @@ -27,7 +27,10 @@ enum AdminMessage : ringbuffer::Message DEFINE_RINGBUFFER_MSG_TYPE(tick), /// Notify the host of work done since last message. Enclave -> Host - DEFINE_RINGBUFFER_MSG_TYPE(work_stats) + DEFINE_RINGBUFFER_MSG_TYPE(work_stats), + + /// Notify the host that it should restart + DEFINE_RINGBUFFER_MSG_TYPE(restart) }; DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(AdminMessage::fatal_error_msg, std::string); @@ -36,6 +39,7 @@ DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(AdminMessage::stop_notice); DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(AdminMessage::stopped); DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(AdminMessage::tick); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(AdminMessage::work_stats, std::string); +DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(AdminMessage::restart); /// Messages sent from app endpoints enum AppMessage : ringbuffer::Message diff --git a/src/host/configuration.h b/src/host/configuration.h index 6454afd5f4ed..82eefa8f019b 100644 --- a/src/host/configuration.h +++ b/src/host/configuration.h @@ -120,6 +120,8 @@ namespace host std::string previous_service_identity_file; std::optional previous_sealed_ledger_secret_location = std::nullopt; + std::optional self_healing_open = + std::nullopt; bool operator==(const Recover&) const = default; }; Recover recover = {}; @@ -172,7 +174,8 @@ namespace host CCHostConfig::Command::Recover, initial_service_certificate_validity_days, previous_service_identity_file, - previous_sealed_ledger_secret_location); + previous_sealed_ledger_secret_location, + self_healing_open); DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command, type); diff --git a/src/host/handle_ring_buffer.h b/src/host/handle_ring_buffer.h index 8ffcc87804ca..bed88891b9aa 100644 --- a/src/host/handle_ring_buffer.h +++ b/src/host/handle_ring_buffer.h @@ -5,6 +5,7 @@ #include "../ds/files.h" #include "../enclave/interface.h" #include "ds/internal_logger.h" +#include "ds/non_blocking.h" #include "timer.h" #include @@ -53,6 +54,13 @@ namespace asynchost uv_stop(uv_default_loop()); LOG_INFO_FMT("Host stopped successfully"); }); + + DISPATCHER_SET_MESSAGE_HANDLER( + bp, AdminMessage::restart, [&](const uint8_t*, size_t) { + LOG_INFO_FMT("Received request to restart enclave, sending stops"); + auto to_enclave = nbwf.create_writer_to_inside(); + RINGBUFFER_WRITE_MESSAGE(AdminMessage::stop, to_enclave); + }); } void on_timer() diff --git a/src/host/run.cpp b/src/host/run.cpp index d72ed4cf6a3d..52812d74b7bd 100644 --- a/src/host/run.cpp +++ b/src/host/run.cpp @@ -834,6 +834,8 @@ namespace ccf startup_config.recover.previous_sealed_ledger_secret_location = config.command.recover.previous_sealed_ledger_secret_location; } + startup_config.recover.self_healing_open = + config.command.recover.self_healing_open; } else { diff --git a/src/http/curl.h b/src/http/curl.h index 8b0f6bc31b07..802e4c6cbf78 100644 --- a/src/http/curl.h +++ b/src/http/curl.h @@ -777,13 +777,16 @@ namespace ccf::curl } // Notify curl of the error + int running_handles = 0; CHECK_CURL_MULTI( curl_multi_socket_action, self->curl_request_curlm, socket_context->socket, CURL_CSELECT_ERR, - nullptr); + &running_handles); self->curl_request_curlm.perform(); + LOG_TRACE_FMT( + "Finished handling error on socket {}", socket_context->socket); return; } diff --git a/src/http/error_reporter.h b/src/http/error_reporter.h index b79a32c2390d..779e474a17ec 100644 --- a/src/http/error_reporter.h +++ b/src/http/error_reporter.h @@ -2,6 +2,8 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ccf/rpc_context.h" + namespace http { class ErrorReporter diff --git a/src/node/node_state.h b/src/node/node_state.h index 336d08695f04..c633d4ab439b 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -17,6 +17,7 @@ #include "ccf/service/node_info_network.h" #include "ccf/service/reconfiguration_type.h" #include "ccf/service/tables/acme_certificates.h" +#include "ccf/service/tables/self_healing_open.h" #include "ccf/service/tables/service.h" #include "ccf/tx.h" #include "ccf_acme_client.h" @@ -41,6 +42,7 @@ #include "node/ledger_secrets.h" #include "node/local_sealing.h" #include "node/node_to_node_channel_manager.h" +#include "node/self_healing_open_impl.h" #include "node/snapshotter.h" #include "node_to_node.h" #include "pal/quote_generation.h" @@ -78,7 +80,7 @@ namespace ccf ccf::crypto::Pem service_cert; }; - void reset_data(std::vector& data) + inline void reset_data(std::vector& data) { data.clear(); data.shrink_to_fit(); @@ -86,6 +88,8 @@ namespace ccf class NodeState : public AbstractNodeState { + friend class SelfHealingOpenSubsystem; + private: // // this node's core state @@ -233,6 +237,8 @@ namespace ccf last_recovered_signed_idx = last_recovered_idx; } + SelfHealingOpenSubsystem self_healing_open_impl; + public: NodeState( ringbuffer::AbstractWriterFactory& writer_factory, @@ -248,7 +254,8 @@ namespace ccf to_host(writer_factory.create_writer_to_outside()), network(network), rpcsessions(rpcsessions), - share_manager(network.ledger_secrets) + share_manager(network.ledger_secrets), + self_healing_open_impl(this) {} QuoteVerificationResult verify_quote( @@ -2999,5 +3006,10 @@ namespace ccf { return writer_factory; } + + SelfHealingOpenSubsystem& self_healing_open() override + { + return self_healing_open_impl; + } }; } diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index 98202b79b3db..237595a104aa 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -4,6 +4,7 @@ #include "ccf/common_auth_policies.h" #include "ccf/common_endpoint_registry.h" +#include "ccf/endpoint_context.h" #include "ccf/http_query.h" #include "ccf/js/core/context.h" #include "ccf/json_handler.h" @@ -12,22 +13,28 @@ #include "ccf/pal/attestation.h" #include "ccf/pal/mem.h" #include "ccf/service/reconfiguration_type.h" +#include "ccf/service/tables/self_healing_open.h" #include "ccf/version.h" #include "crypto/certs.h" #include "crypto/csr.h" #include "ds/files.h" +#include "ds/ring_buffer_types.h" #include "ds/std_formatters.h" #include "frontend.h" #include "node/network_state.h" #include "node/rpc/jwt_management.h" #include "node/rpc/no_create_tx_claims_digest.cpp" #include "node/rpc/serialization.h" +#include "node/self_healing_open_impl.h" #include "node/session_metrics.h" #include "node_interface.h" #include "service/internal_tables_access.h" #include "service/tables/previous_service_identity.h" #include "snapshots/filenames.h" +#include +#include + namespace ccf { struct Quote @@ -414,6 +421,116 @@ namespace ccf } } + template + using SelfHealingOpenHandler = std::function( + endpoints::EndpointContext& args, In& in)>; + + template + HandlerJsonParamsAndForward wrap_self_healing_open( + SelfHealingOpenHandler cb) + { + return [cb = std::move(cb), this]( + endpoints::EndpointContext& args, const nlohmann::json& params) { + auto config = this->context.get_subsystem(); + if ( + config == nullptr || + !config->get().node_config.recover.self_healing_open.has_value()) + { + return make_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidNodeState, + "Unable to get self-healing-open configuration"); + } + + auto in = params.get(); + self_healing_open::RequestNodeInfo info = in.info; + + // ---- Validate the quote and store the node info ---- + + auto cert_der = ccf::crypto::public_key_der_from_cert( + args.rpc_ctx->get_session_context()->caller_cert); + + pal::PlatformAttestationMeasurement measurement; + QuoteVerificationResult verify_result = + this->node_operation.verify_quote( + args.tx, info.quote_info, cert_der, measurement); + if (verify_result != QuoteVerificationResult::Verified) + { + const auto [code, message] = quote_verification_error(verify_result); + LOG_FAIL_FMT( + "Self-healing-open message from intrinsic id {} is invalid: {} " + "({})", + info.intrinsic_id, + code, + message); + return make_error(code, ccf::errors::InvalidQuote, message); + } + + LOG_TRACE_FMT( + "Self-healing-open message from intrinsic id {}'s quote is valid", + info.intrinsic_id); + + // Validating that we haven't heard from this node before, of if we have + // that the cert hasn't changed + auto* node_info_handle = args.tx.rw( + Tables::SELF_HEALING_OPEN_NODES); + auto existing_node_info = node_info_handle->get(info.intrinsic_id); + + if (existing_node_info.has_value()) + { + // If we have seen this node before, check that the cert is the same + if (existing_node_info->cert_der != cert_der) + { + auto message = fmt::format( + "Self-healing-open message from intrinsic id {} is invalid: " + "certificate has changed", + info.intrinsic_id); + LOG_FAIL_FMT("{}", message); + return make_error( + HTTP_STATUS_BAD_REQUEST, ccf::errors::NodeAlreadyExists, message); + } + } + else + { + SelfHealingOpenNodeInfo src_info{ + .quote_info = info.quote_info, + .published_network_address = info.published_network_address, + .cert_der = cert_der, + .service_identity = info.service_identity, + .intrinsic_id = info.intrinsic_id}; + node_info_handle->put(info.intrinsic_id, src_info); + } + + // ---- Run callback ---- + + auto ret = cb(args, in); + if (ret.has_value()) + { + jsonhandler::JsonAdapterResponse res = ret.value(); + return res; + } + + // ---- Advance state machine ---- + + try + { + this->node_operation.self_healing_open().advance(args.tx, false); + } + catch (const std::logic_error& e) + { + LOG_FAIL_FMT( + "Self-healing-open failed to advance state: {}", e.what()); + return make_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + fmt::format( + "Failed to advance self-healing-open state: {}", e.what())); + } + + return make_success(); + }; + } + public: NodeEndpoints(NetworkState& network_, ccf::AbstractNodeContext& context_) : CommonEndpointRegistry(get_actor_prefix(ActorsType::nodes), context_), @@ -1652,6 +1769,8 @@ namespace ccf ctx.rpc_ctx->set_claims_digest(std::move(digest_value)); } + this->node_operation.self_healing_open().try_start(ctx.tx, recovering); + LOG_INFO_FMT("Created service"); return make_success(true); }; @@ -2187,6 +2306,162 @@ namespace ccf .set_forwarding_required(endpoints::ForwardingRequired::Never) .set_openapi_hidden(true) .install(); + + auto self_healing_open_gossip = + [this]( + auto& args, + self_healing_open::GossipRequest in) -> std::optional { + LOG_TRACE_FMT( + "Self-healing-open: recieve gossip from {}", in.info.intrinsic_id); + + // Stop accepting gossips once a node has voted + auto chosen_replica = args.tx.template ro( + Tables::SELF_HEALING_OPEN_CHOSEN_NODE); + if (chosen_replica->get().has_value()) + { + return ErrorDetails{ + .status = HTTP_STATUS_INTERNAL_SERVER_ERROR, + .code = ccf::errors::InternalError, + .msg = fmt::format( + "This node has already voted for {}", + chosen_replica->get().value())}; + } + + auto gossip_handle = args.tx.template rw( + Tables::SELF_HEALING_OPEN_GOSSIPS); + if (gossip_handle->get(in.info.intrinsic_id).has_value()) + { + LOG_INFO_FMT( + "Node {} already gossiped, skipping", in.info.intrinsic_id); + return std::nullopt; + } + gossip_handle->put(in.info.intrinsic_id, in.txid); + return std::nullopt; + }; + make_endpoint( + "/self_healing_open/gossip", + HTTP_PUT, + json_adapter(wrap_self_healing_open( + self_healing_open_gossip)), + no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + + auto self_healing_open_vote = + [this](auto& args, self_healing_open::TaggedWithNodeInfo in) + -> std::optional { + LOG_TRACE_FMT( + "Self-healing-open: recieve vote from {}", in.info.intrinsic_id); + + args.tx + .template rw(Tables::SELF_HEALING_OPEN_VOTES) + ->insert(in.info.intrinsic_id); + + return std::nullopt; + }; + make_endpoint( + "/self_healing_open/vote", + HTTP_PUT, + json_adapter( + wrap_self_healing_open( + self_healing_open_vote)), + no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + + auto self_healing_open_iamopen = + [this](auto& args, self_healing_open::TaggedWithNodeInfo in) + -> std::optional { + LOG_TRACE_FMT( + "Self-healing-open: recieve IAmOpen from {}", in.info.intrinsic_id); + args.tx + .template rw( + Tables::SELF_HEALING_OPEN_SM_STATE) + ->put(SelfHealingOpenSM::JOINING); + args.tx + .template rw( + Tables::SELF_HEALING_OPEN_CHOSEN_NODE) + ->put(in.info.intrinsic_id); + return std::nullopt; + }; + make_endpoint( + "/self_healing_open/iamopen", + HTTP_PUT, + json_adapter( + wrap_self_healing_open( + self_healing_open_iamopen)), + no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + + auto self_healing_open_timeout = [this]( + auto& args, + const nlohmann::json& params) { + (void)params; + auto config = this->context.get_subsystem(); + if ( + config == nullptr || + !config->get().node_config.recover.self_healing_open.has_value()) + { + return make_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidNodeState, + "Unable to get self-healing-open configuration"); + } + + LOG_TRACE_FMT("Self-healing-open timeout received"); + + // Must ensure that the request originates from the primary + auto primary_id = consensus->primary(); + if (!primary_id.has_value()) + { + LOG_FAIL_FMT("self-healing-open timeout: primary unknown"); + return make_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "Primary is unknown"); + } + const auto& sig_auth_ident = + args.template get_caller(); + if (primary_id.value() != sig_auth_ident.node_id) + { + LOG_FAIL_FMT( + "self-healing-open timeout: request does not originate from " + "primary"); + return make_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "Request does not originate from primary."); + } + + try + { + this->node_operation.self_healing_open().advance(args.tx, true); + } + catch (const std::logic_error& e) + { + LOG_FAIL_FMT( + "Self-healing-open gossip failed to advance state: {}", e.what()); + return make_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + fmt::format( + "Failed to advance self-healing-open state: {}", e.what())); + } + return make_success("Self-healing-open timeout processed successfully"); + }; + + make_endpoint( + "/self_healing_open/timeout", + HTTP_PUT, + json_adapter(self_healing_open_timeout), + {std::make_shared()}) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); } }; diff --git a/src/node/rpc/node_interface.h b/src/node/rpc/node_interface.h index becfa93fefbd..e56ea19fd2a4 100644 --- a/src/node/rpc/node_interface.h +++ b/src/node/rpc/node_interface.h @@ -15,6 +15,7 @@ #include "node/ledger_secret.h" #include "node/rpc/gov_effects_interface.h" #include "node/rpc/node_operation_interface.h" +#include "node/self_healing_open_impl.h" #include "node/session_metrics.h" namespace ccf @@ -64,6 +65,7 @@ namespace ccf virtual size_t get_jwt_attempts() = 0; virtual ccf::crypto::Pem get_self_signed_certificate() = 0; virtual const ccf::COSESignaturesConfig& get_cose_signatures_config() = 0; + virtual SelfHealingOpenSubsystem& self_healing_open() = 0; virtual const ccf::StartupConfig& get_node_config() const = 0; virtual ccf::crypto::Pem get_network_cert() = 0; virtual void stop_notice() = 0; diff --git a/src/node/rpc/node_operation.h b/src/node/rpc/node_operation.h index ccd11843dd61..35993a52f4b3 100644 --- a/src/node/rpc/node_operation.h +++ b/src/node/rpc/node_operation.h @@ -4,6 +4,7 @@ #include "node/rpc/node_interface.h" #include "node/rpc/node_operation_interface.h" +#include "node/self_healing_open_impl.h" namespace ccf { @@ -109,5 +110,10 @@ namespace ccf { return impl.get_cose_signatures_config(); } + + SelfHealingOpenSubsystem& self_healing_open() override + { + return impl.self_healing_open(); + } }; } \ No newline at end of file diff --git a/src/node/rpc/node_operation_interface.h b/src/node/rpc/node_operation_interface.h index 3667f5438918..72bdd87b30d8 100644 --- a/src/node/rpc/node_operation_interface.h +++ b/src/node/rpc/node_operation_interface.h @@ -10,6 +10,7 @@ #include "ccf/node_subsystem_interface.h" #include "ccf/service/tables/code_id.h" #include "ccf/tx.h" +#include "node/self_healing_open_impl.h" #include "node/session_metrics.h" namespace ccf @@ -60,5 +61,7 @@ namespace ccf virtual ccf::crypto::Pem get_self_signed_node_certificate() = 0; virtual const ccf::COSESignaturesConfig& get_cose_signatures_config() = 0; + + virtual SelfHealingOpenSubsystem& self_healing_open() = 0; }; } \ No newline at end of file diff --git a/src/node/rpc/test/node_stub.h b/src/node/rpc/test/node_stub.h index eae0d03fc54c..658ea06fe358 100644 --- a/src/node/rpc/test/node_stub.h +++ b/src/node/rpc/test/node_stub.h @@ -8,6 +8,7 @@ #include "node/rpc/gov_effects_interface.h" #include "node/rpc/node_interface.h" #include "node/rpc/node_operation_interface.h" +#include "node/self_healing_open_impl.h" namespace ccf { @@ -110,6 +111,11 @@ namespace ccf { return cose_signatures_config; } + + SelfHealingOpenSubsystem& self_healing_open() override + { + throw std::logic_error("Unimplemented"); + } }; class StubGovernanceEffects : public ccf::AbstractGovernanceEffects diff --git a/src/node/self_healing_open_impl.cpp b/src/node/self_healing_open_impl.cpp new file mode 100644 index 000000000000..900d740e5891 --- /dev/null +++ b/src/node/self_healing_open_impl.cpp @@ -0,0 +1,569 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. + +#include "node/self_healing_open_impl.h" + +#include "ccf/service/tables/nodes.h" +#include "ccf/service/tables/self_healing_open.h" +#include "ccf/tx.h" +#include "node_state.h" + +#include + +namespace ccf +{ + + SelfHealingOpenSubsystem::SelfHealingOpenSubsystem(NodeState* node_state_) : + node_state(node_state_) + {} + + void SelfHealingOpenSubsystem::try_start(ccf::kv::Tx& tx, bool recovering) + { + // Clear any previous state + tx.rw(Tables::SELF_HEALING_OPEN_SM_STATE)->clear(); + tx.rw( + Tables::SELF_HEALING_OPEN_TIMEOUT_SM_STATE) + ->clear(); + tx.rw(Tables::SELF_HEALING_OPEN_NODES)->clear(); + tx.rw(Tables::SELF_HEALING_OPEN_GOSSIPS)->clear(); + tx.rw(Tables::SELF_HEALING_OPEN_CHOSEN_NODE) + ->clear(); + tx.rw(Tables::SELF_HEALING_OPEN_VOTES)->clear(); + tx.rw(Tables::SELF_HEALING_OPEN_FAILOVER_FLAG) + ->clear(); + + auto& config = node_state->config.recover.self_healing_open; + if (!recovering || !config.has_value()) + { + LOG_INFO_FMT("Skipping self-healing-open"); + return; + } + + LOG_INFO_FMT("Starting self-healing-open"); + + tx.rw(Tables::SELF_HEALING_OPEN_SM_STATE) + ->put(SelfHealingOpenSM::GOSSIPING); + tx.rw( + Tables::SELF_HEALING_OPEN_TIMEOUT_SM_STATE) + ->put(SelfHealingOpenSM::GOSSIPING); + + start_message_retry_timers(); + start_failover_timers(); + } + + void SelfHealingOpenSubsystem::advance(ccf::kv::Tx& tx, bool timeout) + { + auto& config = node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + + auto* sm_state_handle = + tx.rw(Tables::SELF_HEALING_OPEN_SM_STATE); + auto* timeout_state_handle = tx.rw( + Tables::SELF_HEALING_OPEN_TIMEOUT_SM_STATE); + + auto sm_state_opt = sm_state_handle->get(); + auto timeout_state_opt = timeout_state_handle->get(); + if ((!sm_state_opt.has_value()) || (!timeout_state_opt.has_value())) + { + throw std::logic_error( + "Self-healing-open state not set, cannot advance self-healing-open"); + } + auto& sm_state = sm_state_opt.value(); + auto& timeout_state = timeout_state_opt.value(); + + bool valid_timeout = timeout && sm_state == timeout_state; + + // Advance self-healing-open SM + switch (sm_state) + { + case SelfHealingOpenSM::GOSSIPING: + { + auto* gossip_handle = + tx.ro(Tables::SELF_HEALING_OPEN_GOSSIPS); + auto quorum_size = config->addresses.size(); + if (gossip_handle->size() >= quorum_size || valid_timeout) + { + if (gossip_handle->size() == 0) + { + throw std::logic_error("No gossip addresses provided yet"); + } + + // Lexographically maximum pair + std::optional> maximum; + gossip_handle->foreach([&maximum](const auto& iid, const auto& txid) { + if ( + !maximum.has_value() || + maximum.value() < std::make_pair(txid, iid)) + { + maximum = std::make_pair(txid, iid); + } + return true; + }); + + if (!maximum.has_value()) + { + throw std::logic_error("No valid gossip addresses provided"); + } + tx.rw( + Tables::SELF_HEALING_OPEN_CHOSEN_NODE) + ->put(maximum->second); + + sm_state_handle->put(SelfHealingOpenSM::VOTING); + } + break; + } + case SelfHealingOpenSM::VOTING: + { + auto* votes = + tx.rw(Tables::SELF_HEALING_OPEN_VOTES); + + auto sufficient_quorum = + votes->size() >= config->addresses.size() / 2 + 1; + if (sufficient_quorum || valid_timeout) + { + if (valid_timeout && !sufficient_quorum) + { + tx.rw( + Tables::SELF_HEALING_OPEN_FAILOVER_FLAG) + ->put(true); + } + if (votes->size() == 0) + { + throw std::logic_error( + "We didn't even vote for ourselves, so why should we open?"); + } + LOG_INFO_FMT("Self-healing-open succeeded, now opening network"); + + auto* service = tx.ro(Tables::SERVICE); + auto service_info = service->get(); + if (!service_info.has_value()) + { + throw std::logic_error( + "Service information cannot be found to transition service to " + "open"); + } + const auto prev_ident = + tx.ro(Tables::PREVIOUS_SERVICE_IDENTITY) + ->get(); + AbstractGovernanceEffects::ServiceIdentities identities{ + .previous = prev_ident, .next = service_info->cert}; + + sm_state_handle->put(SelfHealingOpenSM::OPENING); + + node_state->transition_service_to_open(tx, identities); + } + break; + } + case SelfHealingOpenSM::JOINING: + { + auto chosen_replica = tx.ro( + Tables::SELF_HEALING_OPEN_CHOSEN_NODE) + ->get(); + if (!chosen_replica.has_value()) + { + throw std::logic_error( + "Self-healing-open chosen node not set, cannot join"); + } + auto node_config = + tx.ro(Tables::SELF_HEALING_OPEN_NODES) + ->get(chosen_replica.value()); + if (!node_config.has_value()) + { + throw std::logic_error(fmt::format( + "Self-healing-open chosen node {} not found", + chosen_replica.value())); + } + + LOG_INFO_FMT( + "Self-healing-open joining {} with service identity {}", + node_config->published_network_address, + node_config->service_identity); + + RINGBUFFER_WRITE_MESSAGE(AdminMessage::restart, node_state->to_host); + } + case SelfHealingOpenSM::OPENING: + { + if (valid_timeout) + { + sm_state_handle->put(SelfHealingOpenSM::OPEN); + } + break; + } + case SelfHealingOpenSM::OPEN: + { + // Nothing to do here, we are already opening or open or joining + break; + } + default: + throw std::logic_error(fmt::format( + "Unknown self-healing-open state: {}", static_cast(sm_state))); + } + + // Advance timeout SM + if (timeout) + { + switch (timeout_state) + { + case SelfHealingOpenSM::GOSSIPING: + LOG_TRACE_FMT("Advancing timeout SM to VOTING"); + timeout_state_handle->put(SelfHealingOpenSM::VOTING); + break; + case SelfHealingOpenSM::VOTING: + LOG_TRACE_FMT("Advancing timeout SM to OPENING"); + timeout_state_handle->put(SelfHealingOpenSM::OPENING); + break; + case SelfHealingOpenSM::OPENING: + case SelfHealingOpenSM::JOINING: + case SelfHealingOpenSM::OPEN: + default: + LOG_TRACE_FMT("Timeout SM complete"); + } + } + } + + void SelfHealingOpenSubsystem::start_message_retry_timers() + { + LOG_TRACE_FMT("Self-healing-open: Setting up retry timers"); + auto retry_timer_msg = std::make_unique<::threading::Tmsg>( + [](std::unique_ptr<::threading::Tmsg> msg) { + auto& config = + msg->data.self.node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + auto& node_state_ = msg->data.self.node_state; + std::lock_guard guard(node_state_->lock); + + auto tx = node_state_->network.tables->create_read_only_tx(); + auto* sm_state_handle = + tx.ro(Tables::SELF_HEALING_OPEN_SM_STATE); + + auto sm_state_opt = sm_state_handle->get(); + if (!sm_state_opt.has_value()) + { + throw std::logic_error( + "Self-healing-open state not set, cannot retry " + "self-healing-open"); + } + auto& sm_state = sm_state_opt.value(); + + // Keep doing this until the node is no longer in recovery + if (sm_state == SelfHealingOpenSM::OPEN) + { + LOG_INFO_FMT("Self-healing-open complete, stopping timers."); + return; + } + + switch (sm_state) + { + case SelfHealingOpenSM::GOSSIPING: + msg->data.self.send_gossip_unsafe(tx); + break; + case SelfHealingOpenSM::VOTING: + { + auto* node_info_handle = tx.ro( + Tables::SELF_HEALING_OPEN_NODES); + auto* chosen_replica_handle = tx.ro( + Tables::SELF_HEALING_OPEN_CHOSEN_NODE); + if (!chosen_replica_handle->get().has_value()) + { + throw std::logic_error( + "Self-healing-open chosen node not set, cannot vote"); + } + auto chosen_node_info = + node_info_handle->get(chosen_replica_handle->get().value()); + if (!chosen_node_info.has_value()) + { + throw std::logic_error(fmt::format( + "Self-healing-open chosen node {} not found", + chosen_replica_handle->get().value())); + } + msg->data.self.send_vote_unsafe(tx, chosen_node_info.value()); + // keep gossiping to allow lagging nodes to eventually vote + msg->data.self.send_gossip_unsafe(tx); + break; + } + case SelfHealingOpenSM::OPENING: + msg->data.self.send_iamopen_unsafe(tx); + break; + case SelfHealingOpenSM::JOINING: + return; + default: + throw std::logic_error(fmt::format( + "Unknown self-healing-open state: {}", + static_cast(sm_state))); + } + + auto delay = config->retry_timeout; + ::threading::ThreadMessaging::instance().add_task_after( + std::move(msg), delay); + }, + *this); + // kick this off asynchronously as this can be called from a curl callback + ::threading::ThreadMessaging::instance().add_task( + threading::get_current_thread_id(), std::move(retry_timer_msg)); + } + + void SelfHealingOpenSubsystem::start_failover_timers() + { + auto& config = node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + + LOG_TRACE_FMT("Self-healing-open: Setting up failover timers"); + // Dispatch timeouts + auto timeout_msg = std::make_unique<::threading::Tmsg>( + [](std::unique_ptr<::threading::Tmsg> msg) { + auto& config = + msg->data.self.node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + auto* node_state_ = msg->data.self.node_state; + std::lock_guard guard(node_state_->lock); + LOG_TRACE_FMT( + "Self-healing-open timeout, sending timeout to internal handlers"); + + // Stop the timer if the node has completed its self-healing-open + auto tx = node_state_->network.tables->create_read_only_tx(); + auto* sm_state_handle = + tx.ro(Tables::SELF_HEALING_OPEN_SM_STATE); + if (!sm_state_handle->get().has_value()) + { + throw std::logic_error( + "Self-healing-open state not set, cannot retry " + "self-healing-open"); + } + auto sm_state = sm_state_handle->get().value(); + if (sm_state == SelfHealingOpenSM::OPEN) + { + LOG_INFO_FMT("Self-healing-open complete, stopping timers."); + return; + } + + // Send a timeout to the internal handlers + curl::UniqueCURL curl_handle; + + auto cert = node_state_->self_signed_node_cert; + curl_handle.set_opt(CURLOPT_SSL_VERIFYHOST, 0L); + curl_handle.set_opt(CURLOPT_SSL_VERIFYPEER, 0L); + curl_handle.set_opt(CURLOPT_SSL_VERIFYSTATUS, 0L); + + curl_handle.set_blob_opt( + CURLOPT_SSLCERT_BLOB, cert.data(), cert.size()); + curl_handle.set_opt(CURLOPT_SSLCERTTYPE, "PEM"); + + auto privkey_pem = node_state_->node_sign_kp->private_key_pem(); + curl_handle.set_blob_opt( + CURLOPT_SSLKEY_BLOB, privkey_pem.data(), privkey_pem.size()); + curl_handle.set_opt(CURLOPT_SSLKEYTYPE, "PEM"); + + auto url = fmt::format( + "https://{}/{}/self_healing_open/timeout", + node_state_->config.network.rpc_interfaces.at("primary_rpc_interface") + .published_address, + get_actor_prefix(ActorsType::nodes)); + + curl::UniqueSlist headers; + headers.append("Content-Type: application/json"); + + auto curl_request = std::make_unique( + std::move(curl_handle), + HTTP_PUT, + std::move(url), + std::move(headers), + nullptr, + nullptr, + std::nullopt); + curl::CurlmLibuvContextSingleton::get_instance()->attach_request( + std::move(curl_request)); + + auto delay = config->failover_timeout; + ::threading::ThreadMessaging::instance().add_task_after( + std::move(msg), delay); + }, + *this); + ::threading::ThreadMessaging::instance().add_task_after( + std::move(timeout_msg), config->failover_timeout); + } + + void dispatch_authenticated_message( + nlohmann::json& request, + const std::string& target_address, + const std::string& endpoint, + const crypto::Pem& self_signed_node_cert, + const crypto::Pem& privkey_pem) + { + curl::UniqueCURL curl_handle; + + // diable SSL verification as no private information is sent + curl_handle.set_opt(CURLOPT_SSL_VERIFYHOST, 0L); + curl_handle.set_opt(CURLOPT_SSL_VERIFYPEER, 0L); + curl_handle.set_opt(CURLOPT_SSL_VERIFYSTATUS, 0L); + + curl_handle.set_blob_opt( + CURLOPT_SSLCERT_BLOB, + self_signed_node_cert.data(), + self_signed_node_cert.size()); + curl_handle.set_opt(CURLOPT_SSLCERTTYPE, "PEM"); + + curl_handle.set_blob_opt( + CURLOPT_SSLKEY_BLOB, privkey_pem.data(), privkey_pem.size()); + curl_handle.set_opt(CURLOPT_SSLKEYTYPE, "PEM"); + + auto url = fmt::format( + "https://{}/{}/self_healing_open/{}", + target_address, + get_actor_prefix(ActorsType::nodes), + endpoint); + + curl::UniqueSlist headers; + headers.append("Content-Type", "application/json"); + + auto body = std::make_unique(request); + + auto response_callback = []( + const ccf::curl::CurlRequest& request, + CURLcode curl_code, + long status_code) { + LOG_TRACE_FMT( + "Response received for {} to {}: curl_result {} ({}), status code {}", + request.get_method().c_str(), + request.get_url(), + curl_easy_strerror(curl_code), + curl_code, + status_code); + }; + + auto curl_request = std::make_unique( + std::move(curl_handle), + HTTP_PUT, + std::move(url), + std::move(headers), + std::move(body), + nullptr, + std::move(response_callback)); + + LOG_TRACE_FMT( + "Dispatching attested {} message to {}", + curl_request->get_method().c_str(), + curl_request->get_url()); + + curl::CurlmLibuvContextSingleton::get_instance()->attach_request( + std::move(curl_request)); + } + + self_healing_open::RequestNodeInfo SelfHealingOpenSubsystem::make_node_info( + kv::ReadOnlyTx& tx) + { + auto* nodes_handle = tx.ro(Tables::NODES); + auto node_info_opt = nodes_handle->get(node_state->get_node_id()); + if (!node_info_opt.has_value()) + { + throw std::logic_error(fmt::format( + "Node {} not found in nodes table", node_state->get_node_id())); + } + return { + .quote_info = node_info_opt->quote_info, + .published_network_address = + node_state->config.network.rpc_interfaces.at("primary_rpc_interface") + .published_address, + .intrinsic_id = + node_state->config.network.rpc_interfaces.at("primary_rpc_interface") + .published_address, + .service_identity = node_state->network.identity->cert.str(), + }; + } + + void SelfHealingOpenSubsystem::send_gossip_unsafe(kv::ReadOnlyTx& tx) + { + auto& config = node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + + LOG_TRACE_FMT("Broadcasting self-healing-open gossip"); + + self_healing_open::GossipRequest request; + request.info = make_node_info(tx); + request.txid = node_state->last_recovered_signed_idx; + nlohmann::json request_json = request; + + for (auto& target_address : config->addresses) + { + dispatch_authenticated_message( + request_json, + target_address, + "gossip", + node_state->self_signed_node_cert, + node_state->node_sign_kp->private_key_pem()); + } + } + + void SelfHealingOpenSubsystem::send_vote_unsafe( + kv::ReadOnlyTx& tx, const SelfHealingOpenNodeInfo& node_info) + { + auto& config = node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + + LOG_TRACE_FMT( + "Sending self-healing-open vote to {} at {}", + node_info.intrinsic_id, + node_info.published_network_address); + + self_healing_open::TaggedWithNodeInfo request{.info = make_node_info(tx)}; + + nlohmann::json request_json = request; + + dispatch_authenticated_message( + request_json, + node_info.published_network_address, + "vote", + node_state->self_signed_node_cert, + node_state->node_sign_kp->private_key_pem()); + } + + void SelfHealingOpenSubsystem::send_iamopen_unsafe(ccf::kv::ReadOnlyTx& tx) + { + auto& config = node_state->config.recover.self_healing_open; + if (!config.has_value()) + { + throw std::logic_error("Self-healing-open not configured"); + } + + LOG_TRACE_FMT("Sending self-healing-open iamopen"); + + self_healing_open::TaggedWithNodeInfo request{.info = make_node_info(tx)}; + nlohmann::json request_json = request; + + for (auto& target_address : config->addresses) + { + if ( + target_address == + node_state->config.network.rpc_interfaces.at("primary_rpc_interface") + .published_address) + { + // Don't send to self + continue; + } + dispatch_authenticated_message( + request_json, + target_address, + "iamopen", + node_state->self_signed_node_cert, + node_state->node_sign_kp->private_key_pem()); + } + } + +} \ No newline at end of file diff --git a/src/node/self_healing_open_impl.h b/src/node/self_healing_open_impl.h new file mode 100644 index 000000000000..f7bf6c6f3666 --- /dev/null +++ b/src/node/self_healing_open_impl.h @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/ds/json.h" +#include "ccf/node/startup_config.h" +#include "ccf/service/tables/self_healing_open.h" +#include "ccf/tx.h" + +namespace ccf::self_healing_open +{ + struct RequestNodeInfo + { + QuoteInfo quote_info; + std::string published_network_address; + std::string intrinsic_id; + std::string service_identity; + }; + DECLARE_JSON_TYPE(RequestNodeInfo); + DECLARE_JSON_REQUIRED_FIELDS( + RequestNodeInfo, + quote_info, + published_network_address, + intrinsic_id, + service_identity); + + struct TaggedWithNodeInfo + { + public: + RequestNodeInfo info; + }; + DECLARE_JSON_TYPE(TaggedWithNodeInfo); + DECLARE_JSON_REQUIRED_FIELDS(TaggedWithNodeInfo, info); + + struct GossipRequest : public TaggedWithNodeInfo + { + ccf::kv::Version txid{}; + }; + DECLARE_JSON_TYPE_WITH_BASE(GossipRequest, TaggedWithNodeInfo); + DECLARE_JSON_REQUIRED_FIELDS(GossipRequest, txid); +} + +namespace ccf +{ + class NodeState; + class SelfHealingOpenSubsystem + { + private: + // SelfHealingOpenService is solely owned by NodeState, and all tasks should + // finish before NodeState is destroyed + NodeState* node_state; + + public: + SelfHealingOpenSubsystem(NodeState* node_state); + void try_start(ccf::kv::Tx& tx, bool recovering); + void advance(ccf::kv::Tx& tx, bool timeout); + + private: + struct SHOMsg + { + SHOMsg(SelfHealingOpenSubsystem& self_) : self(self_) {} + SelfHealingOpenSubsystem& self; + }; + + // Start path + void start_message_retry_timers(); + void start_failover_timers(); + + // Steady state operations + self_healing_open::RequestNodeInfo make_node_info(kv::ReadOnlyTx&); + void send_gossip_unsafe(kv::ReadOnlyTx&); + void send_vote_unsafe(kv::ReadOnlyTx&, const SelfHealingOpenNodeInfo&); + void send_iamopen_unsafe(kv::ReadOnlyTx&); + }; +} \ No newline at end of file diff --git a/src/service/internal_tables_access.h b/src/service/internal_tables_access.h index 24128ec1040f..09ff344b2ae4 100644 --- a/src/service/internal_tables_access.h +++ b/src/service/internal_tables_access.h @@ -30,12 +30,12 @@ namespace ccf { /* We can't query the past epochs' TXs if the service hasn't been opened * yet. We do guess values based on epoch value and seqno changing rules. */ - ccf::TxID previous_tx_if_recovery(ccf::TxID txid) + inline ccf::TxID previous_tx_if_recovery(ccf::TxID txid) { return ccf::TxID{ .view = txid.view - aft::starting_view_change, .seqno = txid.seqno - 1}; } - ccf::TxID next_tx_if_recovery(ccf::TxID txid) + inline ccf::TxID next_tx_if_recovery(ccf::TxID txid) { return ccf::TxID{ .view = txid.view + aft::starting_view_change, .seqno = txid.seqno + 1}; diff --git a/src/service/network_tables.h b/src/service/network_tables.h index a52cd5dc2c72..2f94a2b21db1 100644 --- a/src/service/network_tables.h +++ b/src/service/network_tables.h @@ -16,6 +16,7 @@ #include "ccf/service/tables/modules.h" #include "ccf/service/tables/nodes.h" #include "ccf/service/tables/proposals.h" +#include "ccf/service/tables/self_healing_open.h" #include "ccf/service/tables/service.h" #include "ccf/service/tables/snp_measurements.h" #include "ccf/service/tables/tcb_verification.h" diff --git a/tests/config.jinja b/tests/config.jinja index de1dabef5a53..84b92b500d75 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -52,10 +52,15 @@ }, "recover": { "initial_service_certificate_validity_days": {{ initial_service_cert_validity_days }}, - {% if previous_sealed_ledger_secret_location %} - "previous_sealed_ledger_secret_location": "{{ previous_sealed_ledger_secret_location }}", - {% endif %} - "previous_service_identity_file": "{{ previous_service_identity_file }}" + "previous_service_identity_file": "{{ previous_service_identity_file }}" {% if previous_sealed_ledger_secret_location %}, + "previous_sealed_ledger_secret_location": "{{ previous_sealed_ledger_secret_location }}"{% endif %} {% if self_healing_open_addresses %}, + "self_healing_open": { + "addresses" : [ + {% for address in self_healing_open_addresses %} + "{{ address }}" {% if not loop.last %},{% endif %} + {% endfor %} + ] + } {% endif %} } }, "ledger": diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index b18920161130..7760baf753ff 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -1097,7 +1097,7 @@ def run_initial_uvm_descriptor_checks(args): with recovered_primary.client() as c: r = c.get("/node/network").body.json() recovery_seqno = int(r["current_service_create_txid"].split(".")[1]) - network.stop_all_nodes() + recovered_network.stop_all_nodes() ledger = ccf.ledger.Ledger( recovered_primary.remote.ledger_paths(), committed_only=False, @@ -1172,7 +1172,7 @@ def run_initial_tcb_version_checks(args): with recovered_primary.client() as c: r = c.get("/node/network").body.json() recovery_seqno = int(r["current_service_create_txid"].split(".")[1]) - network.stop_all_nodes() + recovered_network.stop_all_nodes() ledger = ccf.ledger.Ledger( recovered_primary.remote.ledger_paths(), committed_only=False, @@ -1466,6 +1466,218 @@ def run(self, src_dir, dst_dir): prev_network = recovery_network +def run_self_healing_open(const_args): + args = copy.deepcopy(const_args) + args.nodes = infra.e2e_args.min_nodes(args, f=1) + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + ) as network: + LOG.info("Start a network and stop it") + network.start_and_open(args) + network.save_service_identity(args) + network.stop_all_nodes() + + recovery_args = copy.deepcopy(args) + + ledger_dirs = {} + committed_ledger_dirs = {} + for i, node in enumerate(network.nodes): + l_dir, c = node.get_ledger() + ledger_dirs[i] = l_dir + committed_ledger_dirs[i] = c + + LOG.info("Start a recovery network and stop it") + recovered_network = infra.network.Network( + recovery_args.nodes, + recovery_args.binary_dir, + recovery_args.debug_nodes, + existing_network=network, + ) + recovered_network.start_in_self_healing_open( + recovery_args, + ledger_dirs=ledger_dirs, + committed_ledger_dirs=committed_ledger_dirs, + ) + + # Wait until all relevant nodes have restarted + time.sleep(3) + + # Refresh the the declared state of nodes which have shut themselves down to join. + for node in recovered_network.nodes: + node.refresh_network_state(verify_ca=False) + + recovered_network.refresh_service_identity_file(recovery_args) + + recovered_network.consortium.recover_with_shares( + recovered_network.find_random_node() + ) + + LOG.info("Submitted recovery shares") + + # Wait for all live replicas to report being part of the opened network + successfully_opened = 0 + for node in recovered_network.get_joined_nodes(): + try: + recovered_network.wait_for_status( + node, + "Open", + timeout=10, + ) + recovered_network._wait_for_app_open(node) + successfully_opened += 1 + except TimeoutError: + pass + + assert successfully_opened == 1 + + LOG.info("Completed self-healing open successfully") + + recovered_network.stop_all_nodes(accept_ledger_diff=True) + + +def run_self_healing_open_timeout_path(const_args): + args = copy.deepcopy(const_args) + args.nodes = infra.e2e_args.min_nodes(args, f=1) + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + ) as network: + LOG.info("Start a network and stop it") + network.start_and_open(args) + network.save_service_identity(args) + network.stop_all_nodes() + + recovery_args = copy.deepcopy(args) + + ledger_dirs = {} + committed_ledger_dirs = {} + for i, node in enumerate(network.nodes): + l_dir, c = node.get_ledger() + ledger_dirs[i] = l_dir + committed_ledger_dirs[i] = c + + LOG.info("Start a recovery network and stop it") + recovered_network = infra.network.Network( + recovery_args.nodes, + recovery_args.binary_dir, + recovery_args.debug_nodes, + existing_network=network, + ) + recovered_network.start_in_self_healing_open( + recovery_args, + ledger_dirs=ledger_dirs, + committed_ledger_dirs=committed_ledger_dirs, + starting_nodes=1, # Force timeout path by starting only one node + ) + + # Wait until all relevant nodes have restarted + time.sleep(3) + + # Refresh the the declared state of nodes which have shut themselves down to join. + for node in recovered_network.nodes: + node.refresh_network_state(verify_ca=False) + + recovered_network.refresh_service_identity_file(recovery_args) + + recovered_network.consortium.recover_with_shares( + recovered_network.find_random_node() + ) + + LOG.info("Submitted recovery shares") + + # Wait for all live replicas to report being part of the opened network + successfully_opened = 0 + for node in recovered_network.get_joined_nodes(): + try: + recovered_network.wait_for_status( + node, + "Open", + timeout=10, + ) + recovered_network._wait_for_app_open(node) + successfully_opened += 1 + except TimeoutError: + pass + + assert successfully_opened == 1 + + LOG.info("Completed self-healing open successfully") + + recovered_network.stop_all_nodes() + + +def run_self_healing_open_local_unsealing(const_args): + args = copy.deepcopy(const_args) + args.nodes = infra.e2e_args.min_nodes(args, f=1) + args.enable_local_sealing = True + + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + ) as network: + LOG.info("Start a network and stop it") + network.start_and_open(args) + network.save_service_identity(args) + node_secrets = [node.save_sealed_ledger_secret() for node in network.nodes] + network.stop_all_nodes() + + recovery_args = copy.deepcopy(args) + + ledger_dirs = {} + committed_ledger_dirs = {} + for i, node in enumerate(network.nodes): + l_dir, c = node.get_ledger() + ledger_dirs[i] = l_dir + committed_ledger_dirs[i] = c + + LOG.info("Start a recovery network") + recovered_network = infra.network.Network( + recovery_args.nodes, + recovery_args.binary_dir, + recovery_args.debug_nodes, + existing_network=network, + ) + recovered_network.start_in_self_healing_open( + recovery_args, + ledger_dirs=ledger_dirs, + committed_ledger_dirs=committed_ledger_dirs, + sealed_ledger_secrets=node_secrets, + ) + + # Wait until all relevant nodes have restarted + time.sleep(3) + + # Refresh the the declared state of nodes which have shut themselves down to join. + for node in recovered_network.nodes: + node.refresh_network_state(verify_ca=False) + + recovered_network.refresh_service_identity_file(recovery_args) + + # Wait for all live replicas to report being part of the opened network + successfully_opened = 0 + for node in recovered_network.get_joined_nodes(): + try: + recovered_network.wait_for_status( + node, + "Open", + timeout=10, + ) + recovered_network._wait_for_app_open(node) + successfully_opened += 1 + except TimeoutError: + pass + + assert successfully_opened == 1 + + LOG.info("Completed self-healing open successfully") + + recovered_network.stop_all_nodes() + + def run_read_ledger_on_testdata(args): for testdata_dir in os.scandir(args.historical_testdata): assert testdata_dir.is_dir() @@ -1835,5 +2047,8 @@ def run(args): run_recovery_unsealing_corrupt(args) run_recovery_unsealing_validate_audit(args) test_error_message_on_failure_to_read_aci_sec_context(args) + run_self_healing_open_local_unsealing(args) run_read_ledger_on_testdata(args) run_ledger_chunk_bytes_check(args) + run_self_healing_open(args) + run_self_healing_open_timeout_path(args) diff --git a/tests/infra/clients.py b/tests/infra/clients.py index 8ef645061a74..ccbf267c5f7b 100644 --- a/tests/infra/clients.py +++ b/tests/infra/clients.py @@ -486,7 +486,10 @@ def __init__( assert signing_auth is None, signing_auth self.cose_signing_auth = cose_signing_auth self.common_headers = common_headers or {} - self.ca_curve = get_curve(self.ca) + if self.ca: + self.ca_curve = get_curve(self.ca) + else: + self.ca_curve = None self.protocol = kwargs.get("protocol") if "protocol" in kwargs else "https" self.extra_args = [] if kwargs.get("http2"): @@ -579,6 +582,8 @@ def request( if self.session_auth: cmd.extend(["--key", self.session_auth.key]) cmd.extend(["--cert", self.session_auth.cert]) + if not self.ca and not self.session_auth: + cmd.extend(["-k"]) # Allow insecure connections for arg in self.extra_args: cmd.append(arg) @@ -600,9 +605,15 @@ def request( if rc.returncode != 0: if rc.returncode in [ + # COULDNT_CONNECT, + 7, + # PEER_FAILED_VERIFICATION, 35, + # SEND_ERROR, + 55, + # SSL_CONNECT_ERROR 60, - ]: # PEER_FAILED_VERIFICATION, SSL_CONNECT_ERROR + ]: raise CCFConnectionException if rc.returncode == 28: # OPERATION_TIMEDOUT raise TimeoutError diff --git a/tests/infra/network.py b/tests/infra/network.py index 59b751b03a3a..ca5a0ccefac2 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -5,7 +5,7 @@ from contextlib import contextmanager from enum import Enum, IntEnum, auto -from infra.clients import flush_info +from infra.clients import flush_info, CCFConnectionException, CCFIOException import infra.member import infra.path import infra.proc @@ -442,6 +442,7 @@ def _start_all_nodes( self, args, recovery=False, + self_healing_open=False, ledger_dir=None, read_only_ledger_dirs=None, snapshots_dir=None, @@ -463,12 +464,16 @@ def _start_all_nodes( for arg in infra.network.Network.node_args_to_forward } + self_healing_open_addresses = [ + node.get_public_rpc_address() for node in self.nodes + ] + for i, node in enumerate(self.nodes): forwarded_args_with_overrides = forwarded_args.copy() forwarded_args_with_overrides.update(self.per_node_args_override.get(i, {})) try: - if i == 0: - if not recovery: + if i == 0 or self_healing_open: + if not (recovery or self_healing_open): node.start( lib_name=args.package, workspace=args.workspace, @@ -480,17 +485,26 @@ def _start_all_nodes( **kwargs, ) else: - node.recover( - lib_name=args.package, - workspace=args.workspace, - label=args.label, - common_dir=self.common_dir, - ledger_dir=ledger_dir, - read_only_ledger_dirs=read_only_ledger_dirs, - snapshots_dir=snapshots_dir, - **forwarded_args_with_overrides, - **kwargs, + node_kwargs = { + "lib_name": args.package, + "workspace": args.workspace, + "label": args.label, + "common_dir": self.common_dir, + "ledger_dir": ledger_dir, + "read_only_ledger_dirs": read_only_ledger_dirs, + "snapshots_dir": snapshots_dir, + } + self_healing_open_kwargs = { + "self_healing_open_addresses": self_healing_open_addresses + } + # If a kwarg is passed in override automatically set variants + node_kwargs = ( + node_kwargs + | self_healing_open_kwargs + | forwarded_args_with_overrides + | kwargs ) + node.recover(**node_kwargs) self.wait_for_state( node, infra.node.State.PART_OF_PUBLIC_NETWORK.value, @@ -763,6 +777,133 @@ def start_in_recovery( self.wait_for_all_nodes_to_commit(primary=primary, timeout=20) LOG.success("All nodes joined public network") + def start_in_self_healing_open( + self, + args, + ledger_dirs, + committed_ledger_dirs=None, + snapshot_dirs=None, + common_dir=None, + set_authenticate_session=None, + starting_nodes=None, + timeout=10, + sealed_ledger_secrets=None, + **kwargs, + ): + self.common_dir = common_dir or get_common_folder_name( + args.workspace, args.label + ) + + self.per_node_args_override = self.per_node_args_override or { + i: {} for i in range(len(self.nodes)) + } + committed_ledger_dirs = committed_ledger_dirs or { + i: None for i in range(len(self.nodes)) + } + snapshot_dirs = snapshot_dirs or {i: None for i in range(len(self.nodes))} + + # separate out all starting nodes' directories such that they recover independently + self.per_node_args_override = { + i: ( + d + | { + "ledger_dir": ledger_dirs[i], + "read_only_ledger_dirs": committed_ledger_dirs[i] or [], + "snapshots_dir": snapshot_dirs[i] or None, + } + | ( + {"previous_sealed_ledger_secret_location": sealed_ledger_secrets[i]} + if sealed_ledger_secrets and i < len(sealed_ledger_secrets) + else {} + ) + ) + for i, d in self.per_node_args_override.items() + } + + # Fix the port numbers to make all nodes _well known_ + for i, node in enumerate(self.nodes): + port = 1000 + random.randint(0, 64534) + node.host.get_primary_interface().port = port + node.host.get_primary_interface().public_port = port + + LOG.info("Set up nodes") + for node in self.nodes: + LOG.info(node.host) + + self.status = ServiceStatus.RECOVERING + LOG.debug(f"Opening CCF service on {self.hosts}") + + forwarded_args = { + arg: getattr(args, arg, None) + for arg in infra.network.Network.node_args_to_forward + } + self_healing_open_addresses = [ + node.get_public_rpc_address() for node in self.nodes + ] + + for i, node in enumerate(self.nodes): + if starting_nodes is not None and i > starting_nodes: + break + + forwarded_args_with_overrides = forwarded_args.copy() + forwarded_args_with_overrides.update(self.per_node_args_override.get(i, {})) + try: + node_kwargs = { + "lib_name": args.package, + "workspace": args.workspace, + "label": args.label, + "common_dir": self.common_dir, + } + self_healing_open_kwargs = { + "self_healing_open_addresses": self_healing_open_addresses + } + # If a kwarg is passed in override automatically set variants + node_kwargs = ( + node_kwargs + | self_healing_open_kwargs + | forwarded_args_with_overrides + | kwargs + ) + node.recover(**node_kwargs) + except Exception: + LOG.exception(f"Failed to start node {node.local_node_id}") + raise + + self.election_duration = args.election_timeout_ms / 1000 + self.observed_election_duration = self.election_duration + 1 + + def cycle(items): + while True: + for item in items: + yield item + + # Waiting for any node to transition-to-open + end_time = time.time() + timeout + for node in cycle(self.nodes): + LOG.info(f"Seeing if node {node.local_node_id} has opened") + if time.time() > end_time: + LOG.error("Timed out waiting for any node to open") + raise TimeoutError("Timed out waiting for any node to open") + try: + self.wait_for_statuses( + node, + ["WaitingForRecoveryShares", "Open"], + timeout=0.5, + verify_ca=False, + ) + break + except Exception as e: + if isinstance(e, (CCFIOException, TimeoutError)) or ( + isinstance(e, RuntimeError) and "node is stopped" in str(e).lower() + ): + LOG.info( + f"Failed to get the status of {node.local_node_id}, retrying..." + ) + continue + raise e + + LOG.info("One node opened") + def recover( self, args, @@ -1221,24 +1362,52 @@ def get_live_nodes(self): def get_f(self): return infra.e2e_args.max_f(self.args, len(self.nodes)) - def wait_for_state(self, node, state, timeout=3): + def wait_for_states(self, node, states, timeout=3, **client_kwargs): end_time = time.time() + timeout + final_state = None while time.time() < end_time: try: - with node.client(connection_timeout=timeout) as c: + with node.client(connection_timeout=timeout, **client_kwargs) as c: r = c.get("/node/state").body.json() - if r["state"] == state: + if r["state"] in states: + final_state = r["state"] break except ConnectionRefusedError: pass + except CCFConnectionException: + pass time.sleep(0.1) else: raise TimeoutError( - f"Timed out waiting for state {state} on node {node.node_id}" + f"Timed out waiting for a state in {states} on node {node.node_id}" ) - if state == infra.node.State.PART_OF_NETWORK.value: + if final_state == infra.node.State.PART_OF_NETWORK.value: self.status = ServiceStatus.OPEN + def wait_for_state(self, node, state, timeout=3): + self.wait_for_states(node, [state], timeout=timeout) + + def wait_for_statuses(self, node, statuses, timeout=3, **client_kwargs): + end_time = time.time() + timeout + while time.time() < end_time: + try: + with node.client(connection_timeout=timeout, **client_kwargs) as c: + r = c.get("/node/network").body.json() + if r["service_status"] in statuses: + break + except ConnectionRefusedError: + pass + except CCFConnectionException: + pass + time.sleep(0.1) + else: + raise TimeoutError( + f"Timed out waiting for a network status in {statuses} on node {node.node_id}" + ) + + def wait_for_status(self, node, status, timeout=3): + self.wait_for_statuses(node, [status], timeout=timeout) + def _wait_for_app_open(self, node, timeout=3): end_time = time.time() + timeout logs = [] @@ -1731,7 +1900,7 @@ def refresh_service_identity_file(self, args): connections pick up the new service certificate. """ primary = self.find_random_node() - with primary.client() as c: + with primary.client(verify_ca=False) as c: r = c.get("/node/network") assert r.status_code == 200, r new_service_identity = r.body.json()["service_certificate"] diff --git a/tests/infra/node.py b/tests/infra/node.py index 46fda053f0c5..a3fd3970deeb 100644 --- a/tests/infra/node.py +++ b/tests/infra/node.py @@ -854,6 +854,21 @@ def wait_for_leadership_state(self, min_view, leadership_states, timeout=3): f"Node {self.local_node_id} was not in leadership states {leadership_states} in view > {min_view} after {timeout}s: {r}" ) + def refresh_network_state(self, **client_kwargs): + try: + with self.client(**client_kwargs) as c: + LOG.info(f"Trying to refresh using {c}") + r = c.get(f"/node/network/nodes/{self.node_id}").body.json() + LOG.info(r) + + if r["status"] == "Pending": + self.network_state = NodeNetworkState.started + elif r["status"] == "Trusted": + self.network_state = NodeNetworkState.joined + except Exception as e: + LOG.debug(f"Failed to connect {e}") + self.network_state = NodeNetworkState.stopped + @contextmanager def node( diff --git a/tests/infra/remote.py b/tests/infra/remote.py index 8ce2ff88d24f..c17956bc0129 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -317,6 +317,7 @@ def __init__( cose_signatures_subject="ledger.signature", sealed_ledger_secret_location=None, previous_sealed_ledger_secret_location=None, + self_healing_open_addresses=None, **kwargs, ): """ @@ -522,6 +523,7 @@ def __init__( historical_cache_soft_limit=historical_cache_soft_limit, cose_signatures_issuer=cose_signatures_issuer, cose_signatures_subject=cose_signatures_subject, + self_healing_open_addresses=self_healing_open_addresses, **auto_dr_args, **kwargs, ) diff --git a/tla/disaster-recovery/.gitignore b/tla/disaster-recovery/.gitignore new file mode 100644 index 000000000000..eb5a316cbd19 --- /dev/null +++ b/tla/disaster-recovery/.gitignore @@ -0,0 +1 @@ +target diff --git a/tla/disaster-recovery/Cargo.lock b/tla/disaster-recovery/Cargo.lock new file mode 100644 index 000000000000..b339a5f99ee6 --- /dev/null +++ b/tla/disaster-recovery/Cargo.lock @@ -0,0 +1,609 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.3", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "anstream" +version = "0.6.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "bitflags" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" + +[[package]] +name = "ccf-selfhealingopen" +version = "0.0.0" +dependencies = [ + "clap", + "stateright", +] + +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "choice" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3b71fc821deaf602a933ada5c845d088156d0cdf2ebf43ede390afe93466553" + +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + +[[package]] +name = "clap" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "id-set" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9633fadf6346456cf8531119ba4838bc6d82ac4ce84d9852126dd2aa34d49264" + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "libc" +version = "0.2.173" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8cfeafaffdbc32176b64fb251369d52ea9f0a8fbc6f8759edffef7b525d64bb" + +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "redox_syscall" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" +dependencies = [ + "bitflags", +] + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "stateright" +version = "0.30.2" +source = "git+https://github.com/cjen1-msft/stateright?branch=master#01959045c9c0cd69f7aa0498bf8dc48982eacdee" +dependencies = [ + "ahash", + "choice", + "crossbeam-utils", + "dashmap", + "id-set", + "log", + "nohash-hasher", + "parking_lot", + "rand", + "serde", + "serde_json", + "tiny_http", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zerocopy" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/tla/disaster-recovery/Cargo.toml b/tla/disaster-recovery/Cargo.toml new file mode 100644 index 000000000000..6769d9a6349f --- /dev/null +++ b/tla/disaster-recovery/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "ccf-selfhealingopen" +version = "0.0.0" + +[dependencies] +clap = { version = "4.5.38", features = ["derive"] } +stateright = { git = "https://github.com/cjen1-msft/stateright", branch="master" } \ No newline at end of file diff --git a/tla/disaster-recovery/Readme.md b/tla/disaster-recovery/Readme.md new file mode 100644 index 000000000000..e337787e0027 --- /dev/null +++ b/tla/disaster-recovery/Readme.md @@ -0,0 +1,11 @@ +# Self-healing-open specification in [stateright](https://github.com/stateright/stateright) + +The properties are specified in [main.rs](./src/main.rs), while the model is specified in [model.rs](./src/model.rs). + +Due to stateright being executable, there is little syntactic sugar, and so there is quite a bit of boilerplate. +The functional parts of the specification are in `advance_step`, `on_start`, `on_timeout` and `on_msg`. + +The specification can be checked from the command line via `cargo run check`. + +However, a more useful UX is via the web-view which is hosted locally via `cargo run serve`. +This allows you to explore the specification actions interactively, and the checker can be exhaustively run using the `Run to completion` button, which should find several useful examples of states where the network is opened, and where a deadlock is reached. diff --git a/tla/disaster-recovery/src/main.rs b/tla/disaster-recovery/src/main.rs new file mode 100644 index 000000000000..e6676d08d7c2 --- /dev/null +++ b/tla/disaster-recovery/src/main.rs @@ -0,0 +1,245 @@ +extern crate clap; +extern crate stateright; +use clap::Parser; +mod model; +use model::{ModelCfg, Msg, NextStep, Node, State}; +use stateright::{actor::*, report::WriteReporter, util::HashableHashSet, Checker, Model}; +use std::sync::Arc; + +fn implies(a: bool, b: bool) -> bool { + !a || b +} + +fn reached_open(state: &ActorModelState) -> bool { + state + .actor_states + .iter() + .any(|actor_state: &Arc| matches!(actor_state.next_step, NextStep::Open { .. })) +} + +fn reached_open_timeout(state: &ActorModelState, expected_to_timeout: bool) -> bool { + state.actor_states.iter().any(|actor_state: &Arc| { + matches! ( + actor_state.next_step, + NextStep::Open {timeout} if timeout == expected_to_timeout + ) + }) +} + +fn unanimous_votes(model: &ActorModel, state: &ActorModelState) -> bool { + let peers: HashableHashSet = (0..model.cfg.n_nodes) + .map(|i| Id::from(i as usize)) + .collect(); + state.actor_states.iter().all(|actor_state: &Arc| { + actor_state.submitted_vote.is_some() + && peers.iter().all(|peer| { + actor_state + .submitted_vote + .clone() + .unwrap() + .1 + .recv + .iter() + .any(|g| g.src == *peer) + }) + }) +} + +fn majority_have_same_maximum(state: &ActorModelState) -> bool { + // get the chosen replica of each replica into a vector and sort that vector + // that there is only one value up to the n/2th index + let mut chosen_replicas: Vec = state + .actor_states + .iter() + .filter(|actor_state: &&Arc| actor_state.submitted_vote.is_some()) + .map(|actor_state| { + actor_state + .submitted_vote + .clone() + .unwrap() + .1 + .recv + .iter() + .max_by_key(|g| (g.txid, g.src)) + .unwrap() + .src + }) + .collect(); + chosen_replicas.sort(); + let majority_idx = state.actor_states.len() / 2; + let majority_chosen_replica = chosen_replicas.get(majority_idx); + majority_chosen_replica.is_some() + && chosen_replicas[0..majority_idx] + .iter() + .all(|&r| r == *majority_chosen_replica.unwrap()) +} + +fn liveness_properties(model: ActorModel) -> ActorModel { + let model = model + .property( + stateright::Expectation::Eventually, + "Unanimous votes => no chance of a fork", + |model: &ActorModel, state: &ActorModelState| { + // Define deadlock as a path which does not reach open without + // Hence unanimous votes => reach open + // Hence on every path unanimous votes => <> reached open + // Since votes are not forgotten on a node, we check for a state where unanimous votes => reached open + return implies( + unanimous_votes(model, state), + reached_open_timeout(state, false), + ); + }, + ) + .property( + stateright::Expectation::Eventually, + "Open", + |_, state: &ActorModelState| { + // all runs should eventually open, either via the reliable method, or via the failover timeout + reached_open(state) + }, + ) + .property( + stateright::Expectation::Eventually, + "Majority votes => no fork", + |_, state: &ActorModelState| { + return implies( + majority_have_same_maximum(state), + reached_open_timeout(state, false), + ); + }, + ); + return model; +} + +fn invariant_properties(model: ActorModel) -> ActorModel { + let model = model + .property( + stateright::Expectation::Always, + "No open with timeout, no fork", + |_model: &ActorModel, state: &ActorModelState| { + // Check if there is no fork in the state + let open_node_count = state + .actor_states + .iter() + .filter(|actor_state: &&Arc| { + matches!(actor_state.next_step, NextStep::Open { .. }) + }) + .count(); + implies(!reached_open_timeout(state, true), open_node_count <= 1) + }, + ) + .property( + stateright::Expectation::Always, + "Deadlock", + |_model, state| { + let all_open_join = state + .actor_states + .iter() + .all(|actor_state: &Arc| actor_state.next_step == NextStep::OpenJoin); + let all_votes_delivered = state + .network + .iter_all() + .filter(|msg| matches!(msg.msg, Msg::Vote(_))) + .count() + == 0; + !(all_open_join && all_votes_delivered) + }, + ) + .property( + stateright::Expectation::Always, + "Persist committed txs", + |_model: &ActorModel, state: &ActorModelState| { + let majority_idx = state.actor_states.len() / 2; + let commit_txid = state + .actor_states + .iter() + .map(|actor_state| actor_state.txid) + .collect::>()[majority_idx]; + let cond = state + .actor_states + .iter() + .filter(|actor_state: &&Arc| { + matches!(actor_state.next_step, NextStep::Open { .. }) + }) + .all(|actor_state: &Arc| actor_state.txid >= commit_txid); + implies(!reached_open_timeout(state, true), cond) + }, + ); + return model; +} + +fn reachable_properties(model: ActorModel) -> ActorModel { + let model = model + .property( + stateright::Expectation::Sometimes, + "Open is possible", + |_, state| implies(state.actor_states.len() > 1, reached_open(state)), + ) + .property( + stateright::Expectation::Sometimes, + "Unsafe open with timeout", + |_, state| reached_open_timeout(state, true), + ) + .property( + stateright::Expectation::Sometimes, + "Majority vote still opens without timeout", + |_model, state| majority_have_same_maximum(state) && reached_open_timeout(state, false), + ); + return model; +} + +fn properties(model: ActorModel) -> ActorModel { + let model = liveness_properties(model); + let model = invariant_properties(model); + let model = reachable_properties(model); + return model; +} + +#[derive(Parser, Debug)] +#[command(version, about = "Model for CCF's self-healing-open", long_about = None)] +struct CliArgs { + #[clap(short, long, default_value = "3")] + n_nodes: usize, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Parser, Debug)] +enum Commands { + /// Check the model + Check, + /// Serve the model on localhost:8080 + Serve, +} + +fn check(model: ActorModel) { + let checker = model + .checker() + //.symmetry() + .spawn_bfs() + .join_and_report(&mut WriteReporter::new(&mut std::io::stderr())); + checker.assert_properties(); +} + +fn serve(model: ActorModel) { + let checker = model.checker(); + println!("Serving model on http://localhost:8080"); + checker.serve("localhost:8080"); +} + +fn main() { + let args = CliArgs::parse(); + + let model = ModelCfg { + n_nodes: args.n_nodes, + } + .into_model(); + + let model = properties(model); + + match args.command { + Commands::Check => check(model), + Commands::Serve => serve(model), + } +} diff --git a/tla/disaster-recovery/src/model.rs b/tla/disaster-recovery/src/model.rs new file mode 100644 index 000000000000..d156ef314a12 --- /dev/null +++ b/tla/disaster-recovery/src/model.rs @@ -0,0 +1,195 @@ +extern crate stateright; +use stateright::{actor::*, util::HashableHashSet}; +use std::borrow::Cow; + +type Txid = u64; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct GossipStruct { + pub src: Id, + pub txid: Txid, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct VoteStruct { + pub src: Id, + pub recv: HashableHashSet, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum Msg { + Gossip(GossipStruct), + Vote(VoteStruct), + IAmOpen(Id), +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum Timer { + ElectionTimeout, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub enum NextStep { + Vote, + OpenJoin, + Open { timeout: bool }, + Join, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct State { + pub next_step: NextStep, + pub gossips: HashableHashSet, + pub votes: HashableHashSet, + pub submitted_vote: Option<(Id, VoteStruct)>, + pub txid: Txid, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct Node { + pub peers: HashableHashSet, +} + +impl Node { + fn vote_for_max<'a>(gossips: &HashableHashSet, id: Id) -> (Id, VoteStruct) +where { + let dst = gossips + .iter() + .clone() + .max_by_key(|g| (g.txid, g.src)) + .unwrap() + .src; + let vote = VoteStruct { + src: id, + recv: gossips.clone(), + }; + return (dst, vote); + } + + fn other_peers(&self, id: Id) -> Vec { + self.peers.iter().filter(|&&p| p != id).cloned().collect() + } + + fn advance_step(&self, state: &mut State, o: &mut Out, id: Id, timeout: bool) -> bool { + match state.next_step { + NextStep::Vote if state.gossips.len() == self.peers.len() || timeout => { + let (dst, vote) = Node::vote_for_max(&state.gossips, id); + state.submitted_vote = Some((dst, vote.clone())); + if dst == id { + state.votes.insert(vote); + } else { + o.send(dst, Msg::Vote(vote)); + } + state.next_step = NextStep::OpenJoin; + return true; + } + NextStep::OpenJoin if state.votes.len() >= (self.peers.len() + 1) / 2 || timeout => { + state.next_step = NextStep::Open { timeout }; + o.broadcast(&self.other_peers(id), &Msg::IAmOpen(id)); + return true; + } + _ => false, + } + } + + fn advance_several(&self, state: &mut State, o: &mut Out, id: Id, timeout: bool) { + while self.advance_step(state, o, id, timeout) {} + } +} + +impl Actor for Node { + type Msg = Msg; + type State = State; + type Timer = Timer; + type Storage = (); + type Random = (); + + fn on_start(&self, id: Id, _storage: &Option, o: &mut Out) -> Self::State { + let txid = usize::from(id) as Txid; // Use id as txid for simplicity + let gossip = GossipStruct { src: id, txid }; + let mut gossips = HashableHashSet::new(); + gossips.insert(gossip.clone()); + let mut state = State { + next_step: NextStep::Vote, + gossips, + votes: HashableHashSet::new(), + submitted_vote: None, + txid: usize::from(id) as Txid, + }; + o.broadcast(&self.other_peers(id), &Msg::Gossip(gossip)); + o.set_timer(Timer::ElectionTimeout, model_timeout()); + self.advance_several(&mut state, o, id, false); + return state; + } + + fn on_timeout(&self, id: Id, state: &mut Cow, timer: &Timer, o: &mut Out) { + match timer { + Timer::ElectionTimeout => match state.next_step { + NextStep::Vote if !state.gossips.is_empty() => { + let state = state.to_mut(); + self.advance_several(state, o, id, true); + o.set_timer(Timer::ElectionTimeout, model_timeout()); + } + NextStep::OpenJoin if !state.votes.is_empty() => { + let state = state.to_mut(); + self.advance_several(state, o, id, true); + } + _ => { + o.set_timer(Timer::ElectionTimeout, model_timeout()); + } + }, + } + } + + fn on_msg( + &self, + id: Id, + state: &mut Cow, + _src: Id, + msg: Self::Msg, + o: &mut Out, + ) { + let state = state.to_mut(); + match msg { + Msg::Gossip(gossip) => { + // Freeze gossip collection after voting is submitted + if !state.gossips.contains(&gossip) && state.submitted_vote.is_none() { + state.gossips.insert(gossip.clone()); + } + } + Msg::Vote(vote) => { + if !state.votes.contains(&vote) { + state.votes.insert(vote); + } + } + Msg::IAmOpen(_) => { + if !matches!(state.next_step, NextStep::Open { .. }) { + state.next_step = NextStep::Join; + } + } + }; + self.advance_several(state, o, id, false); + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct ModelCfg { + pub n_nodes: usize, +} + +impl ModelCfg { + pub fn into_model(self) -> ActorModel { + let peers: HashableHashSet = (0..self.n_nodes).map(|i| Id::from(i as usize)).collect(); + ActorModel::new(self.clone(), ()) + .actors( + (0..self.n_nodes) + .map(|_| Node { + peers: peers.clone(), + }) + .collect::>(), + ) + //.init_network(Network::new_ordered([])) + .init_network(Network::new_unordered_nonduplicating([])) + .lossy_network(LossyNetwork::No) + } +}