diff --git a/vda5050_master/CMakeLists.txt b/vda5050_master/CMakeLists.txt new file mode 100644 index 0000000..57b6da7 --- /dev/null +++ b/vda5050_master/CMakeLists.txt @@ -0,0 +1,119 @@ +cmake_minimum_required(VERSION 3.8) +project(vda5050_master) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) +find_package(vda5050_core REQUIRED) +find_package(vda5050_msgs REQUIRED) +find_package(vda5050_json_utils REQUIRED) +find_package(fmt REQUIRED) + +add_library(vda5050_master + src/agv/agv.cpp + src/vda5050_master/master.cpp + src/vda5050_master/communication/mqtt.cpp + src/vda5050_master/communication/ros2.cpp +) + +ament_target_dependencies(vda5050_master + vda5050_core + vda5050_msgs + vda5050_json_utils + fmt + rclcpp + std_msgs +) + +target_compile_definitions(vda5050_master PRIVATE ENABLE_ROS2) + +target_include_directories(vda5050_master PUBLIC + $ + $) +target_compile_features(vda5050_master PUBLIC c_std_99 cxx_std_17) # Require C99 and C++17 + + +install(TARGETS vda5050_master + DESTINATION lib/${PROJECT_NAME}) + +if(BUILD_TESTING) + find_package(ament_cmake_cppcheck REQUIRED) + ament_cppcheck() + + find_package(ament_cmake_cpplint REQUIRED) + set(cpplint_filters + "-whitespace/newline" + ) + ament_cpplint( + FILTERS "${cpplint_filters}" + ) + + find_package(ament_cmake_clang_format REQUIRED) + ament_clang_format( + CONFIG_FILE "${CMAKE_CURRENT_SOURCE_DIR}/../.clang-format" + ) + + find_package(ament_cmake_copyright REQUIRED) + ament_copyright() + + # find_package(ament_cmake_gtest REQUIRED) + # ament_add_gtest(test_mqtt_client + # test/integration/heartbeat.cpp + # test/integration/mqtt_communications.cpp + # ) + + find_package(ament_cmake_gmock REQUIRED) + ament_add_gmock(test_communications + test/communication/mqtt/communications.cpp + test/communication/ros2/communications.cpp + test/communication/vda5050/vda5050_mqtt.cpp + test/integration/heartbeat.cpp + ) + + target_link_libraries(test_communications vda5050_master) + + # AGV unit tests (no external dependencies like MQTT broker) + ament_add_gmock(test_agv + test/agv/agv_test.cpp + ) + target_link_libraries(test_agv vda5050_master) + + # ===== MQTT Test Broker Automation (Docker) ===== + # Setup: Start Docker MQTT broker before tests (with anonymous auth enabled) + add_test(NAME mqtt_broker_setup + COMMAND bash -c "docker run -d --name vda5050-mqtt-test -p 1884:1883 eclipse-mosquitto:latest mosquitto -c /mosquitto-no-auth.conf || docker start vda5050-mqtt-test" + ) + set_tests_properties(mqtt_broker_setup PROPERTIES + FIXTURES_SETUP mqtt_broker + TIMEOUT 30 + ) + + # Main test: Require MQTT broker fixture + set_tests_properties(test_communications PROPERTIES + FIXTURES_REQUIRED mqtt_broker + ENVIRONMENT "VDA5050_TEST_MQTT_BROKER=tcp://localhost:1884" + TIMEOUT 120 + ) + + # Cleanup: Stop and remove Docker container after tests + add_test(NAME mqtt_broker_cleanup + COMMAND bash -c "docker stop vda5050-mqtt-test && docker rm vda5050-mqtt-test || true" + ) + set_tests_properties(mqtt_broker_cleanup PROPERTIES + FIXTURES_CLEANUP mqtt_broker + TIMEOUT 30 + ) + # =========== End MQTT Broker Automation =========== + +endif() + +ament_package() diff --git a/vda5050_master/LICENSE b/vda5050_master/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/vda5050_master/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vda5050_master/include/vda5050_master/agv/agv.hpp b/vda5050_master/include/vda5050_master/agv/agv.hpp new file mode 100644 index 0000000..98b617c --- /dev/null +++ b/vda5050_master/include/vda5050_master/agv/agv.hpp @@ -0,0 +1,316 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__AGV__AGV_HPP_ +#define VDA5050_MASTER__AGV__AGV_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "vda5050_master/communication/communication.hpp" +#include "vda5050_master/vda5050_interfaces.hpp" + +// Forward declaration +class VDA5050Master; + +/** + * @brief Represents an individual AGV managed by VDA5050Master + * + * This class encapsulates all per-AGV data including: + * - Identity information (manufacturer, serial number) + * - Communication instance + * - Cached VDA5050 messages (connection, state, factsheet, visualization) + * - Connection status and timestamps + * + * Thread safety: Methods are thread-safe. Cached data access is protected + * by a mutex. + */ +class AGV +{ +public: + // Type aliases + using Clock = std::chrono::steady_clock; + using TimePoint = std::chrono::time_point; + + // Callback types for VDA5050 message handling + using ConnectionCallback = std::function; + using StateCallback = + std::function; + using VisualizationCallback = std::function; + + // Default maximum queue size for outgoing messages + static constexpr size_t DEFAULT_MAX_QUEUE_SIZE = 10; + + /** + * @brief Construct an AGV instance + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * @param communication Communication strategy for this AGV + * @param max_queue_size Maximum number of outgoing messages to queue (default: 10) + * @param drop_oldest If true, drop oldest message when queue full; if false, reject new message (default: true) + */ + AGV( + const std::string& manufacturer, const std::string& serial_number, + std::unique_ptr communication, + size_t max_queue_size = DEFAULT_MAX_QUEUE_SIZE, bool drop_oldest = true); + + /** + * @brief Destructor - stops the queue processing thread + */ + ~AGV(); + + // Non-copyable, non-movable (due to thread member) + AGV(const AGV&) = delete; + AGV& operator=(const AGV&) = delete; + AGV(AGV&&) = delete; + AGV& operator=(AGV&&) = delete; + + // ============================================================================ + // Identity + // ============================================================================ + + /** + * @brief Get the manufacturer name + */ + const std::string& get_manufacturer() const + { + return manufacturer_; + } + + /** + * @brief Get the serial number + */ + const std::string& get_serial_number() const + { + return serial_number_; + } + + /** + * @brief Get the AGV ID (manufacturer/serial_number) + */ + const std::string& get_agv_id() const + { + return agv_id_; + } + + // ============================================================================ + // Communication + // ============================================================================ + + /** + * @brief Connect the AGV's communication + */ + void connect(); + + /** + * @brief Disconnect the AGV's communication + */ + void disconnect(); + + /** + * @brief Check if the AGV is connected + */ + bool is_connected() const; + + /** + * @brief Setup VDA5050 topic subscriptions for this AGV + * @param on_connection Callback for connection messages + * @param on_state Callback for state messages + * @param on_visualization Callback for visualization messages (optional, cache always updated) + * + * Sets up subscriptions to all standard VDA5050 topics using the AGV's + * manufacturer and serial number. Received messages are parsed from JSON + * and cached in the AGV. User callbacks are invoked if provided. + */ + void setup_subscriptions( + ConnectionCallback on_connection, StateCallback on_state, + VisualizationCallback on_visualization = nullptr); + + // ============================================================================ + // Cached Messages (read-only access) + // ============================================================================ + + /** + * @brief Get the last received connection message + * @return Optional containing the message if received, nullopt otherwise + */ + std::optional get_last_connection() const; + + /** + * @brief Get the last received state message + * @return Optional containing the message if received, nullopt otherwise + */ + std::optional get_last_state() const; + + /** + * @brief Get the last received visualization message + * @return Optional containing the message if received, nullopt otherwise + */ + std::optional get_last_visualization() + const; + + // ============================================================================ + // Timestamps + // ============================================================================ + + /** + * @brief Get the time when the AGV was registered + */ + TimePoint get_registered_time() const + { + return registered_time_; + } + + /** + * @brief Get the time of the last received connection message + * @return Optional containing the timestamp if received, nullopt otherwise + */ + std::optional get_last_connection_time() const; + + /** + * @brief Get the time of the last received state message + * @return Optional containing the timestamp if received, nullopt otherwise + */ + std::optional get_last_state_time() const; + + /** + * @brief Get the time of the last received visualization message + * @return Optional containing the timestamp if received, nullopt otherwise + */ + std::optional get_last_visualization_time() const; + +private: + // VDA5050Master needs access to send/update methods + friend class VDA5050Master; + + // ============================================================================ + // Master-only methods (accessed via friend class) + // ============================================================================ + + /** + * @brief Queue an order to be sent to this AGV + * @param order The order message + * @return true if queued successfully, false if queue is full + * + * Messages are processed FIFO by a background thread. + */ + bool send_order(const vda5050_msgs::msg::Order& order); + + /** + * @brief Queue instant actions to be sent to this AGV + * @param actions The instant actions message + * @return true if queued successfully, false if queue is full + * + * Messages are processed FIFO by a background thread. + */ + bool send_instant_actions(const vda5050_msgs::msg::InstantActions& actions); + + /** + * @brief Update the cached connection message + * @param msg The connection message + */ + void update_connection(const vda5050_msgs::msg::Connection& msg); + + /** + * @brief Update the cached state message + * @param msg The state message + */ + void update_state(const vda5050_msgs::msg::State& msg); + + /** + * @brief Update the cached visualization message + * @param msg The visualization message + */ + void update_visualization(const vda5050_msgs::msg::Visualization& msg); + + // ============================================================================ + // Helper methods + // ============================================================================ + + // Helper methods for building VDA5050 topic paths + std::string build_topic(const std::string& topic_name) const; + + // Internal message handlers that parse JSON, update cache, and invoke callbacks + void handle_connection_message( + const std::string& payload, const ConnectionCallback& callback); + void handle_state_message( + const std::string& payload, const StateCallback& callback); + void handle_visualization_message( + const std::string& payload, const VisualizationCallback& callback); + + // Identity + std::string manufacturer_; + std::string serial_number_; + std::string agv_id_; + + // Communication + std::unique_ptr communication_; + + // Timestamps + TimePoint registered_time_; + + // Cached messages and timestamps (protected by mutex) + mutable std::mutex data_mutex_; + + std::optional last_connection_; + std::optional last_connection_time_; + + std::optional last_state_; + std::optional last_state_time_; + + std::optional last_visualization_; + std::optional last_visualization_time_; + + // ============================================================================ + // Outgoing message queues + // ============================================================================ + + // Queue processing thread function + void process_queues(); + + // Sends messages immediately (called by queue processor) + void send_order_now(const vda5050_msgs::msg::Order& order); + void send_instant_actions_now( + const vda5050_msgs::msg::InstantActions& actions); + + // Queue configuration + size_t max_queue_size_; + bool drop_oldest_; + + // Queue state (protected by queue_mutex_) + std::mutex queue_mutex_; + std::condition_variable queue_cv_; + std::queue order_queue_; + std::queue instant_actions_queue_; + + // Queue processing thread + std::atomic stop_processing_{false}; + std::thread queue_thread_; +}; + +#endif // VDA5050_MASTER__AGV__AGV_HPP_ diff --git a/vda5050_master/include/vda5050_master/communication/communication.hpp b/vda5050_master/include/vda5050_master/communication/communication.hpp new file mode 100644 index 0000000..e1b5fc2 --- /dev/null +++ b/vda5050_master/include/vda5050_master/communication/communication.hpp @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__COMMUNICATION__COMMUNICATION_HPP_ +#define VDA5050_MASTER__COMMUNICATION__COMMUNICATION_HPP_ + +#include +#include +#include +#include +#include + +/** + * @brief Protocol-agnostic callback signature + * All protocols (MQTT, ROS2, etc.) must adapt their native callbacks to this format + * @param topic The topic the message was received on + * @param payload The message payload as a JSON string + */ +using MessageCallback = + std::function; + +/** + * @brief Connection lifecycle states + * + * State transitions: + * DISCONNECTED -> CONNECTED (via connect()) + * CONNECTED -> DISCONNECTING (via disconnect()) + * DISCONNECTING -> DISCONNECTED (when cleanup completes) + */ +enum class ConnectionState +{ + DISCONNECTED, // Not connected, safe to destroy or reconnect + CONNECTED, // Actively connected, can send/receive messages + DISCONNECTING // Disconnect in progress, cleanup ongoing +}; + +class ICommunicationStrategy +{ +public: + virtual ~ICommunicationStrategy() = default; + + /** + * @brief Subscribe to a topic with a callback + * @param topic Topic to subscribe to + * @param callback Function to call when messages arrive + * @param qos Quality of Service level + * + * The callback will be invoked immediately when messages arrive. + * Protocol implementations must adapt their native callbacks to match + * the MessageCallback signature. + */ + virtual void subscribe( + const std::string& topic, MessageCallback callback, const int qos = 0) = 0; + + /** + * @brief Unsubscribe from a topic + * @param topic Topic to unsubscribe from + * + * Removes the subscription and associated callback for the given topic. + */ + virtual void unsubscribe(const std::string& topic) = 0; + + virtual void connect() = 0; + virtual void disconnect() = 0; + virtual void send_message( + const std::string& topic, const std::string& message, + const int qos = 0) = 0; + + /** + * @brief Get the current connection state + * @return ConnectionState enum value + */ + virtual ConnectionState get_state() const = 0; +}; +#endif // VDA5050_MASTER__COMMUNICATION__COMMUNICATION_HPP_ diff --git a/vda5050_master/include/vda5050_master/communication/heartbeat.hpp b/vda5050_master/include/vda5050_master/communication/heartbeat.hpp new file mode 100644 index 0000000..b5fa000 --- /dev/null +++ b/vda5050_master/include/vda5050_master/communication/heartbeat.hpp @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "vda5050_core/logger/logger.hpp" + +#ifndef VDA5050_MASTER__COMMUNICATION__HEARTBEAT_HPP_ +#define VDA5050_MASTER__COMMUNICATION__HEARTBEAT_HPP_ + +namespace vda5050_master { +namespace communication { + +/** + * @brief Heartbeat listener lifecycle states + * + * State transitions: + * STOPPED -> RUNNING (via start_connection_heartbeat()) + * RUNNING -> STOPPING (via stop_connection_heartbeat()) + * STOPPING -> STOPPED (when cleanup completes) + */ +enum class HeartbeatState +{ + STOPPED, // Not running, safe to destroy or restart + RUNNING, // Actively monitoring heartbeats + STOPPING // Stop in progress, cleanup ongoing +}; + +class ConnectionHeartbeatListener +{ +public: + ConnectionHeartbeatListener( + const std::string& id, const int heartbeat_interval, + std::function disconnection_callback) + : id_(id), + heartbeat_interval_(heartbeat_interval), + state_(HeartbeatState::STOPPED), + last_connection_report_(std::chrono::steady_clock::now()), + disconnection_callback_(disconnection_callback) + { + // Nothing to do here ... + } + + void start_connection_heartbeat() + { + { + std::lock_guard lock(state_mutex_); + VDA5050_INFO("Starting Connection heartbeat listener"); + state_ = HeartbeatState::RUNNING; + } + connection_thread_ = + std::thread(&ConnectionHeartbeatListener::listen, this); + } + + void received_connection() + { + // Check state with proper synchronization + if (get_state() != HeartbeatState::RUNNING) + { + VDA5050_DEBUG("Connection heartbeat not running, ignored..."); + return; + } + std::lock_guard lock(last_connection_report_mutex_); + last_connection_report_ = get_current_time(); + VDA5050_INFO("[" + id_ + "] Received connection heartbeat"); + message_received_.notify_all(); + } + + std::chrono::steady_clock::time_point get_last_connection_report() + { + std::lock_guard lock(last_connection_report_mutex_); + return last_connection_report_; + } + + ~ConnectionHeartbeatListener() + { + stop_connection_heartbeat(); + VDA5050_INFO("[" + id_ + "] Deconstructing ConnectionHeartbeatListener"); + } + + /** + * @brief Get the current heartbeat listener state + * @return HeartbeatState enum value + */ + HeartbeatState get_state() + { + std::lock_guard lock(state_mutex_); + return state_; + } + + // Stop the listener + void stop_connection_heartbeat() + { + VDA5050_INFO("Stopping Connection heartbeat listener"); + + { + std::lock_guard lock(state_mutex_); + state_ = HeartbeatState::STOPPING; + } + + message_received_.notify_all(); + + if (connection_thread_.joinable()) + { + connection_thread_.join(); + } + else + { + VDA5050_INFO("Connection thread not joinable"); + } + + if (callback_thread_.joinable()) + { + callback_thread_.join(); + } + + { + std::lock_guard lock(state_mutex_); + state_ = HeartbeatState::STOPPED; + } + + VDA5050_INFO("Stopped Connection heartbeat listener"); + } + + virtual std::chrono::steady_clock::time_point get_current_time() + { + return std::chrono::steady_clock::now(); + } + + // Virtual method to allow overriding during tests. + virtual int get_check_interval() + { + return heartbeat_interval_; + } + +protected: + std::condition_variable message_received_; + +private: + bool is_stop_requested() + { + std::lock_guard lock(state_mutex_); + return state_ == HeartbeatState::STOPPING; + } + + bool is_timeout() + { + std::chrono::steady_clock::time_point current_time = get_current_time(); + int time_since_last_connection_report; + { + std::lock_guard lock(last_connection_report_mutex_); + time_since_last_connection_report = + std::chrono::duration_cast( + current_time - last_connection_report_) + .count(); + } + + if (std::abs(time_since_last_connection_report) > heartbeat_interval_) + { + VDA5050_WARN( + "[" + id_ + "] Connection heartbeat timeout after " + + std::to_string(time_since_last_connection_report) + " seconds " + + "(max: " + std::to_string(heartbeat_interval_) + "s)"); + return true; + } + return false; + } + + // Listener loop + + void listen() + { + while (!is_stop_requested()) + { + std::unique_lock lock(check_lock_); + message_received_.wait_for( + lock, std::chrono::seconds(get_check_interval())); + + // Check if shutdown was requested while waiting + if (is_stop_requested()) + { + VDA5050_DEBUG("[" + id_ + "] Shutdown requested, exiting listen loop"); + return; + } + + if (is_timeout()) + { + // Copy callback by value to ensure safe invocation + auto callback_copy = disconnection_callback_; + callback_thread_ = std::thread([callback_copy]() { callback_copy(); }); + if (callback_thread_.joinable()) + { + callback_thread_.join(); + } + VDA5050_INFO( + "[" + id_ + "] Heartbeat monitoring stopped after timeout"); + return; + } + } + } + + std::string id_; + + std::thread connection_thread_; + + std::thread callback_thread_; + + int heartbeat_interval_; + + // Lifecycle state protected by state_mutex_ + HeartbeatState state_; + + std::chrono::steady_clock::time_point last_connection_report_; + + // Mutex for state variable + mutable std::mutex state_mutex_; + + // Mutex to protect access to last_connection_report_ + mutable std::mutex last_connection_report_mutex_; + + // Mutex for condition variable wait + std::mutex check_lock_; + + std::function disconnection_callback_; +}; + +} // namespace communication +} // namespace vda5050_master + +#endif // VDA5050_MASTER__COMMUNICATION__HEARTBEAT_HPP_ diff --git a/vda5050_master/include/vda5050_master/communication/mqtt.hpp b/vda5050_master/include/vda5050_master/communication/mqtt.hpp new file mode 100644 index 0000000..bc06798 --- /dev/null +++ b/vda5050_master/include/vda5050_master/communication/mqtt.hpp @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__COMMUNICATION__MQTT_HPP_ +#define VDA5050_MASTER__COMMUNICATION__MQTT_HPP_ + +#include +#include +#include + +#include "vda5050_core/mqtt_client/mqtt_client_interface.hpp" +#include "vda5050_master/communication/communication.hpp" + +class MqttCommunication : public ICommunicationStrategy +{ +public: + MqttCommunication(const std::string& endpoint, const std::string& id); + + void subscribe( + const std::string& topic, MessageCallback callback, + const int qos = 0) override; + + void unsubscribe(const std::string& topic) override; + + void connect() override; + + void disconnect() override; + + ConnectionState get_state() const override; + + void send_message( + const std::string& topic, const std::string& message, + const int qos = 0) override; + +private: + std::shared_ptr mqtt_client_; + std::string endpoint_; + std::string id_; + ConnectionState state_{ConnectionState::DISCONNECTED}; + mutable std::mutex state_mutex_; +}; + +#endif // VDA5050_MASTER__COMMUNICATION__MQTT_HPP_ diff --git a/vda5050_master/include/vda5050_master/communication/ros2.hpp b/vda5050_master/include/vda5050_master/communication/ros2.hpp new file mode 100644 index 0000000..37d7102 --- /dev/null +++ b/vda5050_master/include/vda5050_master/communication/ros2.hpp @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__COMMUNICATION__ROS2_HPP_ +#define VDA5050_MASTER__COMMUNICATION__ROS2_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" +#include "vda5050_master/communication/communication.hpp" + +class Ros2Communication : public ICommunicationStrategy +{ +public: + Ros2Communication( + std::shared_ptr node, const std::string& node_name); + + void subscribe( + const std::string& topic, MessageCallback callback, + const int qos = 0) override; + + void unsubscribe(const std::string& topic) override; + + void connect() override; + + void disconnect() override; + + void send_message( + const std::string& topic, const std::string& message, + const int qos = 0) override; + + ConnectionState get_state() const override; + +private: + std::shared_ptr> + get_or_create_publisher(const std::string& topic, int qos); + + rclcpp::QoS convert_qos(int vda5050_qos); + + std::shared_ptr node_; + std::string node_name_; + ConnectionState state_{ConnectionState::DISCONNECTED}; + mutable std::mutex state_mutex_; + + // Store subscriptions and publishers to keep them alive + std::unordered_map< + std::string, rclcpp::Subscription::SharedPtr> + subscriptions_; + std::unordered_map< + std::string, rclcpp::Publisher::SharedPtr> + publishers_; + std::mutex publisher_mutex_; + std::mutex subscriber_mutex_; +}; + +#endif // VDA5050_MASTER__COMMUNICATION__ROS2_HPP_ diff --git a/vda5050_master/include/vda5050_master/standard_names.hpp b/vda5050_master/include/vda5050_master/standard_names.hpp new file mode 100644 index 0000000..f113561 --- /dev/null +++ b/vda5050_master/include/vda5050_master/standard_names.hpp @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__STANDARD_NAMES_HPP_ +#define VDA5050_MASTER__STANDARD_NAMES_HPP_ + +#include + +namespace vda5050_master { +const std::string Version = "v2"; // NOLINT +const std::string InterfaceName = "rmf2"; // NOLINT +const std::string ConnectionTopic = "connection"; // NOLINT +const std::string FactsheetTopic = "factsheet"; // NOLINT +const std::string OrderTopic = "order"; // NOLINT +const std::string StateTopic = "state"; // NOLINT +const std::string InstantActionsTopic = "instant_actions"; // NOLINT +const std::string VisualizationTopic = "visualization"; // NOLINT + +const int ConnectionQos = 1; +const int FactsheetQos = 0; +const int OrderQos = 1; +const int StateQos = 1; +const int VisualizationQos = 0; +const int InstantActionsQos = 1; + +const int ConnectionHeartbeatInterval = 15; // seconds + +} // namespace vda5050_master + +#endif // VDA5050_MASTER__STANDARD_NAMES_HPP_ diff --git a/vda5050_master/include/vda5050_master/vda5050_interfaces.hpp b/vda5050_master/include/vda5050_master/vda5050_interfaces.hpp new file mode 100644 index 0000000..508323d --- /dev/null +++ b/vda5050_master/include/vda5050_master/vda5050_interfaces.hpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__VDA5050_INTERFACES_HPP_ +#define VDA5050_MASTER__VDA5050_INTERFACES_HPP_ + +#include "vda5050_msgs/msg/action.hpp" +#include "vda5050_msgs/msg/action_parameter.hpp" +#include "vda5050_msgs/msg/action_state.hpp" +#include "vda5050_msgs/msg/agv_position.hpp" +#include "vda5050_msgs/msg/battery_state.hpp" +#include "vda5050_msgs/msg/bounding_box_reference.hpp" +#include "vda5050_msgs/msg/connection.hpp" +#include "vda5050_msgs/msg/control_point.hpp" +#include "vda5050_msgs/msg/edge.hpp" +#include "vda5050_msgs/msg/edge_state.hpp" +#include "vda5050_msgs/msg/error.hpp" +#include "vda5050_msgs/msg/error_reference.hpp" +#include "vda5050_msgs/msg/header.hpp" +#include "vda5050_msgs/msg/info.hpp" +#include "vda5050_msgs/msg/info_reference.hpp" +#include "vda5050_msgs/msg/instant_actions.hpp" +#include "vda5050_msgs/msg/load.hpp" +#include "vda5050_msgs/msg/load_dimensions.hpp" +#include "vda5050_msgs/msg/node.hpp" +#include "vda5050_msgs/msg/node_position.hpp" +#include "vda5050_msgs/msg/node_state.hpp" +#include "vda5050_msgs/msg/order.hpp" +#include "vda5050_msgs/msg/safety_state.hpp" +#include "vda5050_msgs/msg/state.hpp" +#include "vda5050_msgs/msg/trajectory.hpp" +#include "vda5050_msgs/msg/velocity.hpp" +#include "vda5050_msgs/msg/visualization.hpp" + +#endif // VDA5050_MASTER__VDA5050_INTERFACES_HPP_ diff --git a/vda5050_master/include/vda5050_master/vda5050_master/master.hpp b/vda5050_master/include/vda5050_master/vda5050_master/master.hpp new file mode 100644 index 0000000..68ff821 --- /dev/null +++ b/vda5050_master/include/vda5050_master/vda5050_master/master.hpp @@ -0,0 +1,206 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VDA5050_MASTER__VDA5050_MASTER__MASTER_HPP_ +#define VDA5050_MASTER__VDA5050_MASTER__MASTER_HPP_ + +#include +#include +#include +#include +#include + +#include "vda5050_master/agv/agv.hpp" +#include "vda5050_master/communication/communication.hpp" +#include "vda5050_master/vda5050_interfaces.hpp" + +/** + * @brief Factory function type for creating communication strategies + * + * The factory takes an AGV identifier (used for client naming) and returns + * a new ICommunicationStrategy instance. Each ICommunicationStrategy + * implementation handles its own configuration internally. + */ +using CommunicationFactory = + std::function( + const std::string& agv_id)>; + +/** + * @brief VDA5050 Master for multi-AGV fleet management + * + * This abstract base class manages VDA5050 communication for multiple AGVs. + * Each registered AGV gets its own communication instance. Users should + * inherit from this class and override the virtual callback methods to + * handle incoming VDA5050 messages. + * + * Features: + * - AGV registration with automatic communication setup + * - Per-AGV communication isolation + * - Message routing with AGV identification + * - Order and instant action publishing to specific AGVs + * + * Topic structure: {interfaceName}/{majorVersion}/{manufacturer}/{serialNumber}/{topic} + * Example: rmf2/v2/MyManufacturer/AGV001/state + * + * Thread safety note: Callbacks are invoked on the communication thread. + * If thread safety is required, the callback implementation should handle + * synchronization (e.g., using a mutex). + */ +class VDA5050Master +{ +public: + /** + * @brief Construct a VDA5050 master with a communication factory + * @param factory Factory function that creates ICommunicationStrategy instances + * + * The factory is called for each AGV when register_agv() is invoked. + * Each ICommunicationStrategy implementation handles its own configuration. + * + * Example usage with MQTT: + * @code + * auto factory = [broker](const std::string& agv_id) { + * return std::make_unique(broker, agv_id); + * }; + * MyMaster master(factory); + * @endcode + * + * Example usage with ROS2: + * @code + * auto factory = [node](const std::string& agv_id) { + * return std::make_unique(node, agv_id); + * }; + * MyMaster master(factory); + * @endcode + */ + explicit VDA5050Master(CommunicationFactory factory); + + /** + * @brief Virtual destructor for inheritance + */ + virtual ~VDA5050Master() = default; + + /** + * @brief Register an AGV for VDA5050 communication + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * @param max_queue_size Maximum number of outgoing messages to queue (default: 10) + * @param drop_oldest If true, drop oldest message when queue full; if false, reject new message (default: true) + * + * Creates a dedicated communication instance for this AGV and subscribes + * to all VDA5050 topics with auto-constructed topic paths per VDA5050 spec. + */ + void register_agv( + const std::string& manufacturer, const std::string& serial_number, + size_t max_queue_size = AGV::DEFAULT_MAX_QUEUE_SIZE, + bool drop_oldest = true); + + /** + * @brief Unregister an AGV + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * + * Disconnects and removes the AGV's communication instance. + */ + void unregister_agv( + const std::string& manufacturer, const std::string& serial_number); + + /** + * @brief Check if an AGV is registered + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * @return true if AGV is registered + */ + bool is_agv_registered( + const std::string& manufacturer, const std::string& serial_number) const; + + /** + * @brief Publish an order to a specific AGV + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * @param order The order message + * @return true if queued successfully, false if queue is full + * @throws std::runtime_error if AGV is not registered + */ + bool publish_order( + const std::string& manufacturer, const std::string& serial_number, + const vda5050_msgs::msg::Order& order); + + /** + * @brief Publish instant actions to a specific AGV + * @param manufacturer Manufacturer name + * @param serial_number Serial number + * @param actions The instant actions message + * @return true if queued successfully, false if queue is full + * @throws std::runtime_error if AGV is not registered + */ + bool publish_instant_actions( + const std::string& manufacturer, const std::string& serial_number, + const vda5050_msgs::msg::InstantActions& actions); + +protected: + // ============================================================================ + // Virtual callback methods - Override these in derived classes + // ============================================================================ + + /** + * @brief Called when a connection message is received from an AGV + * @param agv_id The AGV identifier (manufacturer/serial_number) + * @param msg The connection message + * + * This is a pure virtual method - derived classes must implement it. + */ + virtual void on_connection( + const std::string& agv_id, const vda5050_msgs::msg::Connection& msg) = 0; + + /** + * @brief Called when a state message is received from an AGV + * @param agv_id The AGV identifier (manufacturer/serial_number) + * @param msg The state message + * + * This is a pure virtual method - derived classes must implement it. + */ + virtual void on_state( + const std::string& agv_id, const vda5050_msgs::msg::State& msg) = 0; + + /** + * @brief Called when a visualization message is received from an AGV + * @param agv_id The AGV identifier (manufacturer/serial_number) + * @param msg The visualization message + * + * Default implementation logs a warning. Override to handle visualizations. + */ + virtual void on_visualization( + const std::string& agv_id, const vda5050_msgs::msg::Visualization& msg); + +private: + /** + * @brief Internal AGV lookup (must be called with agv_mutex_ held) + * @param agv_id The AGV identifier (manufacturer/serial_number) + * @return Shared pointer to AGV, or nullptr if not found + */ + std::shared_ptr get_agv(const std::string& agv_id) const; + + // Factory for creating communication strategies + CommunicationFactory communication_factory_; + + // Registered AGVs (shared_ptr allows safe access from get_agv()) + mutable std::mutex agv_mutex_; + std::unordered_map> agvs_; +}; + +#endif // VDA5050_MASTER__VDA5050_MASTER__MASTER_HPP_ diff --git a/vda5050_master/package.xml b/vda5050_master/package.xml new file mode 100644 index 0000000..ac6ce52 --- /dev/null +++ b/vda5050_master/package.xml @@ -0,0 +1,26 @@ + + + + vda5050_master + 0.0.0 + TODO: Package description + rosi + Apache-2.0 + + ament_cmake + + vda5050_core + vda5050_msgs + rclcpp + std_msgs + + ament_cmake_cppcheck + ament_cmake_cpplint + ament_cmake_clang_format + ament_cmake_copyright + ament_cmake_gmock + + + ament_cmake + + diff --git a/vda5050_master/src/agv/agv.cpp b/vda5050_master/src/agv/agv.cpp new file mode 100644 index 0000000..ba440ef --- /dev/null +++ b/vda5050_master/src/agv/agv.cpp @@ -0,0 +1,488 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/agv/agv.hpp" + +#include + +#include "nlohmann/json.hpp" +#include "vda5050_core/logger/logger.hpp" +#include "vda5050_json_utils/serialization.hpp" +#include "vda5050_master/standard_names.hpp" + +// Additional JSON serialization for messages not in vda5050_json_utils +namespace vda5050_msgs { +namespace msg { + +inline void to_json(nlohmann::json& j, const InstantActions& msg) +{ + vda5050_types::header_detail::to_json(j, msg.header); + j["actions"] = msg.actions; +} + +inline void from_json(const nlohmann::json& j, InstantActions& msg) +{ + vda5050_types::header_detail::from_json(j, msg.header); + msg.actions = j.at("actions").get>(); +} + +inline void to_json(nlohmann::json& j, const Visualization& msg) +{ + vda5050_types::header_detail::to_json(j, msg.header); + if (!msg.agv_position.empty()) + { + j["agvPosition"] = msg.agv_position[0]; + } + if (!msg.velocity.empty()) + { + j["velocity"] = msg.velocity[0]; + } +} + +inline void from_json(const nlohmann::json& j, Visualization& msg) +{ + vda5050_types::header_detail::from_json(j, msg.header); + if (j.contains("agvPosition")) + { + AGVPosition pos; + vda5050_types::agv_position_detail::from_json(j.at("agvPosition"), pos); + msg.agv_position.push_back(pos); + } + if (j.contains("velocity")) + { + Velocity vel; + vda5050_types::velocity_detail::from_json(j.at("velocity"), vel); + msg.velocity.push_back(vel); + } +} + +} // namespace msg +} // namespace vda5050_msgs + +// ============================================================================ +// Constructor / Destructor +// ============================================================================ + +AGV::AGV( + const std::string& manufacturer, const std::string& serial_number, + std::unique_ptr communication, size_t max_queue_size, + bool drop_oldest) +: manufacturer_(manufacturer), + serial_number_(serial_number), + agv_id_(manufacturer + "/" + serial_number), + communication_(std::move(communication)), + registered_time_(Clock::now()), + max_queue_size_(max_queue_size), + drop_oldest_(drop_oldest) +{ + // Start the queue processing thread + queue_thread_ = std::thread(&AGV::process_queues, this); +} + +AGV::~AGV() +{ + // Signal the queue thread to stop + { + std::lock_guard lock(queue_mutex_); + stop_processing_ = true; + } + queue_cv_.notify_one(); + + // Wait for the thread to finish + if (queue_thread_.joinable()) + { + queue_thread_.join(); + } +} + +// ============================================================================ +// Communication +// ============================================================================ + +void AGV::connect() +{ + if (communication_) + { + communication_->connect(); + } +} + +void AGV::disconnect() +{ + if (communication_) + { + communication_->disconnect(); + } +} + +bool AGV::is_connected() const +{ + if (communication_) + { + return communication_->get_state() == ConnectionState::CONNECTED; + } + return false; +} + +// ============================================================================ +// Cached Messages - Update +// ============================================================================ + +void AGV::update_connection(const vda5050_msgs::msg::Connection& msg) +{ + std::lock_guard lock(data_mutex_); + last_connection_ = msg; + last_connection_time_ = Clock::now(); +} + +void AGV::update_state(const vda5050_msgs::msg::State& msg) +{ + std::lock_guard lock(data_mutex_); + last_state_ = msg; + last_state_time_ = Clock::now(); +} + +void AGV::update_visualization(const vda5050_msgs::msg::Visualization& msg) +{ + std::lock_guard lock(data_mutex_); + last_visualization_ = msg; + last_visualization_time_ = Clock::now(); +} + +// ============================================================================ +// Cached Messages - Get +// ============================================================================ + +std::optional AGV::get_last_connection() const +{ + std::lock_guard lock(data_mutex_); + return last_connection_; +} + +std::optional AGV::get_last_state() const +{ + std::lock_guard lock(data_mutex_); + return last_state_; +} + +std::optional AGV::get_last_visualization() + const +{ + std::lock_guard lock(data_mutex_); + return last_visualization_; +} + +// ============================================================================ +// Timestamps +// ============================================================================ + +std::optional AGV::get_last_connection_time() const +{ + std::lock_guard lock(data_mutex_); + return last_connection_time_; +} + +std::optional AGV::get_last_state_time() const +{ + std::lock_guard lock(data_mutex_); + return last_state_time_; +} + +std::optional AGV::get_last_visualization_time() const +{ + std::lock_guard lock(data_mutex_); + return last_visualization_time_; +} + +// ============================================================================ +// Subscriptions and Publishing +// ============================================================================ + +void AGV::setup_subscriptions( + ConnectionCallback on_connection, StateCallback on_state, + VisualizationCallback on_visualization) +{ + if (!communication_) + { + VDA5050_WARN( + "[AGV] Cannot setup subscriptions: no communication for {}", agv_id_); + return; + } + + // Subscribe to connection topic + communication_->subscribe( + build_topic(vda5050_master::ConnectionTopic), + [this, on_connection]( + const std::string& /*topic*/, const std::string& payload) { + handle_connection_message(payload, on_connection); + }, + vda5050_master::ConnectionQos); + + // Subscribe to state topic + communication_->subscribe( + build_topic(vda5050_master::StateTopic), + [this, on_state](const std::string& /*topic*/, const std::string& payload) { + handle_state_message(payload, on_state); + }, + vda5050_master::StateQos); + + // Subscribe to visualization topic (callback is optional, but subscription always created) + communication_->subscribe( + build_topic(vda5050_master::VisualizationTopic), + [this, on_visualization]( + const std::string& /*topic*/, const std::string& payload) { + handle_visualization_message(payload, on_visualization); + }, + vda5050_master::VisualizationQos); + + VDA5050_INFO("[AGV] Setup subscriptions for AGV: {}", agv_id_); +} + +bool AGV::send_order(const vda5050_msgs::msg::Order& order) +{ + std::lock_guard lock(queue_mutex_); + + if (order_queue_.size() >= max_queue_size_) + { + if (!drop_oldest_) + { + VDA5050_WARN( + "[AGV] Dropping new order: queue full ({}/{}) for {}", + order_queue_.size(), max_queue_size_, agv_id_); + return false; + } + // Drop oldest order to make room + VDA5050_WARN( + "[AGV] Dropping oldest order: queue full ({}/{}) for {}", + order_queue_.size(), max_queue_size_, agv_id_); + order_queue_.pop(); + } + + order_queue_.push(order); + queue_cv_.notify_one(); + + VDA5050_INFO("[AGV] Queued order for AGV: {}", agv_id_); + return true; +} + +bool AGV::send_instant_actions(const vda5050_msgs::msg::InstantActions& actions) +{ + std::lock_guard lock(queue_mutex_); + + if (instant_actions_queue_.size() >= max_queue_size_) + { + if (!drop_oldest_) + { + VDA5050_WARN( + "[AGV] Dropping new instant actions: queue full ({}/{}) for {}", + instant_actions_queue_.size(), max_queue_size_, agv_id_); + return false; + } + // Drop oldest instant actions to make room + VDA5050_WARN( + "[AGV] Dropping oldest instant actions: queue full ({}/{}) for {}", + instant_actions_queue_.size(), max_queue_size_, agv_id_); + instant_actions_queue_.pop(); + } + + instant_actions_queue_.push(actions); + queue_cv_.notify_one(); + + VDA5050_INFO("[AGV] Queued instant actions for AGV: {}", agv_id_); + return true; +} + +// ============================================================================ +// Private Helper Methods +// ============================================================================ + +std::string AGV::build_topic(const std::string& topic_name) const +{ + return vda5050_master::InterfaceName + "/" + vda5050_master::Version + "/" + + manufacturer_ + "/" + serial_number_ + "/" + topic_name; +} + +void AGV::handle_connection_message( + const std::string& payload, const ConnectionCallback& callback) +{ + try + { + auto json_msg = nlohmann::json::parse(payload); + vda5050_msgs::msg::Connection msg; + vda5050_msgs::msg::from_json(json_msg, msg); + + // Update cache + update_connection(msg); + + // Invoke user callback + if (callback) + { + callback(agv_id_, msg); + } + } + catch (const std::exception& e) + { + VDA5050_WARN( + "[AGV] Failed to parse connection message from {}: {}", agv_id_, + e.what()); + } +} + +void AGV::handle_state_message( + const std::string& payload, const StateCallback& callback) +{ + try + { + auto json_msg = nlohmann::json::parse(payload); + vda5050_msgs::msg::State msg; + vda5050_msgs::msg::from_json(json_msg, msg); + + // Update cache + update_state(msg); + + // Invoke user callback + if (callback) + { + callback(agv_id_, msg); + } + } + catch (const std::exception& e) + { + VDA5050_WARN( + "[AGV] Failed to parse state message from {}: {}", agv_id_, e.what()); + } +} + +void AGV::handle_visualization_message( + const std::string& payload, const VisualizationCallback& callback) +{ + try + { + auto json_msg = nlohmann::json::parse(payload); + vda5050_msgs::msg::Visualization msg; + vda5050_msgs::msg::from_json(json_msg, msg); + + // Update cache + update_visualization(msg); + + // Invoke user callback + if (callback) + { + callback(agv_id_, msg); + } + } + catch (const std::exception& e) + { + VDA5050_WARN( + "[AGV] Failed to parse visualization message from {}: {}", agv_id_, + e.what()); + } +} + +// ============================================================================ +// Queue Processing +// ============================================================================ + +void AGV::process_queues() +{ + VDA5050_INFO("[AGV] Queue processing thread started for {}", agv_id_); + + while (true) + { + std::optional order; + std::optional actions; + + { + std::unique_lock lock(queue_mutex_); + + // Wait for a message or stop signal + queue_cv_.wait(lock, [this] { + return stop_processing_ || !order_queue_.empty() || + !instant_actions_queue_.empty(); + }); + + // Check if we should stop + if ( + stop_processing_ && order_queue_.empty() && + instant_actions_queue_.empty()) + { + break; + } + + // Process instant actions first (higher priority) + if (!instant_actions_queue_.empty()) + { + actions = std::move(instant_actions_queue_.front()); + instant_actions_queue_.pop(); + } + else if (!order_queue_.empty()) + { + order = std::move(order_queue_.front()); + order_queue_.pop(); + } + } + + // Send the message (outside the lock) + if (actions) + { + send_instant_actions_now(*actions); + } + else if (order) + { + send_order_now(*order); + } + } + + VDA5050_INFO("[AGV] Queue processing thread stopped for {}", agv_id_); +} + +void AGV::send_order_now(const vda5050_msgs::msg::Order& order) +{ + if (!communication_) + { + VDA5050_WARN("[AGV] Cannot send order: no communication for {}", agv_id_); + return; + } + + nlohmann::json j; + vda5050_msgs::msg::to_json(j, order); + communication_->send_message( + build_topic(vda5050_master::OrderTopic), j.dump(), + vda5050_master::OrderQos); + + VDA5050_INFO("[AGV] Sent order to AGV: {}", agv_id_); +} + +void AGV::send_instant_actions_now( + const vda5050_msgs::msg::InstantActions& actions) +{ + if (!communication_) + { + VDA5050_WARN( + "[AGV] Cannot send instant actions: no communication for {}", agv_id_); + return; + } + + nlohmann::json j; + vda5050_msgs::msg::to_json(j, actions); + communication_->send_message( + build_topic(vda5050_master::InstantActionsTopic), j.dump(), + vda5050_master::InstantActionsQos); + + VDA5050_INFO("[AGV] Sent instant actions to AGV: {}", agv_id_); +} diff --git a/vda5050_master/src/vda5050_master/communication/communication.cpp b/vda5050_master/src/vda5050_master/communication/communication.cpp new file mode 100644 index 0000000..0a32859 --- /dev/null +++ b/vda5050_master/src/vda5050_master/communication/communication.cpp @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/communication/communication.hpp" diff --git a/vda5050_master/src/vda5050_master/communication/heartbeat.cpp b/vda5050_master/src/vda5050_master/communication/heartbeat.cpp new file mode 100644 index 0000000..4fd8e39 --- /dev/null +++ b/vda5050_master/src/vda5050_master/communication/heartbeat.cpp @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/communication/heartbeat.hpp" diff --git a/vda5050_master/src/vda5050_master/communication/mqtt.cpp b/vda5050_master/src/vda5050_master/communication/mqtt.cpp new file mode 100644 index 0000000..f9aee6f --- /dev/null +++ b/vda5050_master/src/vda5050_master/communication/mqtt.cpp @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/communication/mqtt.hpp" + +#include "vda5050_core/logger/logger.hpp" + +MqttCommunication::MqttCommunication( + const std::string& endpoint, const std::string& id) +: ICommunicationStrategy(), endpoint_(endpoint), id_(id) +{ + mqtt_client_ = + vda5050_core::mqtt_client::create_default_client(endpoint_, id_); +} + +void MqttCommunication::subscribe( + const std::string& topic, MessageCallback callback, const int qos) +{ + mqtt_client_->subscribe( + topic, + [callback](const std::string& topic_, const std::string& payload_) { + callback(topic_, payload_); + }, + qos); + VDA5050_INFO("[MQTT] Subscribing to topic at {}", topic); +} + +void MqttCommunication::unsubscribe(const std::string& topic) +{ + mqtt_client_->unsubscribe(topic); + VDA5050_INFO("[MQTT] Unsubscribed from topic: {}", topic); +} + +void MqttCommunication::connect() +{ + mqtt_client_->connect(); + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::CONNECTED; + } + VDA5050_INFO("[MQTT] Connecting to broker at {}", endpoint_); +} + +void MqttCommunication::disconnect() +{ + VDA5050_INFO("[MQTT] Disconnecting from MQTT broker"); + + // Signal disconnecting state + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::DISCONNECTING; + } + + mqtt_client_->disconnect(); + + // Mark as fully disconnected + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::DISCONNECTED; + } + + VDA5050_INFO("[MQTT] Disconnected from MQTT broker"); +} + +ConnectionState MqttCommunication::get_state() const +{ + std::lock_guard lock(state_mutex_); + return state_; +} + +void MqttCommunication::send_message( + const std::string& topic, const std::string& message, const int qos) +{ + mqtt_client_->publish(topic, message, qos); + VDA5050_INFO("[MQTT] Publishing message: {}", message); +} diff --git a/vda5050_master/src/vda5050_master/communication/ros2.cpp b/vda5050_master/src/vda5050_master/communication/ros2.cpp new file mode 100644 index 0000000..d55aacf --- /dev/null +++ b/vda5050_master/src/vda5050_master/communication/ros2.cpp @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/communication/ros2.hpp" + +#include "vda5050_core/logger/logger.hpp" + +Ros2Communication::Ros2Communication( + std::shared_ptr node, const std::string& node_name) +: ICommunicationStrategy(), node_(node), node_name_(node_name) +{ + VDA5050_INFO( + "[ROS2] Initializing ROS2 communication for node: {}", node_name_); +} + +void Ros2Communication::subscribe( + const std::string& topic, MessageCallback callback, const int qos) +{ + // Convert VDA5050 QoS to ROS2 QoS + auto ros2_qos = convert_qos(qos); + + // Create a subscriber for this topic + auto subscription = node_->create_subscription( + topic, ros2_qos, + [this, topic, callback](const std_msgs::msg::String::SharedPtr msg) { + callback(topic, msg->data); + }); + + std::lock_guard lock(subscriber_mutex_); + // Store the subscription to keep it alive + subscriptions_[topic] = subscription; + VDA5050_INFO("[ROS2] Subscribed to topic: {}", topic); +} + +void Ros2Communication::unsubscribe(const std::string& topic) +{ + std::lock_guard lock(subscriber_mutex_); + auto it = subscriptions_.find(topic); + if (it != subscriptions_.end()) + { + subscriptions_.erase(it); + VDA5050_INFO("[ROS2] Unsubscribed from topic: {}", topic); + } + else + { + VDA5050_WARN( + "[ROS2] Cannot unsubscribe: no subscription for topic: {}", topic); + } +} + +void Ros2Communication::connect() +{ + // ROS2 nodes are connected when created, but we can initialize publishers here + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::CONNECTED; + } + VDA5050_INFO("[ROS2] Node connected and ready"); +} + +void Ros2Communication::disconnect() +{ + VDA5050_INFO("[ROS2] Disconnecting from ROS2 communication"); + + // Signal disconnecting state + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::DISCONNECTING; + } + + // Clear all subscriptions and publishers + { + std::lock_guard lock(subscriber_mutex_); + subscriptions_.clear(); + } + { + std::lock_guard lock(publisher_mutex_); + publishers_.clear(); + } + + // Mark as fully disconnected + { + std::lock_guard lock(state_mutex_); + state_ = ConnectionState::DISCONNECTED; + } + + VDA5050_INFO("[ROS2] Disconnected from ROS2 communication"); +} + +void Ros2Communication::send_message( + const std::string& topic, const std::string& message, const int qos) +{ + if (get_state() != ConnectionState::CONNECTED) + { + VDA5050_WARN("[ROS2] Cannot send message - not connected"); + return; + } + + // Get or create publisher for this topic + auto publisher = get_or_create_publisher(topic, qos); + + // Create and publish the message + auto ros_msg = std_msgs::msg::String(); + ros_msg.data = message; + publisher->publish(ros_msg); + + VDA5050_INFO("[ROS2] Published message to topic: {}", topic); +} + +ConnectionState Ros2Communication::get_state() const +{ + std::lock_guard lock(state_mutex_); + return state_; +} + +std::shared_ptr> +Ros2Communication::get_or_create_publisher(const std::string& topic, int qos) +{ + // Check if publisher already exists + std::lock_guard lock(publisher_mutex_); + auto it = publishers_.find(topic); + if (it != publishers_.end()) + { + return it->second; + } + + // Create new publisher + auto ros2_qos = convert_qos(qos); + auto publisher = + node_->create_publisher(topic, ros2_qos); + publishers_[topic] = publisher; + + VDA5050_INFO("[ROS2] Created publisher for topic: {}", topic); + return publisher; +} + +rclcpp::QoS Ros2Communication::convert_qos(int vda5050_qos) +{ + // 0 = At most once (Best effort) + // 1 = At least once (Reliable) + // 2 = Exactly once (Reliable + Transient local) + switch (vda5050_qos) + { + case 0: + return rclcpp::QoS(10).best_effort().durability_volatile(); + case 1: + return rclcpp::QoS(10).reliable().durability_volatile(); + case 2: + return rclcpp::QoS(10).reliable().transient_local(); + default: + VDA5050_WARN("[ROS2] Unknown QoS level {}, using default", vda5050_qos); + return rclcpp::QoS(10); + } +} diff --git a/vda5050_master/src/vda5050_master/master.cpp b/vda5050_master/src/vda5050_master/master.cpp new file mode 100644 index 0000000..11a7ec7 --- /dev/null +++ b/vda5050_master/src/vda5050_master/master.cpp @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/vda5050_master/master.hpp" + +#include + +#include "vda5050_core/logger/logger.hpp" + +// ============================================================================ +// VDA5050Master Implementation +// ============================================================================ + +VDA5050Master::VDA5050Master(CommunicationFactory factory) +: communication_factory_(std::move(factory)) +{ +} + +void VDA5050Master::register_agv( + const std::string& manufacturer, const std::string& serial_number, + size_t max_queue_size, bool drop_oldest) +{ + std::string agv_id = manufacturer + "/" + serial_number; + + std::lock_guard lock(agv_mutex_); + + if (get_agv(agv_id)) + { + VDA5050_WARN("[VDA5050Master] AGV already registered: {}", agv_id); + return; + } + + // Create communication instance for this AGV using the factory + auto communication = communication_factory_(agv_id); + + // Create AGV object with queue configuration + auto agv = std::make_shared( + manufacturer, serial_number, std::move(communication), max_queue_size, + drop_oldest); + + // Connect the AGV's communication + agv->connect(); + + // Setup subscriptions for this AGV, routing callbacks to virtual methods + agv->setup_subscriptions( + [this](const std::string& id, const vda5050_msgs::msg::Connection& msg) { + on_connection(id, msg); + }, + [this](const std::string& id, const vda5050_msgs::msg::State& msg) { + on_state(id, msg); + }, + [this](const std::string& id, const vda5050_msgs::msg::Visualization& msg) { + on_visualization(id, msg); + }); + + // Store the AGV + agvs_[agv_id] = std::move(agv); + + VDA5050_INFO("[VDA5050Master] Registered AGV: {}", agv_id); +} + +void VDA5050Master::unregister_agv( + const std::string& manufacturer, const std::string& serial_number) +{ + std::string agv_id = manufacturer + "/" + serial_number; + + std::lock_guard lock(agv_mutex_); + + auto agv = get_agv(agv_id); + if (!agv) + { + VDA5050_WARN( + "[VDA5050Master] Cannot unregister: AGV not found: {}", agv_id); + return; + } + + // Disconnect the AGV's communication + agv->disconnect(); + + // Remove from map + agvs_.erase(agv_id); + + VDA5050_INFO("[VDA5050Master] Unregistered AGV: {}", agv_id); +} + +bool VDA5050Master::is_agv_registered( + const std::string& manufacturer, const std::string& serial_number) const +{ + std::lock_guard lock(agv_mutex_); + std::string agv_id = manufacturer + "/" + serial_number; + return get_agv(agv_id) != nullptr; +} + +std::shared_ptr VDA5050Master::get_agv(const std::string& agv_id) const +{ + auto it = agvs_.find(agv_id); + return (it != agvs_.end()) ? it->second : nullptr; +} + +bool VDA5050Master::publish_order( + const std::string& manufacturer, const std::string& serial_number, + const vda5050_msgs::msg::Order& order) +{ + std::string agv_id = manufacturer + "/" + serial_number; + + std::shared_ptr agv; + { + std::lock_guard lock(agv_mutex_); + agv = get_agv(agv_id); + } + + if (!agv) + { + throw std::runtime_error( + "Cannot publish order: AGV not registered: " + agv_id); + } + + return agv->send_order(order); +} + +bool VDA5050Master::publish_instant_actions( + const std::string& manufacturer, const std::string& serial_number, + const vda5050_msgs::msg::InstantActions& actions) +{ + std::string agv_id = manufacturer + "/" + serial_number; + + std::shared_ptr agv; + { + std::lock_guard lock(agv_mutex_); + agv = get_agv(agv_id); + } + + if (!agv) + { + throw std::runtime_error( + "Cannot publish instant actions: AGV not registered: " + agv_id); + } + + return agv->send_instant_actions(actions); +} + +// ============================================================================ +// Default implementations for optional virtual callbacks +// ============================================================================ + +void VDA5050Master::on_visualization( + const std::string& agv_id, const vda5050_msgs::msg::Visualization& /*msg*/) +{ + VDA5050_WARN( + "[VDA5050Master] on_visualization not overridden. Received visualization " + "from AGV: {}", + agv_id); +} diff --git a/vda5050_master/test/agv/agv_test.cpp b/vda5050_master/test/agv/agv_test.cpp new file mode 100644 index 0000000..c1222fb --- /dev/null +++ b/vda5050_master/test/agv/agv_test.cpp @@ -0,0 +1,925 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "vda5050_master/agv/agv.hpp" +#include "vda5050_master/communication/communication.hpp" +#include "vda5050_master/vda5050_master/master.hpp" + +namespace vda5050_master::test { + +// ============================================================================= +// Mock Communication Strategy +// ============================================================================= + +class MockCommunicationStrategy : public ICommunicationStrategy +{ +public: + MockCommunicationStrategy() : state_(ConnectionState::DISCONNECTED) {} + + void connect() override + { + state_ = ConnectionState::CONNECTED; + } + + void disconnect() override + { + state_ = ConnectionState::DISCONNECTED; + } + + void subscribe( + const std::string& topic, MessageCallback callback, + const int /*qos*/ = 0) override + { + std::lock_guard lock(mutex_); + subscriptions_[topic] = callback; + } + + void unsubscribe(const std::string& topic) override + { + std::lock_guard lock(mutex_); + subscriptions_.erase(topic); + } + + void send_message( + const std::string& topic, const std::string& message, + const int /*qos*/ = 0) override + { + // Wait until unblocked (for queue policy testing) + { + std::unique_lock lock(block_mutex_); + blocked_waiters_++; + blocked_waiter_cv_.notify_all(); // Notify that we're waiting + block_cv_.wait(lock, [this] { return !blocked_; }); + blocked_waiters_--; + } + + std::lock_guard lock(mutex_); + sent_messages_.push_back({topic, message}); + send_count_++; + cv_.notify_all(); + } + + ConnectionState get_state() const override + { + return state_; + } + + // Blocking control for queue policy testing + void block() + { + std::lock_guard lock(block_mutex_); + blocked_ = true; + } + void unblock() + { + std::lock_guard lock(block_mutex_); + blocked_ = false; + block_cv_.notify_all(); + } + + // Wait until at least one message is blocked in send_message + bool wait_for_blocked( + std::chrono::milliseconds timeout = std::chrono::milliseconds(1000)) + { + std::unique_lock lock(block_mutex_); + return blocked_waiter_cv_.wait_for( + lock, timeout, [this] { return blocked_waiters_ > 0; }); + } + + // Test helper methods + size_t get_send_count() const + { + std::lock_guard lock(mutex_); + return send_count_; + } + + std::vector> get_sent_messages() const + { + std::lock_guard lock(mutex_); + return sent_messages_; + } + + void clear_sent_messages() + { + std::lock_guard lock(mutex_); + sent_messages_.clear(); + send_count_ = 0; + } + + bool wait_for_messages( + size_t count, + std::chrono::milliseconds timeout = std::chrono::milliseconds(5000)) + { + std::unique_lock lock(mutex_); + return cv_.wait_for( + lock, timeout, [this, count] { return send_count_ >= count; }); + } + + // Simulate receiving a message on a subscribed topic + void simulate_receive(const std::string& topic, const std::string& payload) + { + MessageCallback callback; + { + std::lock_guard lock(mutex_); + auto it = subscriptions_.find(topic); + if (it != subscriptions_.end()) + { + callback = it->second; + } + } + // Call callback outside lock to avoid deadlock + if (callback) + { + callback(topic, payload); + } + } + +private: + std::atomic state_; + mutable std::mutex mutex_; + std::condition_variable cv_; + std::map subscriptions_; + std::vector> sent_messages_; + size_t send_count_ = 0; + + // Blocking support for queue policy testing + bool blocked_{false}; + size_t blocked_waiters_{0}; + std::mutex block_mutex_; + std::condition_variable block_cv_; + std::condition_variable blocked_waiter_cv_; +}; + +// ============================================================================= +// Shared Mock Registry - allows test fixture to access mocks created by factory +// ============================================================================= + +class MockRegistry +{ +public: + static MockRegistry& instance() + { + static MockRegistry registry; + return registry; + } + + void register_mock(const std::string& agv_id, MockCommunicationStrategy* mock) + { + std::lock_guard lock(mutex_); + mocks_[agv_id] = mock; + } + + MockCommunicationStrategy* get_mock(const std::string& agv_id) + { + std::lock_guard lock(mutex_); + auto it = mocks_.find(agv_id); + return it != mocks_.end() ? it->second : nullptr; + } + + void clear() + { + std::lock_guard lock(mutex_); + mocks_.clear(); + } + +private: + MockRegistry() = default; + std::mutex mutex_; + std::map mocks_; +}; + +// ============================================================================= +// Test Master - Concrete implementation of VDA5050Master for testing +// ============================================================================= + +class TestMaster : public VDA5050Master +{ +public: + TestMaster() + : VDA5050Master([](const std::string& agv_id) { + auto mock = std::make_unique(); + MockRegistry::instance().register_mock(agv_id, mock.get()); + return mock; + }) + { + } + + // Track received messages for verification + std::vector> + received_connections; + std::vector> received_states; + std::mutex callback_mutex; + +protected: + void on_connection( + const std::string& agv_id, + const vda5050_msgs::msg::Connection& msg) override + { + std::lock_guard lock(callback_mutex); + received_connections.push_back({agv_id, msg}); + } + + void on_state( + const std::string& agv_id, const vda5050_msgs::msg::State& msg) override + { + std::lock_guard lock(callback_mutex); + received_states.push_back({agv_id, msg}); + } +}; + +// ============================================================================= +// Test Fixture +// ============================================================================= + +class AGVIntegrationTestFixture : public ::testing::Test +{ +protected: + void SetUp() override + { + MockRegistry::instance().clear(); + manufacturer_ = "TestManufacturer"; + serial_number_ = "SN001"; + agv_id_ = manufacturer_ + "/" + serial_number_; + } + + void TearDown() override + { + MockRegistry::instance().clear(); + } + + vda5050_msgs::msg::Order create_test_order(const std::string& order_id) + { + vda5050_msgs::msg::Order order; + order.order_id = order_id; + order.order_update_id = 0; + return order; + } + + vda5050_msgs::msg::InstantActions create_test_instant_actions(uint32_t id) + { + vda5050_msgs::msg::InstantActions actions; + actions.header.header_id = id; + return actions; + } + + MockCommunicationStrategy* get_mock() + { + return MockRegistry::instance().get_mock(agv_id_); + } + + std::string manufacturer_; + std::string serial_number_; + std::string agv_id_; +}; + +// ============================================================================= +// Basic Registration Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, RegisterAGVCreatesConnection) +{ + TestMaster master; + + EXPECT_FALSE(master.is_agv_registered(manufacturer_, serial_number_)); + + master.register_agv(manufacturer_, serial_number_); + + EXPECT_TRUE(master.is_agv_registered(manufacturer_, serial_number_)); +} + +TEST_F(AGVIntegrationTestFixture, UnregisterAGVRemovesConnection) +{ + TestMaster master; + + master.register_agv(manufacturer_, serial_number_); + EXPECT_TRUE(master.is_agv_registered(manufacturer_, serial_number_)); + + master.unregister_agv(manufacturer_, serial_number_); + EXPECT_FALSE(master.is_agv_registered(manufacturer_, serial_number_)); +} + +TEST_F(AGVIntegrationTestFixture, PublishOrderToUnregisteredAGVThrows) +{ + TestMaster master; + + EXPECT_THROW( + master.publish_order(manufacturer_, serial_number_, create_test_order("1")), + std::runtime_error); +} + +TEST_F(AGVIntegrationTestFixture, PublishInstantActionsToUnregisteredAGVThrows) +{ + TestMaster master; + + EXPECT_THROW( + master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(1)), + std::runtime_error); +} + +// ============================================================================= +// Message Publishing Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, PublishOrderSendsMessage) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_1"))); + + // Wait for message to be processed by queue thread + ASSERT_TRUE(mock->wait_for_messages(1)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), 1u); + + // Verify topic contains "order" + EXPECT_NE(messages[0].first.find("order"), std::string::npos); + + // Verify payload contains order_id + EXPECT_NE(messages[0].second.find("order_1"), std::string::npos); +} + +TEST_F(AGVIntegrationTestFixture, PublishInstantActionsSendsMessage) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(42))); + + // Wait for message to be processed + ASSERT_TRUE(mock->wait_for_messages(1)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), 1u); + + // Verify topic contains "instant_actions" + EXPECT_NE(messages[0].first.find("instant_actions"), std::string::npos); +} + +TEST_F(AGVIntegrationTestFixture, PublishMultipleOrdersInSequence) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + constexpr int num_orders = 5; + for (int i = 0; i < num_orders; ++i) + { + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, + create_test_order("order_" + std::to_string(i)))); + } + + // Wait for all messages + ASSERT_TRUE(mock->wait_for_messages(num_orders)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), static_cast(num_orders)); + + // Verify messages arrived in order + for (int i = 0; i < num_orders; ++i) + { + std::string expected_id = "order_" + std::to_string(i); + EXPECT_NE(messages[i].second.find(expected_id), std::string::npos) + << "Message " << i << " should contain " << expected_id; + } +} + +TEST_F(AGVIntegrationTestFixture, PublishMultipleInstantActionsInSequence) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + constexpr int num_actions = 5; + for (int i = 0; i < num_actions; ++i) + { + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, + create_test_instant_actions(static_cast(i)))); + } + + // Wait for all messages + ASSERT_TRUE(mock->wait_for_messages(num_actions)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), static_cast(num_actions)); + + // Verify messages arrived in order (header_id should be 0, 1, 2, 3, 4) + for (int i = 0; i < num_actions; ++i) + { + std::string expected_id = "\"headerId\":" + std::to_string(i); + EXPECT_NE(messages[i].second.find(expected_id), std::string::npos) + << "Message " << i << " should contain headerId " << i; + } +} + +// ============================================================================= +// Concurrent Access Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, ConcurrentPublishOrdersAreThreadSafe) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + // Use fewer messages to stay within default queue size (10) + // This ensures all messages are sent without drops + constexpr int num_threads = 2; + constexpr int orders_per_thread = 3; + constexpr int total_orders = num_threads * orders_per_thread; + std::vector threads; + + for (int t = 0; t < num_threads; ++t) + { + threads.emplace_back([&, t]() { + for (int i = 0; i < orders_per_thread; ++i) + { + std::string order_id = + "thread_" + std::to_string(t) + "_order_" + std::to_string(i); + master.publish_order( + manufacturer_, serial_number_, create_test_order(order_id)); + } + }); + } + + for (auto& thread : threads) + { + thread.join(); + } + + // Wait for all messages to be sent + ASSERT_TRUE( + mock->wait_for_messages(total_orders, std::chrono::milliseconds(10000))); + + auto messages = mock->get_sent_messages(); + EXPECT_EQ(messages.size(), static_cast(total_orders)); +} + +TEST_F(AGVIntegrationTestFixture, ConcurrentPublishInstantActionsAreThreadSafe) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + // Use fewer messages to stay within default queue size (10) + // This ensures all messages are sent without drops + constexpr int num_threads = 2; + constexpr int actions_per_thread = 3; + constexpr int total_actions = num_threads * actions_per_thread; + std::vector threads; + + for (int t = 0; t < num_threads; ++t) + { + threads.emplace_back([&, t]() { + for (int i = 0; i < actions_per_thread; ++i) + { + uint32_t action_id = static_cast(t * actions_per_thread + i); + master.publish_instant_actions( + manufacturer_, serial_number_, + create_test_instant_actions(action_id)); + } + }); + } + + for (auto& thread : threads) + { + thread.join(); + } + + // Wait for all messages to be sent + ASSERT_TRUE( + mock->wait_for_messages(total_actions, std::chrono::milliseconds(10000))); + + auto messages = mock->get_sent_messages(); + EXPECT_EQ(messages.size(), static_cast(total_actions)); +} + +TEST_F(AGVIntegrationTestFixture, ConcurrentOrdersAndInstantActions) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + + constexpr int num_orders = 10; + constexpr int num_actions = 10; + + std::thread order_thread([&]() { + for (int i = 0; i < num_orders; ++i) + { + master.publish_order( + manufacturer_, serial_number_, + create_test_order("order_" + std::to_string(i))); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + }); + + std::thread action_thread([&]() { + for (int i = 0; i < num_actions; ++i) + { + master.publish_instant_actions( + manufacturer_, serial_number_, + create_test_instant_actions(static_cast(i))); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + }); + + order_thread.join(); + action_thread.join(); + + // Wait for all messages + ASSERT_TRUE(mock->wait_for_messages( + num_orders + num_actions, std::chrono::milliseconds(10000))); + + auto messages = mock->get_sent_messages(); + EXPECT_EQ(messages.size(), static_cast(num_orders + num_actions)); + + // Count orders and instant actions + int order_count = 0; + int action_count = 0; + for (const auto& msg : messages) + { + if (msg.first.find("instant_actions") != std::string::npos) + { + action_count++; + } + else if (msg.first.find("order") != std::string::npos) + { + order_count++; + } + } + + EXPECT_EQ(order_count, num_orders); + EXPECT_EQ(action_count, num_actions); +} + +// ============================================================================= +// Multi-AGV Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, MultipleAGVsCanBeRegistered) +{ + TestMaster master; + + master.register_agv("Manufacturer1", "SN001"); + master.register_agv("Manufacturer1", "SN002"); + master.register_agv("Manufacturer2", "SN001"); + + EXPECT_TRUE(master.is_agv_registered("Manufacturer1", "SN001")); + EXPECT_TRUE(master.is_agv_registered("Manufacturer1", "SN002")); + EXPECT_TRUE(master.is_agv_registered("Manufacturer2", "SN001")); +} + +TEST_F(AGVIntegrationTestFixture, MessagesRoutedToCorrectAGV) +{ + TestMaster master; + + master.register_agv("Mfg1", "AGV1"); + master.register_agv("Mfg1", "AGV2"); + + auto* mock1 = MockRegistry::instance().get_mock("Mfg1/AGV1"); + auto* mock2 = MockRegistry::instance().get_mock("Mfg1/AGV2"); + ASSERT_NE(mock1, nullptr); + ASSERT_NE(mock2, nullptr); + + // Send to AGV1 + master.publish_order("Mfg1", "AGV1", create_test_order("order_for_agv1")); + ASSERT_TRUE(mock1->wait_for_messages(1)); + + // Send to AGV2 + master.publish_order("Mfg1", "AGV2", create_test_order("order_for_agv2")); + ASSERT_TRUE(mock2->wait_for_messages(1)); + + // Verify each AGV got its own message + auto messages1 = mock1->get_sent_messages(); + auto messages2 = mock2->get_sent_messages(); + + ASSERT_EQ(messages1.size(), 1u); + ASSERT_EQ(messages2.size(), 1u); + + EXPECT_NE(messages1[0].second.find("order_for_agv1"), std::string::npos); + EXPECT_NE(messages2[0].second.find("order_for_agv2"), std::string::npos); +} + +// ============================================================================= +// Graceful Shutdown Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, DestructorCompletesWithPendingMessages) +{ + auto start = std::chrono::steady_clock::now(); + + { + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + // Queue many messages + for (int i = 0; i < 50; ++i) + { + master.publish_order( + manufacturer_, serial_number_, + create_test_order("pending_" + std::to_string(i))); + } + + // Master destructor should complete gracefully + } + + auto elapsed = std::chrono::steady_clock::now() - start; + + // Should complete in reasonable time + EXPECT_LT(elapsed, std::chrono::seconds(10)) + << "Destructor took too long with pending messages"; +} + +TEST_F(AGVIntegrationTestFixture, UnregisterWithPendingMessages) +{ + TestMaster master; + master.register_agv(manufacturer_, serial_number_); + + // Queue some messages + for (int i = 0; i < 10; ++i) + { + master.publish_order( + manufacturer_, serial_number_, + create_test_order("pending_" + std::to_string(i))); + } + + // Unregister should complete gracefully + auto start = std::chrono::steady_clock::now(); + master.unregister_agv(manufacturer_, serial_number_); + auto elapsed = std::chrono::steady_clock::now() - start; + + EXPECT_LT(elapsed, std::chrono::seconds(5)) << "Unregister took too long"; + + EXPECT_FALSE(master.is_agv_registered(manufacturer_, serial_number_)); +} + +// ============================================================================= +// Edge Cases +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, DuplicateRegistrationIsIgnored) +{ + TestMaster master; + + master.register_agv(manufacturer_, serial_number_); + EXPECT_TRUE(master.is_agv_registered(manufacturer_, serial_number_)); + + // Second registration should be ignored (no crash, no duplicate) + master.register_agv(manufacturer_, serial_number_); + EXPECT_TRUE(master.is_agv_registered(manufacturer_, serial_number_)); +} + +TEST_F(AGVIntegrationTestFixture, UnregisterNonExistentAGVIsIgnored) +{ + TestMaster master; + + // Should not throw or crash + EXPECT_NO_THROW(master.unregister_agv("NonExistent", "AGV")); +} + +// ============================================================================= +// Queue Policy Tests +// ============================================================================= + +TEST_F(AGVIntegrationTestFixture, DropNewestPolicyRejectsNewOrderWhenQueueFull) +{ + constexpr size_t queue_size = 3; + constexpr bool drop_oldest = false; // Reject new messages when full + + TestMaster master; + master.register_agv(manufacturer_, serial_number_, queue_size, drop_oldest); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + mock->block(); // Block message processing so queue fills up + + // Publish first message and wait for it to be blocked in send_message + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_0"))); + ASSERT_TRUE(mock->wait_for_blocked()); + + // Now fill the queue (size=3) with orders 1, 2, 3 + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_1"))); + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_2"))); + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_3"))); + + // Queue is full - new messages should be rejected (return false) + EXPECT_FALSE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_4"))); + EXPECT_FALSE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_5"))); + + // Unblock and let messages be sent + mock->unblock(); + + // Wait for all messages: 1 + queue_size + constexpr size_t expected_messages = queue_size + 1; + ASSERT_TRUE(mock->wait_for_messages(expected_messages)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), expected_messages); + + // Should have order_0, order_1, order_2, order_3 (new ones were rejected) + EXPECT_NE(messages[0].second.find("order_0"), std::string::npos); + EXPECT_NE(messages[1].second.find("order_1"), std::string::npos); + EXPECT_NE(messages[2].second.find("order_2"), std::string::npos); + EXPECT_NE(messages[3].second.find("order_3"), std::string::npos); +} + +TEST_F( + AGVIntegrationTestFixture, + DropNewestPolicyRejectsNewInstantActionsWhenQueueFull) +{ + constexpr size_t queue_size = 3; + constexpr bool drop_oldest = false; // Reject new messages when full + + TestMaster master; + master.register_agv(manufacturer_, serial_number_, queue_size, drop_oldest); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + mock->block(); // Block message processing so queue fills up + + // Publish first message and wait for it to be blocked in send_message + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(0))); + ASSERT_TRUE(mock->wait_for_blocked()); + + // Now fill the queue (size=3) with actions 1, 2, 3 + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(1))); + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(2))); + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(3))); + + // Queue is full - new messages should be rejected (return false) + EXPECT_FALSE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(4))); + EXPECT_FALSE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(5))); + + // Unblock and let messages be sent + mock->unblock(); + + // Wait for all messages: 1 + queue_size + constexpr size_t expected_messages = queue_size + 1; + ASSERT_TRUE(mock->wait_for_messages(expected_messages)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), expected_messages); + + // Should have header IDs 0, 1, 2, 3 (new ones were rejected) + EXPECT_NE(messages[0].second.find("\"headerId\":0"), std::string::npos); + EXPECT_NE(messages[1].second.find("\"headerId\":1"), std::string::npos); + EXPECT_NE(messages[2].second.find("\"headerId\":2"), std::string::npos); + EXPECT_NE(messages[3].second.find("\"headerId\":3"), std::string::npos); +} + +TEST_F(AGVIntegrationTestFixture, DropOldestPolicyDropsOldestOrder) +{ + constexpr size_t queue_size = 3; + constexpr bool drop_oldest = true; // Drop oldest messages when full + + TestMaster master; + master.register_agv(manufacturer_, serial_number_, queue_size, drop_oldest); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + mock->block(); // Block message processing so queue fills up + + // Publish first message and wait for it to be blocked in send_message + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_0"))); + ASSERT_TRUE(mock->wait_for_blocked()); + + // Now publish remaining messages - these will fill and overflow the queue + // Queue will have order_1, order_2, order_3 initially (size=3) + // order_4 causes oldest (order_1) to be dropped + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_1"))); + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_2"))); + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_3"))); + EXPECT_TRUE(master.publish_order( + manufacturer_, serial_number_, create_test_order("order_4"))); + + // Unblock and let messages be sent + mock->unblock(); + + // Wait for messages: order_0 (being processed) + queue_size remaining + constexpr size_t expected_messages = queue_size + 1; + ASSERT_TRUE(mock->wait_for_messages(expected_messages)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), expected_messages); + + // Should have order_0 (being processed), order_2, order_3, order_4 (order_1 dropped) + EXPECT_NE(messages[0].second.find("order_0"), std::string::npos); + EXPECT_NE(messages[1].second.find("order_2"), std::string::npos); + EXPECT_NE(messages[2].second.find("order_3"), std::string::npos); + EXPECT_NE(messages[3].second.find("order_4"), std::string::npos); +} + +TEST_F(AGVIntegrationTestFixture, DropOldestPolicyDropsOldestInstantActions) +{ + constexpr size_t queue_size = 3; + constexpr bool drop_oldest = true; // Drop oldest messages when full + + TestMaster master; + master.register_agv(manufacturer_, serial_number_, queue_size, drop_oldest); + + auto* mock = get_mock(); + ASSERT_NE(mock, nullptr); + mock->block(); // Block message processing so queue fills up + + // Publish first message and wait for it to be blocked in send_message + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(0))); + ASSERT_TRUE(mock->wait_for_blocked()); + + // Now publish remaining messages - these will fill and overflow the queue + // Queue will have actions 1, 2, 3 initially (size=3) + // action 4 causes oldest (action 1) to be dropped + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(1))); + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(2))); + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(3))); + EXPECT_TRUE(master.publish_instant_actions( + manufacturer_, serial_number_, create_test_instant_actions(4))); + + // Unblock and let messages be sent + mock->unblock(); + + // Wait for messages: action 0 (being processed) + queue_size remaining + constexpr size_t expected_messages = queue_size + 1; + ASSERT_TRUE(mock->wait_for_messages(expected_messages)); + + auto messages = mock->get_sent_messages(); + ASSERT_EQ(messages.size(), expected_messages); + + // Should have header IDs 0 (being processed), 2, 3, 4 (action 1 dropped) + EXPECT_NE(messages[0].second.find("\"headerId\":0"), std::string::npos); + EXPECT_NE(messages[1].second.find("\"headerId\":2"), std::string::npos); + EXPECT_NE(messages[2].second.find("\"headerId\":3"), std::string::npos); + EXPECT_NE(messages[3].second.find("\"headerId\":4"), std::string::npos); +} + +} // namespace vda5050_master::test diff --git a/vda5050_master/test/communication/mqtt/communications.cpp b/vda5050_master/test/communication/mqtt/communications.cpp new file mode 100644 index 0000000..5ce02b0 --- /dev/null +++ b/vda5050_master/test/communication/mqtt/communications.cpp @@ -0,0 +1,248 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "../test_helpers.hpp" +#include "test_helpers.hpp" +#include "vda5050_master/communication/mqtt.hpp" + +// Using declarations instead of using-directives for cpplint compliance +using vda5050_master::test::make_numbered_payloads; +using vda5050_master::test::make_test_topic; +using vda5050_master::test::verify_messages_in_order; +using vda5050_master::test::wait_for_condition; +using vda5050_master::test::constants::default_payload_json; +using vda5050_master::test::mqtt::constants::MQTT_BROKER; +using vda5050_master::test::mqtt::constants::MQTT_POLL_INTERVAL; +class MqttCommunicationTestFixture : public ::testing::Test +{ +protected: + void SetUp() override + { + broker_endpoint_ = MQTT_BROKER; + qos_ = 0; + container_ = std::make_shared>(); + } + + void TearDown() override + { + // Cleanup if needed + } + + std::string get_last_subscription_message() + { + auto msg = *(container_->begin()); + container_->erase(container_->begin()); + return msg; + } + + std::shared_ptr> container_; + + std::string broker_endpoint_; + int qos_; +}; +TEST_F(MqttCommunicationTestFixture, SubscriptionTest) +{ + std::string topic = make_test_topic("mqtt/subscription"); + std::string payload = default_payload_json().dump(); + + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_id"); + ASSERT_NO_THROW(mqtt_comms.connect()); + mqtt_comms.subscribe( + topic, + [this](const std::string& topic, const std::string& payload) { + // Parse JSON and call the connection callback + container_->push_back(payload); + }, + qos_); + + auto talker = vda5050_core::mqtt_client::create_default_client( + broker_endpoint_, "talker"); + ASSERT_NO_THROW(talker->connect()); + ASSERT_NO_THROW(talker->publish(topic, payload, qos_)); + + // Poll for message with timeout (accounting for network latency) + bool received = wait_for_condition( + [&]() { return container_->size() > 0; }, MQTT_POLL_INTERVAL); + + ASSERT_EQ(1, container_->size()); + container_->clear(); + ASSERT_NO_THROW(talker->disconnect()); + ASSERT_NO_THROW(mqtt_comms.disconnect()); +} + +TEST_F(MqttCommunicationTestFixture, PublishTest) +{ + std::string topic = make_test_topic("mqtt/publish"); + std::string payload = default_payload_json().dump(); + + std::atomic_bool received = false; + + auto listener = vda5050_core::mqtt_client::create_default_client( + broker_endpoint_, "listener"); + ASSERT_NO_THROW(listener->connect()); + ASSERT_NO_THROW(listener->subscribe( + topic, + [&](const std::string& topic_, const std::string& payload_) { + received = true; + ASSERT_EQ(topic, topic_); + ASSERT_EQ(payload, payload_); + }, + qos_)); + + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_id"); + ASSERT_NO_THROW(mqtt_comms.connect()); + mqtt_comms.send_message(topic, payload, qos_); + + // Poll for message with timeout (accounting for network latency) + ASSERT_TRUE( + wait_for_condition([&]() { return received.load(); }, MQTT_POLL_INTERVAL)); + + ASSERT_TRUE(received); + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_NO_THROW(listener->disconnect()); +} + +TEST_F(MqttCommunicationTestFixture, MultipleMessagesTest) +{ + std::string topic = make_test_topic("mqtt/multiple"); + auto payloads = make_numbered_payloads(3); + + // Create node for Ros2Communication + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_id"); + ASSERT_NO_THROW(mqtt_comms.connect()); + mqtt_comms.subscribe( + topic, + [this](const std::string& topic, const std::string& payload) { + // Parse JSON and call the connection callback + container_->push_back(payload); + }, + qos_); + + auto talker = vda5050_core::mqtt_client::create_default_client( + broker_endpoint_, "talker"); + ASSERT_NO_THROW(talker->connect()); + + // Publish all messages + for (const auto& payload : payloads) + { + ASSERT_NO_THROW(talker->publish(topic, payload, qos_)); + } + + // Wait for all messages + bool all_received = wait_for_condition( + [&]() { return container_->size() >= payloads.size(); }, + MQTT_POLL_INTERVAL); + + ASSERT_TRUE(all_received); + + // Use helper to verify messages in order + verify_messages_in_order( + [&]() { return get_last_subscription_message(); }, payloads, + [&](size_t expected) { ASSERT_EQ(expected, container_->size()); }); + + ASSERT_NO_THROW(mqtt_comms.disconnect()); +} + +// ============================================================================= +// Lifecycle State Tests +// ============================================================================= + +TEST_F(MqttCommunicationTestFixture, InitialStateIsDisconnected) +{ + // Before connect() is called, should be in DISCONNECTED state + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_initial_state"); + + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED) + << "Initial state should be DISCONNECTED"; +} + +TEST_F(MqttCommunicationTestFixture, ConnectedStateAfterConnect) +{ + // After connect() is called, should be in CONNECTED state + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_connected_state"); + + ASSERT_NO_THROW(mqtt_comms.connect()); + + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::CONNECTED) + << "State should be CONNECTED after connect()"; + + ASSERT_NO_THROW(mqtt_comms.disconnect()); +} + +TEST_F(MqttCommunicationTestFixture, DisconnectedStateAfterDisconnect) +{ + // After disconnect() completes, should be in DISCONNECTED state + auto mqtt_comms = + MqttCommunication(broker_endpoint_, "test_disconnected_state"); + + ASSERT_NO_THROW(mqtt_comms.connect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::CONNECTED); + + ASSERT_NO_THROW(mqtt_comms.disconnect()); + + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED) + << "State should be DISCONNECTED after disconnect()"; +} + +TEST_F(MqttCommunicationTestFixture, ReconnectAfterDisconnect) +{ + // Verify state transitions work correctly on reconnect + auto mqtt_comms = MqttCommunication(broker_endpoint_, "test_reconnect"); + + // First connection cycle + ASSERT_NO_THROW(mqtt_comms.connect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::CONNECTED); + + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED); + + // Second connection cycle - should work the same + ASSERT_NO_THROW(mqtt_comms.connect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::CONNECTED) + << "State should be CONNECTED after reconnect"; + + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED); +} + +TEST_F(MqttCommunicationTestFixture, MultipleDisconnectCallsSafe) +{ + // Verify multiple disconnect() calls don't cause issues + auto mqtt_comms = + MqttCommunication(broker_endpoint_, "test_multi_disconnect"); + + ASSERT_NO_THROW(mqtt_comms.connect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::CONNECTED); + + // First disconnect + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED); + + // Second disconnect - should not throw or change state + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED); + + // Third disconnect - still safe + ASSERT_NO_THROW(mqtt_comms.disconnect()); + ASSERT_EQ(mqtt_comms.get_state(), ConnectionState::DISCONNECTED); +} diff --git a/vda5050_master/test/communication/mqtt/test_helpers.hpp b/vda5050_master/test/communication/mqtt/test_helpers.hpp new file mode 100644 index 0000000..3fc782c --- /dev/null +++ b/vda5050_master/test/communication/mqtt/test_helpers.hpp @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMUNICATION__MQTT__TEST_HELPERS_HPP_ +#define COMMUNICATION__MQTT__TEST_HELPERS_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "nlohmann/json.hpp" + +namespace vda5050_master::test::mqtt { + +// Test constants +namespace constants { +constexpr auto MQTT_POLL_INTERVAL = std::chrono::milliseconds(50); + +// Allow overriding MQTT broker via environment variable +// Default to localhost:1883 for local mosquitto broker +// Use localhost:1884 for automated Docker test broker by setting env var +// NOLINTNEXTLINE(runtime/string) +inline std::string get_mqtt_broker() +{ + const char* env_broker = std::getenv("VDA5050_TEST_MQTT_BROKER"); + if (env_broker != nullptr) + { + return std::string(env_broker); + } + return "tcp://localhost:1883"; +} + +// NOLINTNEXTLINE(runtime/string) +const std::string MQTT_BROKER = get_mqtt_broker(); +} // namespace constants +} // namespace vda5050_master::test::mqtt + +#endif // COMMUNICATION__MQTT__TEST_HELPERS_HPP_ diff --git a/vda5050_master/test/communication/ros2/communications.cpp b/vda5050_master/test/communication/ros2/communications.cpp new file mode 100644 index 0000000..5d3f58a --- /dev/null +++ b/vda5050_master/test/communication/ros2/communications.cpp @@ -0,0 +1,340 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "../test_helpers.hpp" +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" +#include "test_helpers.hpp" +#include "vda5050_master/communication/ros2.hpp" +namespace { +auto payload_json = R"( + { + "happy": true, + "pi": 3.141 + } +)"_json; +} // namespace + +// Using declarations instead of using-directives for cpplint compliance +using vda5050_master::test::make_numbered_payloads; +using vda5050_master::test::make_test_topic; +using vda5050_master::test::verify_messages_in_order; +using vda5050_master::test::constants::default_payload_json; +using vda5050_master::test::ros2::publish_messages; +using vda5050_master::test::ros2::TestExecutor; +using vda5050_master::test::ros2::wait_for_condition_with_spin; +using vda5050_master::test::ros2::wait_for_discovery; +using vda5050_master::test::ros2::wait_for_publisher_discovery; + +class Ros2CommunicationTest : public ::testing::Test +{ +protected: + void SetUp() override + { + if (!rclcpp::ok()) + { + rclcpp::init(0, nullptr); + } + qos_ = 0; + container_ = std::make_shared>(); + } + + void TearDown() override + { + // Note: We don't shutdown rclcpp here to allow multiple tests to run + } + + std::string get_last_subscription_message() + { + auto msg = *(container_->begin()); + container_->erase(container_->begin()); + return msg; + } + std::shared_ptr> container_; + + int qos_; +}; + +TEST_F(Ros2CommunicationTest, SubscriptionTest) +{ + std::string topic = make_test_topic("ros2/subscription"); + std::string payload = default_payload_json().dump(); + + // Create node for Ros2Communication + auto test_node = std::make_shared("test_ros2_subscriber"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + ASSERT_NO_THROW(ros2_comms.connect()); + ros2_comms.subscribe( + topic, + [this](const std::string& topic, const std::string& payload) { + // Parse JSON and call the connection callback + container_->push_back(payload); + }, + qos_); + + // Create a separate publisher node + auto talker_node = std::make_shared("test_talker"); + auto talker_pub = + talker_node->create_publisher(topic, 10); + + // Create executor to spin both nodes + TestExecutor executor; + executor.add_node(test_node); + executor.add_node(talker_node); + + // Publish message + auto msg = std_msgs::msg::String(); + msg.data = payload; + talker_pub->publish(msg); + + bool received = executor.wait_for([&]() { return container_->size() > 0; }); + + // Verify message was received + ASSERT_TRUE(received); + ASSERT_EQ(payload, get_last_subscription_message()); + ASSERT_EQ(0, container_->size()); + + ASSERT_NO_THROW(ros2_comms.disconnect()); +} + +TEST_F(Ros2CommunicationTest, PublishTest) +{ + std::string topic = make_test_topic("ros2/publish"); + std::string payload = default_payload_json().dump(); + + std::atomic_bool ros2_received = false; + std::string received_payload; + + // Create listener node with subscription + auto listener_node = std::make_shared("test_listener"); + auto listener_sub = listener_node->create_subscription( + topic, rclcpp::QoS(10).best_effort().durability_volatile(), + [&](const std_msgs::msg::String::SharedPtr msg) { + ros2_received = true; + received_payload = msg->data; + }); + + // Create Ros2Communication instance + auto publisher_node = std::make_shared("test_ros2_publisher"); + auto ros2_comms = Ros2Communication(publisher_node, "test_id"); + ASSERT_NO_THROW(ros2_comms.connect()); + + TestExecutor executor; + executor.add_node(listener_node); + executor.add_node(publisher_node); + + // Publish message (publisher will be created and discovery will happen) + ros2_comms.send_message(topic, payload, qos_); + + // Wait for message to be received (allows time for discovery and transmission) + ASSERT_TRUE(executor.wait_for([&]() { return ros2_received.load(); })); + ASSERT_EQ(payload, received_payload); + ASSERT_NO_THROW(ros2_comms.disconnect()); +} + +TEST_F(Ros2CommunicationTest, MultipleMessagesTest) +{ + std::string topic = make_test_topic("ros2/multiple"); + auto payloads = make_numbered_payloads(3); // Helper generates test data + + // Create node for Ros2Communication + auto test_node = std::make_shared("test_ros2_multi"); + auto ros2_comms = Ros2Communication(test_node, "test_multi"); + ASSERT_NO_THROW(ros2_comms.connect()); + ros2_comms.subscribe( + topic, + [this](const std::string& topic, const std::string& payload) { + // Parse JSON and call the connection callback + container_->push_back(payload); + }, + qos_); + + // Create publisher node + auto talker_node = std::make_shared("test_talker_multi"); + auto talker_pub = + talker_node->create_publisher(topic, 10); + + // Create executor + TestExecutor executor; + executor.add_node(test_node); + executor.add_node(talker_node); + + // Use helper to publish multiple messages + publish_messages(talker_pub, payloads); + + // Wait for all messages using helper + bool all_received = + executor.wait_for([&]() { return container_->size() >= payloads.size(); }); + ASSERT_TRUE(all_received); + + // Use helper to verify messages in order + verify_messages_in_order( + [&]() { return get_last_subscription_message(); }, payloads, + [&](size_t expected) { ASSERT_EQ(expected, container_->size()); }); + + ASSERT_NO_THROW(ros2_comms.disconnect()); +} + +// ============================================================================= +// Lifecycle State Tests +// ============================================================================= + +TEST_F(Ros2CommunicationTest, InitialStateIsDisconnected) +{ + // Before connect() is called, should be in DISCONNECTED state + auto test_node = std::make_shared("test_initial_state"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED) + << "Initial state should be DISCONNECTED"; +} + +TEST_F(Ros2CommunicationTest, ConnectedStateAfterConnect) +{ + // After connect() is called, should be in CONNECTED state + auto test_node = std::make_shared("test_connected_state"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + ASSERT_NO_THROW(ros2_comms.connect()); + + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::CONNECTED) + << "State should be CONNECTED after connect()"; + + ASSERT_NO_THROW(ros2_comms.disconnect()); +} + +TEST_F(Ros2CommunicationTest, DisconnectedStateAfterDisconnect) +{ + // After disconnect() completes, should be in DISCONNECTED state + auto test_node = std::make_shared("test_disconnected_state"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + ASSERT_NO_THROW(ros2_comms.connect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::CONNECTED); + + ASSERT_NO_THROW(ros2_comms.disconnect()); + + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED) + << "State should be DISCONNECTED after disconnect()"; +} + +TEST_F(Ros2CommunicationTest, ReconnectAfterDisconnect) +{ + // Verify state transitions work correctly on reconnect + auto test_node = std::make_shared("test_reconnect"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + // First connection cycle + ASSERT_NO_THROW(ros2_comms.connect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::CONNECTED); + + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED); + + // Second connection cycle - should work the same + ASSERT_NO_THROW(ros2_comms.connect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::CONNECTED) + << "State should be CONNECTED after reconnect"; + + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED); +} + +TEST_F(Ros2CommunicationTest, SendMessageFailsWhenNotConnected) +{ + // Verify send_message doesn't crash when not connected + auto test_node = std::make_shared("test_send_not_connected"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + // Should not throw, just log warning + ASSERT_NO_THROW(ros2_comms.send_message("/test/topic", "test_payload", 0)); + + // Also test after disconnect + ASSERT_NO_THROW(ros2_comms.connect()); + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_NO_THROW(ros2_comms.send_message("/test/topic", "test_payload", 0)); +} + +TEST_F(Ros2CommunicationTest, MultipleDisconnectCallsSafe) +{ + // Verify multiple disconnect() calls don't cause issues + auto test_node = std::make_shared("test_multi_disconnect"); + auto ros2_comms = Ros2Communication(test_node, "test_id"); + + ASSERT_NO_THROW(ros2_comms.connect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::CONNECTED); + + // First disconnect + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED); + + // Second disconnect - should not throw or change state + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED); + + // Third disconnect - still safe + ASSERT_NO_THROW(ros2_comms.disconnect()); + ASSERT_EQ(ros2_comms.get_state(), ConnectionState::DISCONNECTED); +} + +// TEST_F(Ros2CommunicationTest, QoSReliableTest) +// { +// std::string topic = "/test/ros2/qos_reliable"; +// std::string payload = payload_json.dump(); +// int qos = 1; // Reliable QoS + +// // Create node for Ros2Communication with QoS 1 (reliable) +// auto test_node = std::make_shared("test_ros2_qos"); +// auto ros2_comms = Ros2Communication(test_node, "test_qos"); +// ASSERT_NO_THROW(ros2_comms.connect()); +// ros2_comms.subscribe(topic, qos); + +// // Create a separate publisher node with matching QoS +// auto talker_node = std::make_shared("test_talker_qos"); +// auto talker_pub = talker_node->create_publisher( +// topic, rclcpp::QoS(10).reliable()); + +// // Create executor +// rclcpp::executors::SingleThreadedExecutor executor; +// executor.add_node(test_node); +// executor.add_node(talker_node); + +// // Publish message +// auto msg = std_msgs::msg::String(); +// msg.data = payload; +// talker_pub->publish(msg); + +// // Spin to process callbacks +// auto start_time = std::chrono::steady_clock::now(); +// while (ros2_comms.storage_[topic].empty() && +// (std::chrono::steady_clock::now() - start_time) < +// std::chrono::seconds(2)) { +// executor.spin_some(std::chrono::milliseconds(100)); +// } + +// // Verify message was received +// ASSERT_EQ(1, ros2_comms.storage_[topic].size()); +// ASSERT_EQ(payload, ros2_comms.get_last_subscription_message(topic)); + +// ASSERT_NO_THROW(ros2_comms.disconnect()); +// } diff --git a/vda5050_master/test/communication/ros2/test_helpers.hpp b/vda5050_master/test/communication/ros2/test_helpers.hpp new file mode 100644 index 0000000..77ded90 --- /dev/null +++ b/vda5050_master/test/communication/ros2/test_helpers.hpp @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMUNICATION__ROS2__TEST_HELPERS_HPP_ +#define COMMUNICATION__ROS2__TEST_HELPERS_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" +#include "test_helpers.hpp" + +namespace vda5050_master::test::ros2 { +namespace constants { +constexpr auto ROS2_POLL_INTERVAL = std::chrono::milliseconds(100); +} // namespace constants +/** + * @brief Helper to publish multiple messages to a ROS2 topic + */ +inline void publish_messages( + rclcpp::Publisher::SharedPtr publisher, + const std::vector& payloads) +{ + for (const auto& payload : payloads) + { + auto msg = std_msgs::msg::String(); + msg.data = payload; + publisher->publish(msg); + } +} + +/** + * @brief Helper to wait for a condition while spinning ROS2 executor + */ +template +bool wait_for_condition_with_spin( + rclcpp::executors::SingleThreadedExecutor& executor, Condition condition, + std::chrono::milliseconds timeout = + vda5050_master::test::constants::DEFAULT_TIMEOUT, + std::chrono::milliseconds spin_interval = constants::ROS2_POLL_INTERVAL) +{ + auto start = std::chrono::steady_clock::now(); + while (!condition() && (std::chrono::steady_clock::now() - start) < timeout) + { + executor.spin_some(spin_interval); + } + return condition(); +} + +/** + * @brief Helper to wait for publisher-subscriber discovery + */ +template +bool wait_for_discovery( + rclcpp::executors::SingleThreadedExecutor& executor, + PublisherOrSubscriber pub_or_sub, size_t expected_count = 1) +{ + return wait_for_condition_with_spin( + executor, + [&]() { return pub_or_sub->get_subscription_count() >= expected_count; }, + vda5050_master::test::constants::DISCOVERY_TIMEOUT, + vda5050_master::test::constants::DISCOVERY_POLL_INTERVAL); +} + +/** + * @brief Specialization for subscriber checking publisher count + */ +inline bool wait_for_publisher_discovery( + rclcpp::executors::SingleThreadedExecutor& executor, + rclcpp::Subscription::SharedPtr subscription, + size_t expected_count = 1) +{ + return wait_for_condition_with_spin( + executor, + [&]() { return subscription->get_publisher_count() >= expected_count; }, + vda5050_master::test::constants::DISCOVERY_TIMEOUT, + vda5050_master::test::constants::DISCOVERY_POLL_INTERVAL); +} + +/** + * @brief Helper to create and configure a test executor with multiple nodes + */ +class TestExecutor +{ +public: + TestExecutor() + : executor_(std::make_unique()) + { + } + + void add_node(std::shared_ptr node) + { + executor_->add_node(node); + } + + rclcpp::executors::SingleThreadedExecutor& get() + { + return *executor_; + } + + template + bool wait_for( + Condition condition, std::chrono::milliseconds timeout = + vda5050_master::test::constants::DEFAULT_TIMEOUT) + { + return wait_for_condition_with_spin(*executor_, condition, timeout); + } + +private: + std::unique_ptr executor_; +}; + +} // namespace vda5050_master::test::ros2 + +#endif // COMMUNICATION__ROS2__TEST_HELPERS_HPP_ diff --git a/vda5050_master/test/communication/test_helpers.hpp b/vda5050_master/test/communication/test_helpers.hpp new file mode 100644 index 0000000..668ec82 --- /dev/null +++ b/vda5050_master/test/communication/test_helpers.hpp @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMUNICATION__TEST_HELPERS_HPP_ +#define COMMUNICATION__TEST_HELPERS_HPP_ + +#include +#include +#include +#include +#include + +#include "nlohmann/json.hpp" + +namespace vda5050_master::test { + +// Test constants +namespace constants { +constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(3); +constexpr auto DISCOVERY_TIMEOUT = std::chrono::seconds(2); +constexpr auto DISCOVERY_POLL_INTERVAL = std::chrono::milliseconds(10); +const char TEST_TOPIC_PREFIX[] = "/test/integration"; + +inline nlohmann::json default_payload_json() +{ + return R"( + { + "happy": true, + "pi": 3.141 + } + )"_json; +} +} // namespace constants +/** + * @brief Generic polling helper with timeout + * @param condition Lambda that returns true when condition is met + * @param timeout Maximum time to wait + * @param poll_interval How often to check the condition + * @return true if condition was met, false if timeout occurred + */ +template +bool wait_for_condition( + Condition condition, std::chrono::milliseconds poll_interval, + std::chrono::milliseconds timeout = constants::DEFAULT_TIMEOUT) +{ + auto start = std::chrono::steady_clock::now(); + while (!condition() && (std::chrono::steady_clock::now() - start) < timeout) + { + std::this_thread::sleep_for(poll_interval); + } + return condition(); +} + +/** + * @brief Helper to verify multiple messages in order + * @param get_message_func Function to retrieve next message + * @param expected_payloads List of expected payloads in order + * @param check_size_func Function to check current message count + */ +template +void verify_messages_in_order( + GetMessageFunc get_message_func, + const std::vector& expected_payloads, + CheckSizeFunc check_size_func) +{ + size_t expected_count = expected_payloads.size(); + for (size_t i = 0; i < expected_payloads.size(); ++i) + { + check_size_func(expected_count); + auto received = get_message_func(); + if (received != expected_payloads[i]) + { + throw std::runtime_error( + "Message " + std::to_string(i) + + " mismatch. Expected: " + expected_payloads[i] + ", Got: " + received); + } + expected_count--; + } + check_size_func(0); // Should be empty after all reads +} + +/** + * @brief Generate test topic name with suffix + */ +inline std::string make_test_topic(const std::string& suffix) +{ + return std::string(constants::TEST_TOPIC_PREFIX) + "/" + suffix; +} + +/** + * @brief Generate multiple JSON payloads with IDs + */ +inline std::vector make_numbered_payloads(size_t count) +{ + std::vector payloads; + payloads.reserve(count); + for (size_t i = 1; i <= count; ++i) + { + nlohmann::json j; + j["id"] = i; + payloads.push_back(j.dump()); + } + return payloads; +} + +} // namespace vda5050_master::test + +#endif // COMMUNICATION__TEST_HELPERS_HPP_ diff --git a/vda5050_master/test/communication/vda5050/vda5050_mqtt.cpp b/vda5050_master/test/communication/vda5050/vda5050_mqtt.cpp new file mode 100644 index 0000000..82ee734 --- /dev/null +++ b/vda5050_master/test/communication/vda5050/vda5050_mqtt.cpp @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include "../mqtt/test_helpers.hpp" +#include "vda5050_master/communication/mqtt.hpp" +#include "vda5050_master/vda5050_master/master.hpp" + +using vda5050_master::test::mqtt::constants::MQTT_BROKER; + +/** + * @brief Test implementation of VDA5050Master for unit testing + * + * This class provides a concrete implementation of the abstract VDA5050Master + * that stores received messages for test verification. + */ +class TestVDA5050Master : public VDA5050Master +{ +public: + explicit TestVDA5050Master(CommunicationFactory factory) + : VDA5050Master(std::move(factory)) + { + } + + // Thread-safe storage for received messages + std::mutex mutex_; + std::vector> + received_connections_; + std::vector> + received_states_; + std::vector> + received_visualizations_; + std::atomic connection_count_{0}; + std::atomic state_count_{0}; + +protected: + void on_connection( + const std::string& agv_id, + const vda5050_msgs::msg::Connection& msg) override + { + std::lock_guard lock(mutex_); + received_connections_.push_back({agv_id, msg}); + connection_count_++; + } + + void on_state( + const std::string& agv_id, const vda5050_msgs::msg::State& msg) override + { + std::lock_guard lock(mutex_); + received_states_.push_back({agv_id, msg}); + state_count_++; + } + + void on_visualization( + const std::string& agv_id, + const vda5050_msgs::msg::Visualization& msg) override + { + std::lock_guard lock(mutex_); + received_visualizations_.push_back({agv_id, msg}); + } +}; + +class VDA5050MasterTest : public ::testing::Test +{ +protected: + void SetUp() override + { + broker_endpoint_ = MQTT_BROKER; + // Create a factory that creates MqttCommunication instances + mqtt_factory_ = [this](const std::string& agv_id) { + return std::make_unique(broker_endpoint_, agv_id); + }; + } + + void TearDown() override + { + // Note: We don't shutdown rclcpp here to allow multiple tests to run + } + + std::string broker_endpoint_; + CommunicationFactory mqtt_factory_; +}; + +TEST_F(VDA5050MasterTest, Setup) +{ + auto master = std::make_unique(mqtt_factory_); + + // Register an AGV (this creates and connects the communication) + ASSERT_NO_THROW(master->register_agv("test_manufacturer", "test_serial")); + ASSERT_TRUE(master->is_agv_registered("test_manufacturer", "test_serial")); + + // Unregister to clean up (disconnects the AGV's communication) + ASSERT_NO_THROW(master->unregister_agv("test_manufacturer", "test_serial")); +} + +TEST_F(VDA5050MasterTest, AGVRegistrationAndUnregistration) +{ + auto master = std::make_unique(mqtt_factory_); + + // Register AGVs (each creates and connects its own communication) + master->register_agv("manufacturer1", "serial1"); + master->register_agv("manufacturer2", "serial2"); + + ASSERT_TRUE(master->is_agv_registered("manufacturer1", "serial1")); + ASSERT_TRUE(master->is_agv_registered("manufacturer2", "serial2")); + ASSERT_FALSE(master->is_agv_registered("unknown", "unknown")); + + // Unregister one AGV + master->unregister_agv("manufacturer1", "serial1"); + ASSERT_FALSE(master->is_agv_registered("manufacturer1", "serial1")); + ASSERT_TRUE(master->is_agv_registered("manufacturer2", "serial2")); + + // Cleanup + master->unregister_agv("manufacturer2", "serial2"); +} diff --git a/vda5050_master/test/integration/heartbeat.cpp b/vda5050_master/test/integration/heartbeat.cpp new file mode 100644 index 0000000..130257f --- /dev/null +++ b/vda5050_master/test/integration/heartbeat.cpp @@ -0,0 +1,367 @@ +/* + * Copyright (C) 2025 ROS-Industrial Consortium Asia Pacific + * Advanced Remanufacturing and Technology Centre + * A*STAR Research Entities (Co. Registration No. 199702110H) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "vda5050_master/communication/heartbeat.hpp" + +#include + +#include +#include + +#include "vda5050_master/standard_names.hpp" + +using vda5050_master::communication::HeartbeatState; + +class MockConnectionHeartbeatListener +: public vda5050_master::communication::ConnectionHeartbeatListener +{ +public: + MockConnectionHeartbeatListener( + const std::string& id, const int heartbeat_interval, + std::function disconnection_callback, int time_to_skip = 0) + : ConnectionHeartbeatListener(id, heartbeat_interval, disconnection_callback), + time_to_skip_(time_to_skip) + { + // ASSERT_NEAR( + // std::chrono::duration_cast( + // get_current_time().time_since_epoch()) + // .count(), + // std::chrono::duration_cast( + // std::chrono::steady_clock::now().time_since_epoch()) + // .count(), + // time_to_skip); + + // ASSERT_NEAR( + // std::chrono::duration_cast( + // get_last_connection_report().time_since_epoch()) + // .count(), + // std::chrono::duration_cast( + // std::chrono::steady_clock::now().time_since_epoch()) + // .count(), + // time_to_skip); + + // ASSERT_NEAR( + // std::chrono::duration_cast( + // get_last_connection_report().time_since_epoch()) + // .count(), + // std::chrono::duration_cast( + // get_current_time().time_since_epoch()) + // .count(), + // 1e9); + + start_connection_heartbeat(); + } + + std::chrono::steady_clock::time_point get_current_time() override + { + VDA5050_INFO( + "get_current_time with skip of " + std::to_string(time_to_skip_)); + return std::chrono::steady_clock::now() + + std::chrono::seconds(time_to_skip_); + } + + // Override check interval to speed up tests + // Instead of waiting 15 seconds, wait only 1 second + int get_check_interval() override + { + return 1; // Check every 1 second in tests (15x faster than production) + } + + ~MockConnectionHeartbeatListener() + { + VDA5050_INFO("MockConnectionHeartbeatListener destroyed"); + } + + void trigger_timeout() + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + message_received_.notify_all(); + } + + int time_to_skip_; +}; + +TEST(ConnectionHeartbeatListenerTest, HeartbeatListenerInit) +{ + auto hb_listener = MockConnectionHeartbeatListener( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&]() { + // Timeout callback + VDA5050_INFO("Timeout callback"); + }, + vda5050_master::ConnectionHeartbeatInterval - 1); + + ASSERT_EQ(hb_listener.get_state(), HeartbeatState::RUNNING); + ASSERT_NO_THROW(hb_listener.stop_connection_heartbeat()); + ASSERT_EQ(hb_listener.get_state(), HeartbeatState::STOPPED); +} + +TEST(ConnectionHeartbeatListenerTest, HeartbeatReceivedNoTimeout) +{ + auto hb_listener = MockConnectionHeartbeatListener( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&]() { + // Timeout callback + VDA5050_INFO("Timeout callback"); + }, + vda5050_master::ConnectionHeartbeatInterval - 1); + + ASSERT_EQ(hb_listener.get_state(), HeartbeatState::RUNNING); + // std::this_thread::sleep_for(std::chrono::seconds(1)); + hb_listener.received_connection(); + + ASSERT_NEAR( + std::chrono::duration_cast( + hb_listener.get_last_connection_report().time_since_epoch()) + .count(), + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(), + vda5050_master::ConnectionHeartbeatInterval - 1); + + ASSERT_NO_THROW(hb_listener.stop_connection_heartbeat()); +} + +TEST(ConnectionHeartbeatListenerTest, HeartbeatNotReceivedTimeout) +{ + std::atomic heartbeat_failed{false}; + auto hb_listener = MockConnectionHeartbeatListener( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&heartbeat_failed]() { + // Timeout callback + VDA5050_INFO("Timeout callback"); + VDA5050_INFO( + "Heartbeat_failed before store: " + + std::to_string(heartbeat_failed.load())); + heartbeat_failed.store(true); + VDA5050_INFO( + "Heartbeat_failed after store: " + + std::to_string(heartbeat_failed.load())); + // ASSERT_TRUE(heartbeat_failed->load()); + }, + vda5050_master::ConnectionHeartbeatInterval + 1); + hb_listener.trigger_timeout(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_NO_THROW(hb_listener.~MockConnectionHeartbeatListener()); + ASSERT_TRUE(heartbeat_failed.load()); +} + +TEST(ConnectionHeartbeatListenerTest, HeartbeatReceivedTimeout) +{ + std::atomic heartbeat_failed{false}; + auto hb_listener = MockConnectionHeartbeatListener( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&heartbeat_failed]() { + // Timeout callback + VDA5050_INFO("Timeout callback"); + heartbeat_failed.store(true); + }, + vda5050_master::ConnectionHeartbeatInterval + 1); + + hb_listener.trigger_timeout(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + hb_listener.received_connection(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_NO_THROW(hb_listener.~MockConnectionHeartbeatListener()); + ASSERT_TRUE(heartbeat_failed.load()); +} + +TEST(ConnectionHeartbeatListenerTest, GracefulShutdownDoesNotBlock) +{ + std::atomic callback_called{false}; + + // Use a short interval but time_to_skip that won't cause immediate timeout + auto hb_listener = std::make_unique( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&callback_called]() { callback_called.store(true); }, + 0); // time_to_skip = 0, so no immediate timeout + + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::RUNNING); + + // This should complete quickly, not block forever + auto start = std::chrono::steady_clock::now(); + hb_listener->stop_connection_heartbeat(); + auto elapsed = std::chrono::steady_clock::now() - start; + + // Should complete within 2 seconds (1 second check interval + margin) + ASSERT_LT( + std::chrono::duration_cast(elapsed).count(), 2) + << "stop_connection_heartbeat() blocked too long"; + + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::STOPPED); + ASSERT_FALSE(callback_called.load()) + << "Callback should NOT be called during graceful shutdown"; +} + +TEST(ConnectionHeartbeatListenerTest, StateIsRunningWhileCallbackExecutes) +{ + std::atomic callback_started{false}; + std::atomic callback_finished{false}; + std::atomic was_running_during_callback{false}; + + // We need a raw pointer to check get_state() from within callback + MockConnectionHeartbeatListener* listener_ptr = nullptr; + + auto hb_listener = std::make_unique( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + [&callback_started, &callback_finished, &was_running_during_callback, + &listener_ptr]() { + callback_started.store(true); + + // Check get_state() during callback execution + if (listener_ptr) + { + was_running_during_callback.store( + listener_ptr->get_state() == HeartbeatState::RUNNING); + } + + // Simulate work + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + callback_finished.store(true); + }, + vda5050_master::ConnectionHeartbeatInterval + + 1); // Causes immediate timeout + + listener_ptr = hb_listener.get(); + + // Trigger the timeout + hb_listener->trigger_timeout(); + + // Wait for callback to finish + while (!callback_finished.load()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Now stop - should be quick since callback already finished + hb_listener->stop_connection_heartbeat(); + + ASSERT_TRUE(callback_started.load()) << "Callback should have started"; + ASSERT_TRUE(callback_finished.load()) << "Callback should have finished"; + ASSERT_TRUE(was_running_during_callback.load()) + << "get_state() should return RUNNING while callback is executing"; + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::STOPPED) + << "get_state() should return STOPPED after stop completes"; +} + +TEST(ConnectionHeartbeatListenerTest, StateIsStoppedOnlyAfterFullStop) +{ + std::atomic stop_initiated{false}; + std::atomic stop_completed{false}; + std::atomic state_during_stop{HeartbeatState::RUNNING}; + + auto hb_listener = std::make_unique( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + []() { /* No-op callback */ }, + 0); // No immediate timeout + + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::RUNNING); + + // Start stop in a separate thread so we can observe state + std::thread stop_thread([&]() { + stop_initiated.store(true); + hb_listener->stop_connection_heartbeat(); + stop_completed.store(true); + }); + + // Wait for stop to be initiated + while (!stop_initiated.load()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Give time for stop to progress + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + if (!stop_completed.load()) + { + state_during_stop.store(hb_listener->get_state()); + } + + stop_thread.join(); + + // After stop completes, must be STOPPED + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::STOPPED) + << "get_state() must be STOPPED after stop_connection_heartbeat() returns"; +} + +TEST(ConnectionHeartbeatListenerTest, MultipleStopCallsSafe) +{ + auto hb_listener = std::make_unique( + "test_listener", vda5050_master::ConnectionHeartbeatInterval, + []() { /* No-op */ }, 0); + + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::RUNNING); + + ASSERT_NO_THROW(hb_listener->stop_connection_heartbeat()); + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::STOPPED); + + ASSERT_NO_THROW(hb_listener->stop_connection_heartbeat()); + ASSERT_EQ(hb_listener->get_state(), HeartbeatState::STOPPED); + + ASSERT_NO_THROW(hb_listener.reset()); +} + +// TEST(ConnectionHeartbeatListenerTest, HeartbeatReceivedNoTimeout) +// { +// std::string broker = "tcp://test.mosquitto.org:1883"; +// std::string topic = InterfaceName + "rmf2" + "/" + Version + +// "/test_manufacturer/test_serial_number/connection"; +// std::string payload = "test"; +// int qos = 0; + +// std::atomic_bool received = false; + +// std::atomic_bool heartbeat_received = false; + +// auto listener = +// vda5050_core::mqtt_client::create_default_client(broker, "listener"); +// ASSERT_NO_THROW(listener->connect()); +// ASSERT_NO_THROW(listener->subscribe( +// topic, +// [&](const std::string & topic_, const std::string & payload_) { +// received = true; +// ASSERT_EQ(topic, topic_); +// ASSERT_EQ(payload, payload_); +// }, +// qos)); + +// auto hb_listener = ConnectionHeartbeatListener( +// topic, +// ConnectionHeartbeatInterval, +// [&]() { +// // Timeout callback +// heartbeat_received = true; +// }); + +// std::this_thread::sleep_for(std::chrono::seconds(ConnectionHeartbeatInterval - 1)); +// auto talker = +// vda5050_core::mqtt_client::create_default_client(broker, "talker"); +// ASSERT_NO_THROW(talker->connect()); +// auto talker_pub_time = std::chrono::steady_clock::now(); +// ASSERT_NO_THROW(talker->publish(topic, payload, ConnectionQos)); + +// ASSERT_TRUE(received); +// double time_diff = std::chrono::duration(hb_listener.get_last_connection_report() - +// talker_pub_time).count(); +// // ASSERT_TRUE(std::chrono::duration>(time_diff) < 1.0); +// ASSERT_NEAR(time_diff, 1, 0.01); +// ASSERT_NO_THROW(talker->disconnect()); +// ASSERT_NO_THROW(listener->disconnect()); +// }