diff --git a/.github/workflows/cpp_tagged_release.yaml b/.github/workflows/cpp_tagged_release.yaml new file mode 100644 index 000000000..416676c6e --- /dev/null +++ b/.github/workflows/cpp_tagged_release.yaml @@ -0,0 +1,57 @@ +--- +name: "cpp-tagged-release" + +on: + push: + tags: + - "cpp-*" + +jobs: + tagged-release: + name: "C++ Tagged Release" + runs-on: "ubuntu-latest" + + steps: + - uses: actions/checkout@v4 + - uses: actions/checkout@v4 + with: + repository: grpc/grpc + # The branch, tag or SHA to checkout. When checking out the repository that + # triggered a workflow, this defaults to the reference or SHA for that event. + # Otherwise, uses the default branch. + ref: 'v1.46.3' + # Relative path under $GITHUB_WORKSPACE to place the repository + path: cpp/repo/grpc + submodules: true + - name: "Install Dependencies" + run: | + sudo apt-get install -y build-essential autoconf libtool pkg-config cmake git libprotobuf-dev libssl-dev zlib1g-dev libgflags-dev + - name: "Build gRPC" + working-directory: ./cpp/repo/grpc + run: | + mkdir _build && cd _build + cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc -DgRPC_SSL_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_PROTOBUF_PACKAGE_TYPE=CONFIG -DgRPC_ZLIB_PROVIDER=package .. + make + make install + - name: "Build Libraries" + working-directory: ./cpp + run: | + mkdir _build && cd _build + cmake .. + make + - name: "Package" + working-directory: ./cpp + run: | + mkdir -p dist/lib + mkdir -p dist/include + cp -r include/rocketmq dist/include/ + cp _build/librocketmq.so dist/lib/ + cp _build/librocketmq.a dist/lib/ + tar -czvf dist.tar.gz dist + - uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + prerelease: false + automatic_release_tag: cpp + files: | + cpp/dist.tar.gz \ No newline at end of file diff --git a/.licenserc.yaml b/.licenserc.yaml index 4edcd18fc..cb5c87478 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -48,6 +48,7 @@ header: - 'cpp/.gitignore' - 'cpp/third_party' - 'cpp/cmake' + - 'cpp/source/exports.map' - 'php/grpc/**/*.php' - 'php/composer.json' - 'rust/.cargo/Cargo.lock.min' diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 374466e69..42f7cd702 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.19) +cmake_minimum_required(VERSION 3.16) project(rocketmq) set(CMAKE_CXX_STANDARD 11) set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -31,4 +31,4 @@ if (BUILD_EXAMPLES) find_package(gflags REQUIRED) find_package(ZLIB REQUIRED) add_subdirectory(examples) -endif () \ No newline at end of file +endif () diff --git a/cpp/README.md b/cpp/README.md index 117a2426d..a6d983fb6 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -217,19 +217,13 @@ if "com_google_googletest" not in native.existing_rules(): 1. VSCode + Clangd [Clangd](https://clangd.llvm.org/) is a really nice code completion tool. Clangd requires compile_commands.json to work properly. - To generate the file, we need clone another repository along with the current one. + To generate the file, run the following command: ```sh - git clone git@github.com:grailbio/bazel-compilation-database.git + ./tools/gen_compile_commands.sh ``` - From current repository root, - - ```sh - ../bazel-compilation-database/generate.sh - ``` - - Once the script completes, you should have compile_commands.json file in the repository root directory. + Once the script completes, you should have compile_commands.json file in the workspace directory, aka, ${repository}/cpp. LLVM project has an extension for [clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd). Please install it from the extension market. @@ -239,8 +233,15 @@ if "com_google_googletest" not in native.existing_rules(): "C_Cpp.intelliSenseEngine": "Disabled", "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from both extensions. "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles from both extensions (clangd's seem to be more reliable anyway). - "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd", - "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"], + "clangd.path": "/usr/bin/clangd", + "clangd.arguments": [ + "-log=verbose", + "-pretty", + "--background-index", + "--header-insertion=never", + "--compile-commands-dir=${workspaceFolder}/", + "--query-driver=**" + ], "clangd.onConfigChanged": "restart", ``` diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE index d09dd4458..3c3d64763 100644 --- a/cpp/WORKSPACE +++ b/cpp/WORKSPACE @@ -27,4 +27,28 @@ http_archive( load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps") buildbuddy_deps() load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy") -buildbuddy(name = "buildbuddy_toolchain") \ No newline at end of file +buildbuddy(name = "buildbuddy_toolchain") + +# Generate compile_commands.json +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +http_archive( + name = "hedron_compile_commands", + + # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in both places (below) with the latest (https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main), rather than using the stale one here. + # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README). + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz", + strip_prefix = "bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9", + # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..." +) +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") +hedron_compile_commands_setup() +load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive") +hedron_compile_commands_setup_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive_transitive() \ No newline at end of file diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index eae31a6ff..684e55eb0 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -16,7 +16,6 @@ def rocketmq_deps(): sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", strip_prefix = "googletest-release-1.11.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz", "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz", ], ) @@ -27,7 +26,6 @@ def rocketmq_deps(): strip_prefix = "filesystem-1.5.0", sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz", "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD", @@ -39,7 +37,6 @@ def rocketmq_deps(): strip_prefix = "spdlog-1.9.2", sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz", "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD", @@ -51,7 +48,6 @@ def rocketmq_deps(): strip_prefix = "fmt-8.0.1", sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz", "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD", @@ -63,7 +59,6 @@ def rocketmq_deps(): sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930", strip_prefix = "protobuf-3.20.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz", "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz", ], ) @@ -74,7 +69,6 @@ def rocketmq_deps(): sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731", strip_prefix = "rules_proto_grpc-4.1.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz", "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz", ], ) @@ -84,7 +78,6 @@ def rocketmq_deps(): name = "io_opencensus_cpp", sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz", "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz", ], strip_prefix = "opencensus-cpp-0.4.1", @@ -96,7 +89,6 @@ def rocketmq_deps(): sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4", strip_prefix = "abseil-cpp-20211102.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz", "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz", ], ) @@ -107,7 +99,6 @@ def rocketmq_deps(): strip_prefix = "gflags-2.2.2", sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz", "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz", ], ) @@ -118,7 +109,6 @@ def rocketmq_deps(): strip_prefix = "grpc-1.46.3", sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz", "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz", ], ) @@ -130,7 +120,6 @@ def rocketmq_deps(): build_file = "@org_apache_rocketmq//third_party:asio.BUILD", strip_prefix = "asio-1.18.2", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz", "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz", ], ) @@ -140,7 +129,6 @@ def rocketmq_deps(): name = "com_google_googleapis", sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", ], strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95", @@ -152,7 +140,6 @@ def rocketmq_deps(): sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd", strip_prefix = "rules_python-0.8.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz", "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz", ], ) @@ -161,7 +148,6 @@ def rocketmq_deps(): http_archive, name = "rules_swift", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz", "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz", ], strip_prefix = "rules_swift-0.27.0", @@ -172,7 +158,6 @@ def rocketmq_deps(): name = "io_bazel_rules_go", sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip", "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", ], @@ -184,7 +169,15 @@ def rocketmq_deps(): sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d", strip_prefix = "rules_proto-4.0.0-3.20.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz", "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz", ], ) + + maybe( + http_archive, + name = "com_github_opentelemetry", + strip_prefix = "opentelemetry-cpp-1.14.2", + urls = [ + "https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz" + ] + ) diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel index 5113e1508..771b0e04d 100644 --- a/cpp/examples/BUILD.bazel +++ b/cpp/examples/BUILD.bazel @@ -91,4 +91,15 @@ cc_binary( "//source/rocketmq:rocketmq_library", "@com_github_gflags_gflags//:gflags", ], +) + +cc_binary( + name = "example_fifo_producer", + srcs = [ + "ExampleFifoProducer.cpp", + ], + deps = [ + "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", + ] ) \ No newline at end of file diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8d6b0399e..273044777 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -4,6 +4,7 @@ function(add_example name file) endfunction() add_example(example_producer ExampleProducer.cpp) +add_example(example_fifo_producer ExampleFifoProducer.cpp) add_example(example_producer_with_async ExampleProducerWithAsync.cpp) add_example(example_producer_with_fifo_message ExampleProducerWithFifoMessage.cpp) add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.cpp) diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp new file mode 100644 index 000000000..9d99be367 --- /dev/null +++ b/cpp/examples/ExampleFifoProducer.cpp @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gflags/gflags.h" +#include "rocketmq/CredentialsProvider.h" +#include "rocketmq/FifoProducer.h" +#include "rocketmq/Logger.h" +#include "rocketmq/Message.h" +#include "rocketmq/Producer.h" +#include "rocketmq/SendReceipt.h" + +using namespace ROCKETMQ_NAMESPACE; + +/** + * @brief A simple Semaphore to limit request concurrency. + */ +class Semaphore { +public: + Semaphore(std::size_t permits) : permits_(permits) { + } + + /** + * @brief Acquire a permit. + */ + void acquire() { + while (true) { + std::unique_lock lk(mtx_); + if (permits_ > 0) { + permits_--; + return; + } + cv_.wait(lk, [this]() { return permits_ > 0; }); + } + } + + /** + * @brief Release the permit back to semaphore. + */ + void release() { + std::unique_lock lk(mtx_); + permits_++; + if (1 == permits_) { + cv_.notify_one(); + } + } + +private: + std::size_t permits_{0}; + std::mutex mtx_; + std::condition_variable cv_; +}; + +const std::string& alphaNumeric() { + static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + return alpha_numeric; +} + +std::string randomString(std::string::size_type len) { + std::string result; + result.reserve(len); + std::random_device rd; + std::mt19937 generator(rd()); + std::string source(alphaNumeric()); + std::string::size_type generated = 0; + while (generated < len) { + std::shuffle(source.begin(), source.end(), generator); + std::string::size_type delta = std::min({len - generated, source.length()}); + result.append(source.substr(0, delta)); + generated += delta; + } + return result; +} + +DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); +DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_int32(message_body_size, 4096, "Message body size"); +DEFINE_uint32(total, 256, "Number of sample messages to publish"); +DEFINE_string(access_key, "", "Your access key ID"); +DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); +DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer"); + +int main(int argc, char* argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); + logger.init(); + + // Access Key/Secret pair may be acquired from management console + CredentialsProviderPtr credentials_provider; + if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { + credentials_provider = std::make_shared(FLAGS_access_key, FLAGS_access_secret); + } + + // In most case, you don't need to create too many producers, singleton pattern is recommended. + auto producer = FifoProducer::newBuilder() + .withConfiguration(Configuration::newBuilder() + .withEndpoints(FLAGS_access_point) + .withCredentialsProvider(credentials_provider) + .withSsl(FLAGS_tls) + .build()) + .withConcurrency(FLAGS_concurrency) + .withTopics({FLAGS_topic}) + .build(); + + std::atomic_bool stopped; + std::atomic_long count(0); + + auto stats_lambda = [&] { + while (!stopped.load(std::memory_order_relaxed)) { + long cnt = count.load(std::memory_order_relaxed); + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::cout << "QPS: " << cnt << std::endl; + } + }; + + std::thread stats_thread(stats_lambda); + + std::string body = randomString(FLAGS_message_body_size); + + std::size_t completed = 0; + std::mutex mtx; + std::condition_variable cv; + + std::unique_ptr semaphore(new Semaphore(FLAGS_concurrency)); + + try { + for (std::size_t i = 0; i < FLAGS_total; ++i) { + auto message = Message::newBuilder() + .withTopic(FLAGS_topic) + .withTag("TagA") + .withKeys({"Key-" + std::to_string(i)}) + .withGroup("message-group" + std::to_string(i % FLAGS_concurrency)) + .withBody(body) + .build(); + std::error_code ec; + auto callback = [&](const std::error_code& ec, const SendReceipt& receipt) mutable { + completed++; + count++; + semaphore->release(); + + if (completed >= FLAGS_total) { + cv.notify_all(); + } + }; + + semaphore->acquire(); + producer.send(std::move(message), callback); + std::cout << "Cached No." << i << " message" << std::endl; + } + } catch (...) { + std::cerr << "Ah...No!!!" << std::endl; + } + + { + std::unique_lock lk(mtx); + cv.wait(lk, [&]() { return completed >= FLAGS_total; }); + std::cout << "Completed: " << completed << ", total: " << FLAGS_total << std::endl; + } + + stopped.store(true, std::memory_order_relaxed); + if (stats_thread.joinable()) { + stats_thread.join(); + } + + return EXIT_SUCCESS; +} diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 57293c247..5e20cc12d 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -77,7 +78,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -88,8 +89,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index 62ee77811..d88dfc85d 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleProducerWithAsync.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -97,6 +96,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_uint32(concurrency, 128, "Concurrency of async send"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -116,7 +116,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -127,8 +127,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp index 09b8d407e..4fa34f9d8 100644 --- a/cpp/examples/ExampleProducerWithFifoMessage.cpp +++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -74,7 +75,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -83,10 +84,11 @@ int main(int argc, char* argv[]) { std::atomic_long count(0); auto stats_lambda = [&] { + std::cout << "Stats thread starts" << std::endl; while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; @@ -109,7 +111,7 @@ int main(int argc, char* argv[]) { .build(); std::error_code ec; SendReceipt send_receipt = producer.send(std::move(message), ec); - std::cout << "Message-ID: " << send_receipt.message_id << std::endl; + // std::cout << "Message-ID: " << send_receipt.message_id << std::endl; count++; } } catch (...) { diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp index 8f12f5b6d..d62374599 100644 --- a/cpp/examples/ExampleProducerWithTimedMessage.cpp +++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp @@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -75,7 +76,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -86,8 +87,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp index befb18ca7..50620c5a0 100644 --- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp +++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -79,7 +80,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .withTransactionChecker(checker) @@ -91,8 +92,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 1e20b2eef..66a85f4b5 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -16,7 +16,6 @@ */ #include #include -#include #include #include "gflags/gflags.h" @@ -30,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -58,7 +58,7 @@ int main(int argc, char* argv[]) { .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 4c30214fb..aedec71e8 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -16,7 +16,6 @@ */ #include #include -#include #include "gflags/gflags.h" #include "rocketmq/Logger.h" @@ -29,6 +28,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .subscribe(FLAGS_topic, tag) .build(); diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h index 0037c270d..6dcd4137c 100644 --- a/cpp/include/rocketmq/Configuration.h +++ b/cpp/include/rocketmq/Configuration.h @@ -44,7 +44,7 @@ class Configuration { } bool withSsl() const { - return withSsl_; + return tls_; } protected: @@ -56,7 +56,7 @@ class Configuration { std::string endpoints_; CredentialsProviderPtr credentials_provider_; std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout}; - bool withSsl_ = true; + bool tls_ = true; }; class ConfigurationBuilder { @@ -67,7 +67,7 @@ class ConfigurationBuilder { ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout); - ConfigurationBuilder& withSsl(bool enable); + ConfigurationBuilder& withSsl(bool with_ssl); Configuration build(); diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h new file mode 100644 index 000000000..1ee20d916 --- /dev/null +++ b/cpp/include/rocketmq/FifoProducer.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include "Configuration.h" +#include "Message.h" +#include "RocketMQ.h" +#include "SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerImpl; +class FifoProducerBuilder; +class ProducerImpl; + +class FifoProducer { +public: + static FifoProducerBuilder newBuilder(); + + void send(MessageConstPtr message, SendCallback callback); + +private: + std::shared_ptr impl_; + + explicit FifoProducer(std::shared_ptr impl) : impl_(std::move(impl)) { + } + + void start(); + + friend class FifoProducerBuilder; +}; + +class FifoProducerBuilder { +public: + FifoProducerBuilder(); + + FifoProducerBuilder& withConfiguration(Configuration configuration); + + FifoProducerBuilder& withTopics(const std::vector& topics); + + FifoProducerBuilder& withConcurrency(std::size_t concurrency); + + FifoProducer build(); + +private: + std::shared_ptr impl_; + std::shared_ptr producer_impl_; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h index 42004eb31..6b42843d3 100644 --- a/cpp/include/rocketmq/Producer.h +++ b/cpp/include/rocketmq/Producer.h @@ -16,20 +16,17 @@ */ #pragma once -#include -#include #include #include #include #include "Configuration.h" -#include "ErrorCode.h" -#include "Logger.h" #include "Message.h" #include "SendCallback.h" #include "SendReceipt.h" #include "Transaction.h" #include "TransactionChecker.h" +#include "rocketmq/Logger.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h index 489df5ec3..7eef6e790 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/include/rocketmq/SendReceipt.h @@ -16,20 +16,21 @@ */ #pragma once -#include #include -#include #include "RocketMQ.h" +#include "rocketmq/Message.h" ROCKETMQ_NAMESPACE_BEGIN struct SendReceipt { + std::string target; + std::string message_id; std::string transaction_id; - std::string target; + MessageConstPtr message; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/CMakeLists.txt b/cpp/source/CMakeLists.txt index d42c6a1ae..dbf5ab01f 100644 --- a/cpp/source/CMakeLists.txt +++ b/cpp/source/CMakeLists.txt @@ -53,7 +53,10 @@ target_link_libraries(rocketmq_shared opencensus::stats opencensus_proto spdlog) +set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map) +set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--version-script=${VERSION_SCRIPT}") set_target_properties(rocketmq_shared PROPERTIES + LINK_DEPENDS ${VERSION_SCRIPT} LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR} LIBRARY_OUTPUT_NAME rocketmq) \ No newline at end of file diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp index 2a136d5d9..66cff2e8c 100644 --- a/cpp/source/base/Configuration.cpp +++ b/cpp/source/base/Configuration.cpp @@ -38,8 +38,8 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill return *this; } -ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) { - configuration_.withSsl_ = enable; +ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) { + configuration_.tls_ = with_ssl; return *this; } diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 5865dbb23..7d724c7b0 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -24,34 +24,30 @@ #include #include -#include "apache/rocketmq/v2/definition.pb.h" #include "InvocationContext.h" #include "LogInterceptor.h" #include "LogInterceptorFactory.h" -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" -#include "MessageExt.h" -#include "MetadataConstants.h" #include "MixAll.h" #include "Protocol.h" #include "ReceiveMessageContext.h" #include "RpcClient.h" #include "RpcClientImpl.h" #include "Scheduler.h" -#include "TlsHelper.h" +#include "SchedulerImpl.h" #include "UtilAll.h" #include "google/protobuf/util/time_util.h" #include "grpcpp/create_channel.h" #include "rocketmq/ErrorCode.h" -#include "rocketmq/SendReceipt.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl) - : scheduler_(std::make_shared()), resource_namespace_(std::move(resource_namespace)), +ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl) + : scheduler_(std::make_shared()), + resource_namespace_(std::move(resource_namespace)), state_(State::CREATED), callback_thread_pool_(absl::make_unique(std::thread::hardware_concurrency())), - withSsl_(withSsl){ + with_ssl_(with_ssl) { certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -175,8 +171,10 @@ std::vector ClientManagerImpl::cleanOfflineRpcClients() { return removed; } -void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata& metadata, - const HeartbeatRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::heartbeat(const std::string& target_host, + const Metadata& metadata, + const HeartbeatRequest& request, + std::chrono::milliseconds timeout, const std::function& cb) { SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString()); auto client = getRpcClient(target_host, true); @@ -279,8 +277,10 @@ void ClientManagerImpl::doHeartbeat() { } } -bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) { +bool ClientManagerImpl::send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, + SendResultCallback cb) { assert(cb); SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -306,15 +306,14 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met return; } - SendReceipt send_receipt = {}; - send_receipt.target = target_host; - std::error_code ec; + SendResult send_result = {}; + send_result.target = target_host; if (!invocation_context->status.ok()) { SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}", invocation_context->remote_address, invocation_context->status.error_code(), invocation_context->status.error_message()); - ec = ErrorCode::RequestTimeout; - cb(ec, send_receipt); + send_result.ec = ErrorCode::RequestTimeout; + cb(send_result); return; } @@ -323,8 +322,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met case rmq::Code::OK: { if (!invocation_context->response.entries().empty()) { auto first = invocation_context->response.entries().begin(); - send_receipt.message_id = first->message_id(); - send_receipt.transaction_id = first->transaction_id(); + send_result.message_id = first->message_id(); + send_result.transaction_id = first->transaction_id(); } else { SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString()); } @@ -333,126 +332,127 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met case rmq::Code::ILLEGAL_TOPIC: { SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalTopic; + send_result.ec = ErrorCode::IllegalTopic; break; } case rmq::Code::ILLEGAL_MESSAGE_TAG: { SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageTag; + send_result.ec = ErrorCode::IllegalMessageTag; break; } case rmq::Code::ILLEGAL_MESSAGE_KEY: { SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageKey; + send_result.ec = ErrorCode::IllegalMessageKey; break; } case rmq::Code::ILLEGAL_MESSAGE_GROUP: { SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageGroup; + send_result.ec = ErrorCode::IllegalMessageGroup; break; } case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: { SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageProperty; + send_result.ec = ErrorCode::IllegalMessageProperty; break; } case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: { SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessagePropertiesTooLarge; + send_result.ec = ErrorCode::MessagePropertiesTooLarge; break; } case rmq::Code::MESSAGE_BODY_TOO_LARGE: { SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageBodyTooLarge; + send_result.ec = ErrorCode::MessageBodyTooLarge; break; } case rmq::Code::TOPIC_NOT_FOUND: { SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TopicNotFound; + send_result.ec = ErrorCode::TopicNotFound; break; } case rmq::Code::NOT_FOUND: { SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::NotFound; + send_result.ec = ErrorCode::NotFound; break; } case rmq::Code::UNAUTHORIZED: { SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Unauthorized; + send_result.ec = ErrorCode::Unauthorized; break; } - + case rmq::Code::FORBIDDEN: { SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Forbidden; + send_result.ec = ErrorCode::Forbidden; break; } case rmq::Code::MESSAGE_CORRUPTED: { SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageCorrupted; + send_result.ec = ErrorCode::MessageCorrupted; break; } case rmq::Code::TOO_MANY_REQUESTS: { SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TooManyRequests; + send_result.ec = ErrorCode::TooManyRequests; break; } case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::HA_NOT_AVAILABLE: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::PROXY_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: { - SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, invocation_context->response.DebugString()); - ec = ErrorCode::MessagePropertyConflictWithType; + SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, + invocation_context->response.DebugString()); + send_result.ec = ErrorCode::MessagePropertyConflictWithType; break; } default: { SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address); - ec = ErrorCode::NotSupported; + send_result.ec = ErrorCode::NotSupported; break; } } - cb(ec, send_receipt); + cb(send_result); }; invocation_context->callback = completion_callback; @@ -470,7 +470,8 @@ std::shared_ptr ClientManagerImpl::createChannel(const std::strin std::vector> interceptor_factories; interceptor_factories.emplace_back(absl::make_unique()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories)); + target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, + std::move(interceptor_factories)); return channel; } @@ -513,27 +514,28 @@ void ClientManagerImpl::cleanRpcClients() { rpc_clients_.clear(); } -SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, - const SendMessageResponse& response, std::error_code& ec) { - SendReceipt send_receipt; +SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, + const SendMessageResponse& response, + std::error_code& ec) { + SendResult send_result; switch (response.status().code()) { case rmq::Code::OK: { assert(response.entries_size() > 0); - send_receipt.message_id = response.entries().begin()->message_id(); - send_receipt.transaction_id = response.entries().begin()->transaction_id(); - return send_receipt; + send_result.message_id = response.entries().begin()->message_id(); + send_result.transaction_id = response.entries().begin()->transaction_id(); + return send_result; } case rmq::Code::ILLEGAL_TOPIC: { ec = ErrorCode::BadRequest; - return send_receipt; + return send_result; } default: { // TODO: handle other cases. break; } } - return send_receipt; + return send_result; } void ClientManagerImpl::addClientObserver(std::weak_ptr client) { @@ -541,8 +543,10 @@ void ClientManagerImpl::addClientObserver(std::weak_ptr client) { clients_.emplace_back(std::move(client)); } -void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata, - const QueryRouteRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::resolveRoute(const std::string& target_host, + const Metadata& metadata, + const QueryRouteRequest& request, + std::chrono::milliseconds timeout, const std::function& cb) { SPDLOG_DEBUG("Name server connection URL: {}", target_host); SPDLOG_DEBUG("Query route request: {}", request.DebugString()); @@ -646,7 +650,9 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad } void ClientManagerImpl::queryAssignment( - const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request, + const std::string& target, + const Metadata& metadata, + const QueryAssignmentRequest& request, std::chrono::milliseconds timeout, const std::function& cb) { SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target); @@ -748,8 +754,10 @@ void ClientManagerImpl::queryAssignment( client->asyncQueryAssignment(request, invocation_context); } -void ClientManagerImpl::receiveMessage(const std::string& target_host, const Metadata& metadata, - const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::receiveMessage(const std::string& target_host, + const Metadata& metadata, + const ReceiveMessageRequest& request, + std::chrono::milliseconds timeout, ReceiveMessageCallback cb) { SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -765,7 +773,6 @@ State ClientManagerImpl::state() const { } MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) { - assert(item.topic().resource_namespace() == resource_namespace_); auto builder = Message::newBuilder(); // base @@ -955,8 +962,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() { return scheduler_; } -void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, const AckMessageRequest& request, - std::chrono::milliseconds timeout, const std::function& cb) { +void ClientManagerImpl::ack(const std::string& target, + const Metadata& metadata, + const AckMessageRequest& request, + std::chrono::milliseconds timeout, + const std::function& cb) { std::string target_host(target.data(), target.length()); SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. AckMessageRequest: {}", target_host, request.DebugString()); @@ -1066,8 +1076,11 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, } void ClientManagerImpl::changeInvisibleDuration( - const std::string& target_host, const Metadata& metadata, const ChangeInvisibleDurationRequest& request, - std::chrono::milliseconds timeout, const std::function& completion_callback) { + const std::string& target_host, + const Metadata& metadata, + const ChangeInvisibleDurationRequest& request, + std::chrono::milliseconds timeout, + const std::function& completion_callback) { RpcClientSharedPtr client = getRpcClient(target_host); assert(client); auto invocation_context = new InvocationContext(); @@ -1133,7 +1146,7 @@ void ClientManagerImpl::changeInvisibleDuration( ec = ErrorCode::Forbidden; break; } - + case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::InternalServerError; @@ -1159,7 +1172,9 @@ void ClientManagerImpl::changeInvisibleDuration( } void ClientManagerImpl::endTransaction( - const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request, + const std::string& target_host, + const Metadata& metadata, + const EndTransactionRequest& request, std::chrono::milliseconds timeout, const std::function& cb) { RpcClientSharedPtr client = getRpcClient(target_host); @@ -1339,7 +1354,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe ec = ErrorCode::ServiceUnavailable; break; } - + case rmq::Code::TOO_MANY_REQUESTS: { ec = ErrorCode::TooManyRequests; break; @@ -1362,7 +1377,8 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe client->asyncForwardMessageToDeadLetterQueue(request, invocation_context); } -std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata, +std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, + const Metadata& metadata, const NotifyClientTerminationRequest& request, std::chrono::milliseconds timeout) { std::error_code ec; @@ -1446,4 +1462,4 @@ void ClientManagerImpl::submit(std::function task) { const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task"; const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task"; -ROCKETMQ_NAMESPACE_END \ No newline at end of file +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp index 0ca3fff23..364168296 100644 --- a/cpp/source/client/SessionImpl.cpp +++ b/cpp/source/client/SessionImpl.cpp @@ -16,12 +16,14 @@ */ #include "SessionImpl.h" + #include "rocketmq/Logger.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -SessionImpl::SessionImpl(std::weak_ptr client, std::shared_ptr rpc_client) : client_(client), rpc_client_(rpc_client) { +SessionImpl::SessionImpl(std::weak_ptr client, std::shared_ptr rpc_client) + : client_(client), rpc_client_(rpc_client) { telemetry_ = rpc_client->asyncTelemetry(client_); syncSettings(); } @@ -39,8 +41,8 @@ void SessionImpl::syncSettings() { } SessionImpl::~SessionImpl() { - SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); telemetry_->fireClose(); + SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); } ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index 6c5b2f93d..d75ac361d 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -22,21 +22,24 @@ #include #include "ClientManager.h" -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" #include "MessageExt.h" #include "Metadata.h" #include "RpcClient.h" #include "Signature.h" #include "google/protobuf/util/time_util.h" +#include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr client, rmq::MessagingService::Stub* stub, std::string peer_address) - : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) { - auto ptr = client.lock(); + : client_(client), + peer_address_(std::move(peer_address)), + read_state_(StreamState::Created), + write_state_(StreamState::Created) { + auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); Metadata metadata; @@ -45,69 +48,126 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr client, context_.AddMetadata(entry.first, entry.second); } stub->async()->Telemetry(&context_, this); + write_state_ = StreamState::Ready; + // Increase hold for write stream. + AddHold(); StartCall(); } TelemetryBidiReactor::~TelemetryBidiReactor() { - SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, - static_cast(stream_state_)); + SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, WriteStreamState={}", peer_address_, + static_cast(read_state_), static_cast(read_state_)); } bool TelemetryBidiReactor::await() { - absl::MutexLock lk(&server_setting_received_mtx_); - if (server_setting_received_) { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != write_state_) { return true; } - server_setting_received_cv_.Wait(&server_setting_received_mtx_); - return server_setting_received_; + state_cv_.Wait(&state_mtx_); + return StreamState::Error != write_state_; } void TelemetryBidiReactor::OnWriteDone(bool ok) { - SPDLOG_DEBUG("OnWriteDone: {}", ok); - - { - bool expect = true; - if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) { - SPDLOG_WARN("Illegal command-inflight state"); - } - } + SPDLOG_DEBUG("{}#OnWriteDone", peer_address_); if (!ok) { - SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_); + RemoveHold(); { - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::WriteDone; - } + absl::MutexLock lk(&state_mtx_); + SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); + write_state_ = StreamState::Error; + + // Sync read state. + switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + read_state_ = StreamState::Closed; + break; + } + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } + } - fireClose(); + state_cv_.SignalAll(); + } return; + } else { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == write_state_) { + write_state_ = StreamState::Ready; + } } + // Check if the read stream has started. + fireRead(); + + // Remove the command that has been written to server. { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Created == stream_state_) { - stream_state_ = StreamState::Active; - fireRead(); - } + absl::MutexLock lk(&writes_mtx_); + writes_.pop_front(); } - fireWrite(); + tryWriteNext(); } void TelemetryBidiReactor::OnReadDone(bool ok) { - SPDLOG_DEBUG("OnReadDone: ok={}", ok); - if (!ok) { - if (client_.lock()) { - SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_); - } - - { - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::ReadDone; + SPDLOG_DEBUG("{}#OnReadDone", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + if (!ok) { + // Remove read hold. + RemoveHold(); + { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Error)); + read_state_ = StreamState::Error; + SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); + + // Sync write state + switch (write_state_) { + case StreamState::Created: { + // Not reachable + break; + } + case StreamState::Ready: { + write_state_ = StreamState::Closed; + // There is no inflight write request, remove write hold on its behalf. + RemoveHold(); + state_cv_.SignalAll(); + break; + } + case StreamState::Inflight: { + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } + } + } + return; + } else if (StreamState::Closing == read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + return; } - fireClose(); - return; } SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); @@ -122,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { auto settings = read_.settings(); SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); applySettings(settings); - { - absl::MutexLock lk(&server_setting_received_mtx_); - if (!server_setting_received_) { - server_setting_received_ = true; - server_setting_received_cv_.SignalAll(); - } - } break; } case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { @@ -151,11 +204,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { TelemetryCommand response; response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce()); response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported"); - { - absl::MutexLock lk(&writes_mtx_); - writes_.push_back(response); - } - fireWrite(); + write(std::move(response)); break; } @@ -188,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } } - fireRead(); + { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == read_state_) { + SPDLOG_DEBUG("Spawn new read op, read-state={}", static_cast(read_state_)); + StartRead(&read_); + } else if (read_state_ == StreamState::Closing) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + } + } } void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) { @@ -247,10 +307,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std } void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr client) { + // The server may have implicitly assumed a namespace for the client. + if (!settings.publishing().topics().empty()) { + for (const auto& topic : settings.publishing().topics()) { + if (topic.resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + topic.resource_namespace()); + client->config().resource_namespace = topic.resource_namespace(); + break; + } + } + } client->config().publisher.max_body_size = settings.publishing().max_body_size(); } void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr client) { + // The server may have implicitly assumed a namespace for the client. + if (!settings.subscription().subscriptions().empty()) { + for (const auto& subscription : settings.subscription().subscriptions()) { + if (subscription.topic().resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + subscription.topic().resource_namespace()); + client->config().resource_namespace = subscription.topic().resource_namespace(); + break; + } + } + } + client->config().subscriber.fifo = settings.subscription().fifo(); auto polling_timeout = google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout()); @@ -259,48 +342,144 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings } void TelemetryBidiReactor::fireRead() { - SPDLOG_DEBUG("{}#fireRead", peer_address_); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != read_state_) { + SPDLOG_DEBUG("Further read from {} is not allowded due to stream-state={}", peer_address_, + static_cast(read_state_)); + return; + } + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Ready)); + read_state_ = StreamState::Ready; + AddHold(); + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Inflight)); + read_state_ = StreamState::Inflight; StartRead(&read_); } void TelemetryBidiReactor::write(TelemetryCommand command) { + SPDLOG_DEBUG("{}#write", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + // Reject incoming write commands if the stream state is closing or has witnessed some error. + switch (write_state_) { + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: + return; + default: + // no-op + break; + } + } + { absl::MutexLock lk(&writes_mtx_); writes_.push_back(command); } - fireWrite(); + tryWriteNext(); } -void TelemetryBidiReactor::fireWrite() { - SPDLOG_DEBUG("{}#fireWrite", peer_address_); +void TelemetryBidiReactor::tryWriteNext() { + SPDLOG_DEBUG("{}#tryWriteNext", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error == write_state_ || StreamState::Closed == write_state_) { + SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_, + static_cast(write_state_)); + return; + } + } + { absl::MutexLock lk(&writes_mtx_); - if (writes_.empty()) { + if (writes_.empty() && StreamState::Closing != write_state_) { SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_); return; } - bool expect = false; - if (command_inflight_.compare_exchange_strong(expect, true, std::memory_order_relaxed)) { - write_ = std::move(*writes_.begin()); - writes_.erase(writes_.begin()); + if (StreamState::Ready == write_state_) { + write_state_ = StreamState::Inflight; + } + + if (writes_.empty()) { + // Tell server there is no more write requests. + StartWritesDone(); } else { - SPDLOG_DEBUG("Another command is already on the wire. Peer={}", peer_address_); - return; + SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); + StartWrite(&(writes_.front())); } } - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, write_.DebugString()); - StartWrite(&write_); } void TelemetryBidiReactor::fireClose() { SPDLOG_INFO("{}#fireClose", peer_address_); - if (StreamState::Active == stream_state_) { - StartWritesDone(); - { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Active == stream_state_) { - stream_state_cv_.Wait(&stream_state_mtx_); + + { + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + + // Transition read state + switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + break; + } + + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } + } + + // Transition write state + switch (write_state_) { + case StreamState::Created: + case StreamState::Ready: + case StreamState::Inflight: { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closing)); + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } + } + } + + if (StreamState::Closing == write_state_) { + tryWriteNext(); + } + + { + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + while ((StreamState::Closed != read_state_ && StreamState::Error != read_state_) || + (StreamState::Closed != write_state_ && StreamState::Error != write_state_)) { + if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { + SPDLOG_WARN("StreamState CondVar timed out before getting signalled: read-state={}, write-state={}", + static_cast(read_state_), static_cast(write_state_)); } } } @@ -308,6 +487,20 @@ void TelemetryBidiReactor::fireClose() { void TelemetryBidiReactor::OnWritesDoneDone(bool ok) { SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + assert(StreamState::Closing == write_state_); + + absl::MutexLock lk(&state_mtx_); + // Remove the hold for the write stream. + RemoveHold(); + + if (!ok) { + write_state_ = StreamState::Error; + SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_); + } else { + write_state_ = StreamState::Closed; + SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + } + state_cv_.SignalAll(); } void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { @@ -315,7 +508,7 @@ void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { absl::MutexLock lk(&writes_mtx_); writes_.emplace_back(command); } - fireWrite(); + tryWriteNext(); } /// Notifies the application that all operations associated with this RPC @@ -334,9 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { { SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_); - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::Closed; - stream_state_cv_.SignalAll(); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error != read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + read_state_ = StreamState::Closed; + } + if (StreamState::Error != write_state_) { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Closed)); + write_state_ = StreamState::Closed; + } + state_cv_.SignalAll(); } auto client = client_.lock(); @@ -349,4 +551,20 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { } } -ROCKETMQ_NAMESPACE_END \ No newline at end of file +void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { + SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_); + + if (!ok) { + absl::MutexLock lk(&state_mtx_); + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast(read_state_), + static_cast(StreamState::Error)); + read_state_ = StreamState::Error; + state_cv_.SignalAll(); + SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); + return; + } + + SPDLOG_DEBUG("Received initial metadata from {}", peer_address_); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h index 56325fa4d..02b232b2f 100644 --- a/cpp/source/client/include/ClientManager.h +++ b/cpp/source/client/include/ClientManager.h @@ -22,14 +22,12 @@ #include #include "Client.h" -#include "MessageExt.h" #include "Metadata.h" #include "ReceiveMessageCallback.h" #include "RpcClient.h" #include "Scheduler.h" -#include "TelemetryBidiReactor.h" +#include "SendResultCallback.h" #include "TopicRouteData.h" -#include "rocketmq/SendCallback.h" #include "rocketmq/State.h" ROCKETMQ_NAMESPACE_BEGIN @@ -93,8 +91,10 @@ class ClientManager { virtual void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, ReceiveMessageCallback callback) = 0; - virtual bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) = 0; + virtual bool send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, + SendResultCallback cb) = 0; virtual std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata, const NotifyClientTerminationRequest& request, diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index 653fcad35..5f1b27cae 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -29,18 +28,13 @@ #include "Client.h" #include "ClientManager.h" #include "InsecureCertificateVerifier.h" -#include "InvocationContext.h" #include "ReceiveMessageCallback.h" #include "RpcClientImpl.h" -#include "SchedulerImpl.h" -#include "SendMessageContext.h" -#include "TelemetryBidiReactor.h" #include "ThreadPoolImpl.h" #include "TopicRouteData.h" #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" #include "rocketmq/State.h" @@ -54,7 +48,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share * effectively. * @param resource_namespace Abstract resource namespace, in which this client manager lives. */ - explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true); + explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true); ~ClientManagerImpl() override; @@ -89,7 +83,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); + SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); /** * Get a RpcClient according to the given target hosts, which follows scheme specified @@ -105,7 +99,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override LOCKS_EXCLUDED(rpc_clients_mtx_); - static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue, + static SendResult processSendResponse(const rmq::MessageQueue& message_queue, const SendMessageResponse& response, std::error_code& ec); @@ -242,7 +236,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share grpc::ChannelArguments channel_arguments_; bool trace_{false}; - bool withSsl_; + bool with_ssl_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/SendResult.h b/cpp/source/client/include/SendResult.h new file mode 100644 index 000000000..3596f61fc --- /dev/null +++ b/cpp/source/client/include/SendResult.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "rocketmq/RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +struct SendResult { + std::error_code ec; + std::string target; + + std::string message_id; + std::string transaction_id; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/SendResultCallback.h b/cpp/source/client/include/SendResultCallback.h new file mode 100644 index 000000000..182f649a1 --- /dev/null +++ b/cpp/source/client/include/SendResultCallback.h @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "SendResult.h" + +ROCKETMQ_NAMESPACE_BEGIN + +using SendResultCallback = std::function; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 9fe65f31e..3bdbe3d35 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -34,12 +35,29 @@ ROCKETMQ_NAMESPACE_BEGIN enum class StreamState : std::uint8_t { Created = 0, - Active = 1, - ReadDone = 2, - WriteDone = 3, + Ready = 1, + Inflight = 2, + Closing = 3, Closed = 4, + Error = 5, }; +/// write-stream-state: created --> ready --> inflight --> ready --> ... +/// --> error +/// --> closing --> closed +/// --> closing --> closed +/// --> error +/// +/// +/// read-stream-state: created --> ready --> inflight --> inflight +/// --> closing --> closed +/// --> error +/// --> closed +/// requirement: +/// 1, fireClose --> blocking await till bidireactor is closed; +/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from +/// server 3, after writing the first Settings telemetry command, launch the read directional stream +/// class TelemetryBidiReactor : public grpc::ClientBidiReactor, public std::enable_shared_from_this { public: @@ -47,24 +65,54 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor writes_ GUARDED_BY(writes_mtx_); + std::list writes_ GUARDED_BY(writes_mtx_); absl::Mutex writes_mtx_; /** - * @brief The command that is currently being written back to server. - */ - TelemetryCommand write_; - - /** - * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. + * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. */ std::weak_ptr client_; @@ -94,18 +140,12 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor client); void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr client); + + /// Start the read stream. + /// + /// Once got the OnReadDone and status is OK, call StartRead immediately. + void fireRead(); + + /// Attempt to write pending telemetry command to server. + void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_); }; -ROCKETMQ_NAMESPACE_END \ No newline at end of file +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/exports.map b/cpp/source/exports.map new file mode 100644 index 000000000..99ae5d1d2 --- /dev/null +++ b/cpp/source/exports.map @@ -0,0 +1,6 @@ +{ + global: + *rocketmq*; + local: + *; +}; \ No newline at end of file diff --git a/cpp/source/rocketmq/FifoContext.cpp b/cpp/source/rocketmq/FifoContext.cpp new file mode 100644 index 000000000..1cf5bab21 --- /dev/null +++ b/cpp/source/rocketmq/FifoContext.cpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "FifoContext.h" + +#include "rocketmq/RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +FifoContext::FifoContext(MessageConstPtr message, SendCallback callback) + : message(std::move(message)), callback(callback) { +} + +FifoContext::FifoContext(FifoContext&& rhs) noexcept { + this->message = std::move(rhs.message); + this->callback = rhs.callback; +} + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp new file mode 100644 index 000000000..b52630b06 --- /dev/null +++ b/cpp/source/rocketmq/FifoProducer.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "rocketmq/FifoProducer.h" + +#include +#include + +#include "FifoProducerImpl.h" +#include "ProducerImpl.h" +#include "StaticNameServerResolver.h" +#include "rocketmq/Configuration.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +FifoProducerBuilder FifoProducer::newBuilder() { + return {}; +} + +FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared()) { +} + +FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) { + auto name_server_resolver = std::make_shared(configuration.endpoints()); + producer_impl_->withNameServerResolver(std::move(name_server_resolver)); + producer_impl_->withCredentialsProvider(configuration.credentialsProvider()); + producer_impl_->withRequestTimeout(configuration.requestTimeout()); + producer_impl_->withSsl(configuration.withSsl()); + return *this; +} + +FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector& topics) { + producer_impl_->withTopics(topics); + return *this; +} + +FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) { + this->impl_ = std::make_shared(producer_impl_, concurrency); + return *this; +} + +FifoProducer FifoProducerBuilder::build() { + FifoProducer fifo_producer(this->impl_); + fifo_producer.start(); + return fifo_producer; +} + +void FifoProducer::start() { + impl_->internalProducer()->start(); +} + +void FifoProducer::send(MessageConstPtr message, SendCallback callback) { + impl_->send(std::move(message), callback); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp b/cpp/source/rocketmq/FifoProducerImpl.cpp new file mode 100644 index 000000000..ad08c4d97 --- /dev/null +++ b/cpp/source/rocketmq/FifoProducerImpl.cpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "FifoProducerImpl.h" + +#include + +#include "FifoContext.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) { + auto& group = message->group(); + std::size_t hash = hash_fn_(group); + std::size_t slot = hash % concurrency_; + + FifoContext context(std::move(message), callback); + partitions_[slot]->add(std::move(context)); +} + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp new file mode 100644 index 000000000..8a2f06ff4 --- /dev/null +++ b/cpp/source/rocketmq/FifoProducerPartition.cpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "FifoProducerPartition.h" + +#include "absl/synchronization/mutex.h" + +#include +#include +#include + +#include "FifoContext.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" +#include "rocketmq/SendReceipt.h" +#include "spdlog/spdlog.h" + +ROCKETMQ_NAMESPACE_BEGIN + +void FifoProducerPartition::add(FifoContext&& context) { + { + absl::MutexLock lk(&messages_mtx_); + messages_.emplace_back(std::move(context)); + SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size()); + } + + trySend(); +} + +void FifoProducerPartition::trySend() { + bool expected = false; + if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { + absl::MutexLock lk(&messages_mtx_); + + if (messages_.empty()) { + SPDLOG_DEBUG("There is no more messages to send"); + return; + } + + FifoContext& ctx = messages_.front(); + MessageConstPtr message = std::move(ctx.message); + SendCallback send_callback = ctx.callback; + + std::shared_ptr partition = shared_from_this(); + auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable { + partition->onComplete(ec, receipt, send_callback); + }; + SPDLOG_DEBUG("Sending FIFO message from {}", name_); + producer_->send(std::move(message), fifo_callback); + messages_.pop_front(); + SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_); + } else { + SPDLOG_DEBUG("There is an inflight message"); + } +} + +void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) { + if (ec) { + SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message()); + } else { + SPDLOG_DEBUG("{} completed OK", name_); + } + + if (!ec) { + callback(ec, receipt); + // update inflight status + bool expected = true; + if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + trySend(); + } else { + SPDLOG_ERROR("{}: Unexpected inflight status", name_); + } + return; + } + + // Put the message back to the front of the list + SendReceipt& receipt_mut = const_cast(receipt); + FifoContext retry_context(std::move(receipt_mut.message), callback); + { + absl::MutexLock lk(&messages_mtx_); + messages_.emplace_front(std::move(retry_context)); + } + + // Update inflight status + bool expected = true; + if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + trySend(); + } else { + SPDLOG_ERROR("Unexpected inflight status"); + } +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 78d812edf..907d0a280 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -21,12 +21,8 @@ #include #include -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" -#include "MixAll.h" #include "ProducerImpl.h" #include "StaticNameServerResolver.h" -#include "absl/strings/str_split.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/SendReceipt.h" #include "rocketmq/Transaction.h" diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 73130161c..34c5b29c6 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -17,38 +17,27 @@ #include "ProducerImpl.h" #include -#include - #include #include #include -#include #include #include #include -#include "Client.h" -#include "MessageGroupQueueSelector.h" -#include "MetadataConstants.h" +#include "apache/rocketmq/v2/definition.pb.h" #include "MixAll.h" #include "Protocol.h" #include "PublishInfoCallback.h" -#include "RpcClient.h" #include "SendContext.h" -#include "SendMessageContext.h" #include "Signature.h" -#include "Tag.h" #include "TracingUtility.h" #include "TransactionImpl.h" -#include "UniqueIdGenerator.h" #include "UtilAll.h" -#include "absl/strings/str_join.h" #include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/Message.h" #include "rocketmq/SendReceipt.h" -#include "rocketmq/Tracing.h" #include "rocketmq/Transaction.h" #include "rocketmq/TransactionChecker.h" @@ -203,19 +192,30 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept { ensureRunning(ec); if (ec) { - return {}; + SPDLOG_WARN("Producer is not running"); + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } auto topic_publish_info = getPublishInfo(message->topic()); if (!topic_publish_info) { + SPDLOG_WARN("Route of topic[{}] is not found", message->topic()); ec = ErrorCode::NotFound; - return {}; + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } std::vector message_queue_list; - if (!topic_publish_info->selectMessageQueues(absl::make_optional(), message_queue_list)) { + // null_opt_t + absl::optional message_group{}; + if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) { + SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic()); ec = ErrorCode::NotFound; - return {}; + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } auto mtx = std::make_shared(); @@ -224,9 +224,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe SendReceipt send_receipt; // Define callback - auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) { + auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable { ec = code; - send_receipt = receipt; + SendReceipt& receipt_mut = const_cast(receipt); + send_receipt.message = std::move(receipt_mut.message); { absl::MutexLock lk(mtx.get()); completed = true; @@ -251,6 +252,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { ensureRunning(ec); if (ec) { SendReceipt send_receipt; + send_receipt.message = std::move(message); cb(ec, send_receipt); } @@ -264,6 +266,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { // No route entries of the given topic is available if (ec) { SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -271,6 +274,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { if (!publish_info) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -280,6 +284,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -338,12 +343,12 @@ void ProducerImpl::sendImpl(std::shared_ptr context) { Metadata metadata; Signature::sign(client_config_, metadata); - auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) { - if (ec) { - context->onFailure(ec); + auto callback = [context](const SendResult& send_result) { + if (send_result.ec) { + context->onFailure(send_result.ec); return; } - context->onSuccess(send_receipt); + context->onSuccess(send_result); }; client_manager_->send(target, metadata, request, callback); @@ -354,12 +359,14 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve std::error_code ec; validate(*message, ec); if (ec) { + send_receipt.message = std::move(message); callback(ec, send_receipt); return; } if (list.empty()) { ec = ErrorCode::NotFound; + send_receipt.message = std::move(message); callback(ec, send_receipt); return; } diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp index 385a1a99a..bd97384d8 100644 --- a/cpp/source/rocketmq/SendContext.cpp +++ b/cpp/source/rocketmq/SendContext.cpp @@ -21,41 +21,44 @@ #include "ProducerImpl.h" #include "PublishStats.h" #include "Tag.h" -#include "TransactionImpl.h" -#include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" -#include "rocketmq/Logger.h" +#include "rocketmq/ErrorCode.h" #include "rocketmq/SendReceipt.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept { +void SendContext::onSuccess(const SendResult& send_result) noexcept { { // Mark end of send-message span. span_.SetStatus(opencensus::trace::StatusCode::OK); span_.End(); } - auto publisher = producer_.lock(); - if (!publisher) { + auto producer = producer_.lock(); + if (!producer) { + SPDLOG_WARN("Producer has been destructed"); return; } // Collect metrics { auto duration = std::chrono::steady_clock::now() - request_time_; - opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}}, + opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}}, { {Tag::topicTag(), message_->topic()}, - {Tag::clientIdTag(), publisher->config().client_id}, + {Tag::clientIdTag(), producer->config().client_id}, {Tag::invocationStatusTag(), "success"}, }); } // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context())); - std::error_code ec; - callback_(ec, send_receipt); + SendReceipt send_receipt = {}; + send_receipt.target = send_result.target; + send_receipt.message_id = send_result.message_id; + send_receipt.transaction_id = send_result.transaction_id; + send_receipt.message = std::move(message_); + callback_(send_result.ec, send_receipt); } void SendContext::onFailure(const std::error_code& ec) noexcept { @@ -65,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec) noexcept { span_.End(); } - auto publisher = producer_.lock(); - if (!publisher) { + auto producer = producer_.lock(); + if (!producer) { + SPDLOG_WARN("Producer has been destructed"); return; } // Collect metrics { auto duration = std::chrono::steady_clock::now() - request_time_; - opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}}, + opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}}, { {Tag::topicTag(), message_->topic()}, - {Tag::clientIdTag(), publisher->config().client_id}, + {Tag::clientIdTag(), producer->config().client_id}, {Tag::invocationStatusTag(), "failure"}, }); } - if (++attempt_times_ >= publisher->maxAttemptTimes()) { - SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes()); - callback_(ec, {}); - return; - } - - std::shared_ptr producer = producer_.lock(); - if (!producer) { - SPDLOG_WARN("Producer has been destructed"); - callback_(ec, {}); + if (++attempt_times_ >= producer->maxAttemptTimes()) { + SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, producer->maxAttemptTimes()); + SendReceipt receipt{}; + receipt.message = std::move(message_); + callback_(ec, receipt); return; } if (candidates_.empty()) { SPDLOG_WARN("No alternative hosts to perform additional retries"); - callback_(ec, {}); + SendReceipt receipt{}; + receipt.message = std::move(message_); + callback_(ec, receipt); return; } @@ -106,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept { auto ctx = shared_from_this(); // If publish message requests are throttled, retry after backoff if (ErrorCode::TooManyRequests == ec) { - auto&& backoff = publisher->backoff(attempt_times_); + auto&& backoff = producer->backoff(attempt_times_); SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry after {}ms", message_->topic(), message_->id(), MixAll::millisecondsOf(backoff)); auto retry_cb = [=]() { producer->sendImpl(ctx); }; diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index c266047a8..d7693962c 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -94,8 +94,8 @@ class ClientImpl : virtual public Client { client_config_.request_timeout = absl::FromChrono(request_timeout); } - void withSsl(bool enable) { - client_config_.withSsl = enable; + void withSsl(bool with_ssl) { + client_config_.withSsl = with_ssl; } /** diff --git a/cpp/source/rocketmq/include/FifoContext.h b/cpp/source/rocketmq/include/FifoContext.h new file mode 100644 index 000000000..2fc3492e9 --- /dev/null +++ b/cpp/source/rocketmq/include/FifoContext.h @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +struct FifoContext { + MessageConstPtr message; + SendCallback callback; + + FifoContext(MessageConstPtr message, SendCallback callback); + + FifoContext(FifoContext&& rhs) noexcept; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h new file mode 100644 index 000000000..ffc9c8a68 --- /dev/null +++ b/cpp/source/rocketmq/include/FifoProducerImpl.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include "FifoProducerPartition.h" +#include "ProducerImpl.h" +#include "fmt/format.h" +#include "rocketmq/Message.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerImpl : std::enable_shared_from_this { +public: + FifoProducerImpl(std::shared_ptr producer, std::size_t concurrency) + : producer_(producer), concurrency_(concurrency), partitions_(concurrency) { + for (auto i = 0; i < concurrency; i++) { + partitions_[i] = std::make_shared(producer_, fmt::format("slot-{}", i)); + } + }; + + void send(MessageConstPtr message, SendCallback callback); + + std::shared_ptr& internalProducer() { + return producer_; + } + +private: + std::shared_ptr producer_; + std::vector> partitions_; + std::size_t concurrency_; + std::hash hash_fn_; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h new file mode 100644 index 000000000..8d0e00d81 --- /dev/null +++ b/cpp/source/rocketmq/include/FifoProducerPartition.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "absl/base/internal/thread_annotations.h" + +#include +#include +#include +#include + +#include "FifoContext.h" +#include "ProducerImpl.h" +#include "absl/base/thread_annotations.h" +#include "absl/synchronization/mutex.h" +#include "rocketmq/SendCallback.h" +#include "rocketmq/SendReceipt.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerPartition : public std::enable_shared_from_this { +public: + FifoProducerPartition(std::shared_ptr producer, std::string&& name) + : producer_(producer), name_(std::move(name)) { + } + + void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_); + + void trySend() LOCKS_EXCLUDED(messages_mtx_); + + void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback); + +private: + std::shared_ptr producer_; + std::list messages_ GUARDED_BY(messages_mtx_); + absl::Mutex messages_mtx_; + std::atomic_bool inflight_{false}; + std::string name_; +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index d7260a936..b572f20d6 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -16,28 +16,23 @@ */ #pragma once -#include #include -#include #include #include #include "ClientImpl.h" -#include "ClientManagerImpl.h" #include "MixAll.h" #include "PublishInfoCallback.h" +#include "PublishStats.h" #include "SendContext.h" #include "TopicPublishInfo.h" #include "TransactionImpl.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "absl/strings/string_view.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" #include "rocketmq/SendReceipt.h" -#include "rocketmq/State.h" #include "rocketmq/TransactionChecker.h" -#include "PublishStats.h" ROCKETMQ_NAMESPACE_BEGIN @@ -53,8 +48,22 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t void shutdown() override; + /** + * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during + * sent. + * + * Regardless of the send result, SendReceipt would have the std::unique_ptr, facilliating + * application to conduct customized retry policy. + */ SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept; + /** + * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during + * sent. + * + * Regardless of the send result, SendReceipt would have the std::unique_ptr, facilliating + * application to conduct customized retry policy. + */ void send(MessageConstPtr message, SendCallback callback); void setTransactionChecker(TransactionChecker checker); @@ -64,6 +73,13 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t return absl::make_unique(producer); } + /** + * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during + * sent. + * + * TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish + * failure. + */ void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction); /** diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h index 4c05cebeb..4067532be 100644 --- a/cpp/source/rocketmq/include/SendContext.h +++ b/cpp/source/rocketmq/include/SendContext.h @@ -19,16 +19,12 @@ #include #include -#include "absl/container/flat_hash_map.h" -#include "absl/synchronization/mutex.h" -#include "opencensus/trace/span.h" - #include "Protocol.h" +#include "SendResult.h" #include "TransactionImpl.h" -#include "rocketmq/ErrorCode.h" +#include "opencensus/trace/span.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" -#include "rocketmq/SendReceipt.h" ROCKETMQ_NAMESPACE_BEGIN @@ -47,7 +43,7 @@ class SendContext : public std::enable_shared_from_this { span_(opencensus::trace::Span::BlankSpan()) { } - void onSuccess(const SendReceipt& send_receipt) noexcept; + void onSuccess(const SendResult& send_result) noexcept; void onFailure(const std::error_code& ec) noexcept; diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 45aa61b97..7fc63b6d4 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -25,7 +25,7 @@ using namespace std::chrono; ROCKETMQ_NAMESPACE_BEGIN -class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_this { +class SimpleConsumerImpl : virtual public ClientImpl, public std::enable_shared_from_this { public: SimpleConsumerImpl(std::string group); diff --git a/cpp/source/rocketmq/tests/BUILD.bazel b/cpp/source/rocketmq/tests/BUILD.bazel index a8d10e92b..74751c354 100644 --- a/cpp/source/rocketmq/tests/BUILD.bazel +++ b/cpp/source/rocketmq/tests/BUILD.bazel @@ -59,4 +59,12 @@ cc_test( "ConsumeMessageServiceTest.cpp", ], deps = base_deps, +) + +cc_test( + name = "optional_test", + srcs = [ + "OptionalTest.cpp", + ], + deps = base_deps ) \ No newline at end of file diff --git a/cpp/source/rocketmq/tests/OptionalTest.cpp b/cpp/source/rocketmq/tests/OptionalTest.cpp new file mode 100644 index 000000000..1266a73e6 --- /dev/null +++ b/cpp/source/rocketmq/tests/OptionalTest.cpp @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" +#include "absl/types/optional.h" +#include "rocketmq/RocketMQ.h" + +#include + +ROCKETMQ_NAMESPACE_BEGIN + +TEST(OptionalTest, test_optional) { + absl::optional opt{}; + ASSERT_EQ(false, opt.has_value()); + + auto opt2 = absl::make_optional(); + ASSERT_EQ(true, opt2.has_value()); +} + +ROCKETMQ_NAMESPACE_END + diff --git a/cpp/source/stats/MetricBidiReactor.cpp b/cpp/source/stats/MetricBidiReactor.cpp index 22363bd0c..e69213785 100644 --- a/cpp/source/stats/MetricBidiReactor.cpp +++ b/cpp/source/stats/MetricBidiReactor.cpp @@ -18,10 +18,10 @@ #include -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" #include "OpencensusExporter.h" #include "Signature.h" +#include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN @@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr client, std::weak_ptr return; } exporter_ptr->stub()->async()->Export(&context_, this); + AddHold(); StartCall(); } void MetricBidiReactor::OnReadDone(bool ok) { if (!ok) { SPDLOG_WARN("Failed to read response"); + // match the AddHold() call in MetricBidiReactor::fireRead + RemoveHold(); return; } SPDLOG_DEBUG("OnReadDone OK"); @@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) { } void MetricBidiReactor::OnWriteDone(bool ok) { + { + bool expected = true; + if (!inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + SPDLOG_WARN("Illegal command-inflight state"); + return; + } + } + if (!ok) { SPDLOG_WARN("Failed to report metrics"); + // match AddHold() call in MetricBidiReactor::MetricBidiReactor + RemoveHold(); return; } SPDLOG_DEBUG("OnWriteDone OK"); + + // If the read stream has not started yet, start it now. fireRead(); - bool expected = true; - if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { - fireWrite(); + + // Remove the one that been written. + { + absl::MutexLock lk(&requests_mtx_); + requests_.pop_front(); } + + tryWriteNext(); } void MetricBidiReactor::OnDone(const grpc::Status& s) { @@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest request) { SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer"); { absl::MutexLock lk(&requests_mtx_); - requests_.emplace_back(std::move(request)); + requests_.push_back(std::move(request)); } - fireWrite(); + tryWriteNext(); } -void MetricBidiReactor::fireWrite() { +void MetricBidiReactor::tryWriteNext() { { absl::MutexLock lk(&requests_mtx_); if (requests_.empty()) { @@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() { bool expected = false; if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { absl::MutexLock lk(&requests_mtx_); - request_ = std::move(*requests_.begin()); - requests_.erase(requests_.begin()); SPDLOG_DEBUG("MetricBidiReactor#StartWrite"); - StartWrite(&request_); + StartWrite(&(requests_.front())); } } void MetricBidiReactor::fireRead() { bool expected = false; if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { + AddHold(); StartRead(&response_); } } diff --git a/cpp/source/stats/include/MetricBidiReactor.h b/cpp/source/stats/include/MetricBidiReactor.h index 0a10cd882..e4d753444 100644 --- a/cpp/source/stats/include/MetricBidiReactor.h +++ b/cpp/source/stats/include/MetricBidiReactor.h @@ -16,6 +16,8 @@ */ #pragma once +#include + #include "Client.h" #include "grpcpp/grpcpp.h" #include "grpcpp/impl/codegen/client_callback.h" @@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN class OpencensusExporter; -using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; -using ExportMetricsServiceResponse = opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse; +using ExportMetricsServiceRequest = + opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; +using ExportMetricsServiceResponse = + opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse; -class MetricBidiReactor : public grpc::ClientBidiReactor { +class MetricBidiReactor + : public grpc::ClientBidiReactor { public: - MetricBidiReactor(std::weak_ptr client, std::weak_ptr exporter); + MetricBidiReactor(std::weak_ptr client, + std::weak_ptr exporter); /// Notifies the application that a StartRead operation completed. /// @@ -52,7 +59,7 @@ class MetricBidiReactor : public grpc::ClientBidiReactor exporter_; grpc::ClientContext context_; - ExportMetricsServiceRequest request_; - - std::vector requests_ GUARDED_BY(requests_mtx_); + /// Pending ExportMetricsServiceRequest items to write to server + std::list requests_ GUARDED_BY(requests_mtx_); absl::Mutex requests_mtx_; std::atomic_bool inflight_{false}; @@ -71,7 +77,7 @@ class MetricBidiReactor : public grpc::ClientBidiReactor