From ade5822aba552f3839cf077daea44dc26869026b Mon Sep 17 00:00:00 2001 From: Ussama Naal Date: Tue, 16 Apr 2024 07:42:07 -0700 Subject: [PATCH] Implement lock free ring buffer with throttling --- ouster-ros/CMakeLists.txt | 14 +- ouster-ros/config/community_driver.rviz | 5 - ouster-ros/config/viz-reliable.rviz | 5 - ouster-ros/src/lidar_packet_handler.h | 117 +++++++++-- ouster-ros/src/lock_free_ring_buffer.h | 89 ++++++++ .../test/lock_free_ring_buffer_test.cpp | 193 ++++++++++++++++++ ouster-ros/test/ring_buffer_test.cpp | 10 +- ouster-ros/test/test_main.cpp | 7 + 8 files changed, 400 insertions(+), 40 deletions(-) create mode 100644 ouster-ros/src/lock_free_ring_buffer.h create mode 100644 ouster-ros/test/lock_free_ring_buffer_test.cpp create mode 100644 ouster-ros/test/test_main.cpp diff --git a/ouster-ros/CMakeLists.txt b/ouster-ros/CMakeLists.txt index b56cf112..03196f8c 100644 --- a/ouster-ros/CMakeLists.txt +++ b/ouster-ros/CMakeLists.txt @@ -169,12 +169,14 @@ rclcpp_components_register_node(os_driver_component if(BUILD_TESTING) find_package(ament_cmake_gtest REQUIRED) ament_add_gtest(${PROJECT_NAME}_test - test/ring_buffer_test.cpp - src/os_ros.cpp - test/point_accessor_test.cpp - test/point_transform_test.cpp - test/point_cloud_compose_test.cpp - ) + src/os_ros.cpp + test/test_main.cpp + test/ring_buffer_test.cpp + test/lock_free_ring_buffer_test.cpp + test/point_accessor_test.cpp + test/point_transform_test.cpp + test/point_cloud_compose_test.cpp +) ament_target_dependencies(${PROJECT_NAME}_test rclcpp ouster_sensor_msgs diff --git a/ouster-ros/config/community_driver.rviz b/ouster-ros/config/community_driver.rviz index 927c7778..e74c26cf 100644 --- a/ouster-ros/config/community_driver.rviz +++ b/ouster-ros/config/community_driver.rviz @@ -28,11 +28,6 @@ Panels: - /Current View1 Name: Views Splitter Ratio: 0.5 - - Class: rviz_common/Time - Experimental: false - Name: Time - SyncMode: 0 - SyncSource: PointCloud2 Visualization Manager: Class: "" Displays: diff --git a/ouster-ros/config/viz-reliable.rviz b/ouster-ros/config/viz-reliable.rviz index 42a2301f..b5097c8c 100644 --- a/ouster-ros/config/viz-reliable.rviz +++ b/ouster-ros/config/viz-reliable.rviz @@ -28,11 +28,6 @@ Panels: - /Current View1 Name: Views Splitter Ratio: 0.5 - - Class: rviz_common/Time - Experimental: false - Name: Time - SyncMode: 0 - SyncSource: PointCloud2 Visualization Manager: Class: "" Displays: diff --git a/ouster-ros/src/lidar_packet_handler.h b/ouster-ros/src/lidar_packet_handler.h index df94b4ef..d0bc8e10 100644 --- a/ouster-ros/src/lidar_packet_handler.h +++ b/ouster-ros/src/lidar_packet_handler.h @@ -16,6 +16,9 @@ #include +#include "lock_free_ring_buffer.h" +#include + namespace { template @@ -73,12 +76,26 @@ class LidarPacketHandler { const std::vector& handlers, const std::string& timestamp_mode, int64_t ptp_utc_tai_offset) - : lidar_scan_handlers{handlers} { + : ring_buffer(LIDAR_SCAN_COUNT), lidar_scan_handlers{handlers} { // initialize lidar_scan processor and buffer scan_batcher = std::make_unique(info); - lidar_scan = std::make_unique( - info.format.columns_per_frame, info.format.pixels_per_column, - info.format.udp_profile_lidar); + + lidar_scans.resize(LIDAR_SCAN_COUNT); + mutexes.resize(LIDAR_SCAN_COUNT); + + for (size_t i = 0; i < lidar_scans.size(); ++i) { + lidar_scans[i] = std::make_unique( + info.format.columns_per_frame, info.format.pixels_per_column, + info.format.udp_profile_lidar); + mutexes[i] = std::make_shared(); // make_unique? + } + + lidar_scans_processing_thread = std::make_unique([this]() { + while (lidar_scans_processing_active) { + process_scans(); + } + // TODO: NODELET_DEBUG("lidar_scans_processing_thread done."); + }); // initalize time handlers scan_col_ts_spacing_ns = compute_scan_col_ts_spacing_ns(info.mode); @@ -125,14 +142,37 @@ class LidarPacketHandler { info, handlers, timestamp_mode, ptp_utc_tai_offset); return [handler](const uint8_t* lidar_buf) { if (handler->lidar_packet_accumlator(lidar_buf)) { - for (auto h : handler->lidar_scan_handlers) { - h(*handler->lidar_scan, handler->lidar_scan_estimated_ts, - handler->lidar_scan_estimated_msg_ts); - } + handler->ring_buffer_has_elements.notify_one(); } }; } + const std::string getName() const { return "lidar_packet_hander"; } + + void process_scans() { + + { + std::unique_lock index_lock(ring_buffer_mutex); + ring_buffer_has_elements.wait(index_lock, [this] { + return !ring_buffer.empty(); + }); + } + + std::unique_lock lock(*mutexes[ring_buffer.read_head()]); + + for (auto h : lidar_scan_handlers) { + h(*lidar_scans[ring_buffer.read_head()], lidar_scan_estimated_ts, + lidar_scan_estimated_msg_ts); + } + + size_t read_step = 1; + if (ring_buffer.size() > 7) { + // TODO: NODELET_WARN("lidar_scans full, THROTTLING"); + read_step = 2; + } + ring_buffer.read(read_step); + } + // time interpolation methods uint64_t impute_value(int last_scan_last_nonzero_idx, uint64_t last_scan_last_nonzero_value, @@ -221,21 +261,45 @@ class LidarPacketHandler { bool lidar_handler_sensor_time(const sensor::packet_format&, const uint8_t* lidar_buf) { - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp()); + + if (ring_buffer.full()) { + // TODO: NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock(*(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp()); lidar_scan_estimated_msg_ts = rclcpp::Time(lidar_scan_estimated_ts); + + ring_buffer.write(); + return true; } bool lidar_handler_sensor_time_ptp(const sensor::packet_format&, const uint8_t* lidar_buf, int64_t ptp_utc_tai_offset) { - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - auto ts_v = lidar_scan->timestamp(); + + if (ring_buffer.full()) { + // TODO: NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock( + *(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + auto ts_v = lidar_scans[ring_buffer.write_head()]->timestamp(); for (int i = 0; i < ts_v.rows(); ++i) ts_v[i] = impl::ts_safe_offset_add(ts_v[i], ptp_utc_tai_offset); lidar_scan_estimated_ts = compute_scan_ts(ts_v); - lidar_scan_estimated_msg_ts = rclcpp::Time(lidar_scan_estimated_ts); + lidar_scan_estimated_msg_ts = + rclcpp::Time(lidar_scan_estimated_ts); + + ring_buffer.write(); + return true; } @@ -247,12 +311,24 @@ class LidarPacketHandler { pf, lidar_buf, packet_receive_time); // first point cloud time lidar_handler_ros_time_frame_ts_initialized = true; } - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp()); + + if (ring_buffer.full()) { + // TODO: NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock( + *(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp()); lidar_scan_estimated_msg_ts = lidar_handler_ros_time_frame_ts; // set time for next point cloud msg lidar_handler_ros_time_frame_ts = extrapolate_frame_ts(pf, lidar_buf, packet_receive_time); + + ring_buffer.write(); + return true; } @@ -265,7 +341,12 @@ class LidarPacketHandler { private: std::unique_ptr scan_batcher; - std::unique_ptr lidar_scan; + const int LIDAR_SCAN_COUNT = 10; + LockFreeRingBuffer ring_buffer; + std::mutex ring_buffer_mutex; + std::vector> lidar_scans; + std::vector> mutexes; + uint64_t lidar_scan_estimated_ts; rclcpp::Time lidar_scan_estimated_msg_ts; @@ -284,6 +365,10 @@ class LidarPacketHandler { std::vector lidar_scan_handlers; LidarPacketAccumlator lidar_packet_accumlator; + + bool lidar_scans_processing_active = true; + std::unique_ptr lidar_scans_processing_thread; + std::condition_variable ring_buffer_has_elements; }; } // namespace ouster_ros \ No newline at end of file diff --git a/ouster-ros/src/lock_free_ring_buffer.h b/ouster-ros/src/lock_free_ring_buffer.h new file mode 100644 index 00000000..080f85fa --- /dev/null +++ b/ouster-ros/src/lock_free_ring_buffer.h @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2018-2024, Ouster, Inc. + * All rights reserved. + * + * @file thread_safe_ring_buffer.h + * @brief File contains thread safe implementation of a ring buffer + */ + +#pragma once + +#include +#include +#include +#include + +/** + * @class LockFreeRingBuffer thread safe ring buffer. + * + * @remarks current implementation has effective (capacity-1) when writing elements + */ +class LockFreeRingBuffer { + public: + LockFreeRingBuffer(size_t capacity) + : capacity_(capacity), + write_idx_(0), + read_idx_(0) {} + + /** + * Gets the maximum number of items that this ring buffer can hold. + */ + size_t capacity() const { return capacity_; } + + /** + * Gets the number of item that currently occupy the ring buffer. This + * number would vary between 0 and the capacity(). + * + * @remarks + * if returned value was 0 or the value was equal to the buffer capacity(), + * this does not guarantee that a subsequent call to read() or write() + * wouldn't cause the calling thread to be blocked. + */ + size_t size() const { + return write_idx_ >= read_idx_ ? + write_idx_ - read_idx_ : + write_idx_ + capacity_ - read_idx_; + } + + size_t available() const { + return capacity_ - 1 - size(); + } + + /** + * Checks if the ring buffer is empty. + */ + bool empty() const { + return read_idx_ == write_idx_; + } + + /** + * Checks if the ring buffer is full. + */ + bool full() const { + return read_idx_ == (write_idx_ + 1) % capacity_; + } + + /** + */ + bool write(size_t count = 1) { + if (count > available()) return false; + write_idx_ = (write_idx_ + count) % capacity_; + return true; + } + + /** + */ + bool read(size_t count = 1) { + if (count > size()) return false; + read_idx_ = (read_idx_ + count) % capacity_; + return true; + } + + size_t write_head() const { return write_idx_; } + size_t read_head() const { return read_idx_; } + + private: + const size_t capacity_; + std::atomic write_idx_; + std::atomic read_idx_; +}; \ No newline at end of file diff --git a/ouster-ros/test/lock_free_ring_buffer_test.cpp b/ouster-ros/test/lock_free_ring_buffer_test.cpp new file mode 100644 index 00000000..b88d79e0 --- /dev/null +++ b/ouster-ros/test/lock_free_ring_buffer_test.cpp @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include "../src/lock_free_ring_buffer.h" + +using namespace std::chrono_literals; + +class LockFreeRingBufferTest : public ::testing::Test { + protected: + static constexpr int ITEM_COUNT = 3; // number of item the buffer could hold + + void SetUp() override { + buffer = std::make_unique(ITEM_COUNT); + } + + void TearDown() override { + buffer.reset(); + } + + std::unique_ptr buffer; +}; + + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferFullEmpty) { + + assert (ITEM_COUNT > 1 && "or this test can't run"); + + EXPECT_EQ(buffer->capacity(), ITEM_COUNT); + EXPECT_EQ(buffer->available(), ITEM_COUNT - 1); + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + buffer->write(); + } + + EXPECT_FALSE(buffer->empty()); + EXPECT_TRUE(buffer->full()); + + // remove one item + buffer->read(); + + EXPECT_FALSE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + for (int i = 1; i < ITEM_COUNT - 1; ++i) { + buffer->read(); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); +} + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferCheckReturn) { + + assert (ITEM_COUNT > 1 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + EXPECT_TRUE(buffer->write()); + } + + EXPECT_TRUE(buffer->full()); + EXPECT_FALSE(buffer->write()); + + // remove one item and re-write + EXPECT_TRUE(buffer->read()); + EXPECT_TRUE(buffer->write()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + EXPECT_TRUE(buffer->read()); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->read()); +} + + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferSizeAvailable) { + + assert (ITEM_COUNT == 3 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 1); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 2); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 2); + EXPECT_EQ(buffer->read_head(), 1); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 1); + + // Next write should fail, so size shouldn't change + EXPECT_FALSE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 1); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 2); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); + + // Next read should fail, so size shouldn't change + EXPECT_FALSE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); +} + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferAdvanceMultiple) { + + assert (ITEM_COUNT == 3 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + EXPECT_TRUE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + // This write should fail since we advance beyond capacity, so size shouldn't change + EXPECT_FALSE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + EXPECT_TRUE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + // Any additional write will also fail, size shouldn't change + EXPECT_FALSE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + EXPECT_TRUE(buffer->read(2)); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + // This read should fail since we advance beyond available, so size shouldn't change + EXPECT_FALSE(buffer->read(2)); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + // Any subsequent read will also fail, size shouldn't change + EXPECT_FALSE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); +} diff --git a/ouster-ros/test/ring_buffer_test.cpp b/ouster-ros/test/ring_buffer_test.cpp index 8d8bd7f4..e68ab29a 100644 --- a/ouster-ros/test/ring_buffer_test.cpp +++ b/ouster-ros/test/ring_buffer_test.cpp @@ -8,8 +8,8 @@ using namespace std::chrono_literals; class ThreadSafeRingBufferTest : public ::testing::Test { protected: - static const int ITEM_SIZE = 4; // predefined size for all items used in - static const int ITEM_COUNT = 3; // number of item the buffer could hold + static constexpr int ITEM_SIZE = 4; // predefined size for all items used in + static constexpr int ITEM_COUNT = 3; // number of item the buffer could hold void SetUp() override { buffer = std::make_unique(ITEM_SIZE, ITEM_COUNT); @@ -173,9 +173,3 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { EXPECT_EQ(target[i], "0000"); } } - -int main(int argc, char** argv) -{ - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file diff --git a/ouster-ros/test/test_main.cpp b/ouster-ros/test/test_main.cpp new file mode 100644 index 00000000..073f2adc --- /dev/null +++ b/ouster-ros/test/test_main.cpp @@ -0,0 +1,7 @@ +#include + +int main(int argc, char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file