From 58cc09489353a813d26be2bd3e3c43cc99851611 Mon Sep 17 00:00:00 2001 From: Ussama Naal Date: Sat, 6 Apr 2024 13:55:59 -0700 Subject: [PATCH] implement lock free ring_buffer with throttling --- CMakeLists.txt | 4 +- src/lidar_packet_handler.h | 117 +++++++++++++--- src/lock_free_ring_buffer.h | 89 ++++++++++++ tests/lock_free_ring_buffer_test.cpp | 193 +++++++++++++++++++++++++++ tests/ring_buffer_test.cpp | 10 +- tests/test_main.cpp | 7 + 6 files changed, 394 insertions(+), 26 deletions(-) create mode 100644 src/lock_free_ring_buffer.h create mode 100644 tests/lock_free_ring_buffer_test.cpp create mode 100644 tests/test_main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 682048e7..fa236939 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,8 +91,10 @@ add_dependencies(${PROJECT_NAME}_nodelets ${PROJECT_NAME}_gencpp) # ==== Test ==== if(CATKIN_ENABLE_TESTING) catkin_add_gtest(${PROJECT_NAME}_test - tests/ring_buffer_test.cpp src/os_ros.cpp + tests/test_main.cpp + tests/ring_buffer_test.cpp + tests/lock_free_ring_buffer_test.cpp tests/point_accessor_test.cpp tests/point_transform_test.cpp tests/point_cloud_compose_test.cpp diff --git a/src/lidar_packet_handler.h b/src/lidar_packet_handler.h index 9c683fe9..931600df 100644 --- a/src/lidar_packet_handler.h +++ b/src/lidar_packet_handler.h @@ -16,6 +16,9 @@ #include +#include "lock_free_ring_buffer.h" +#include + namespace { template @@ -73,12 +76,26 @@ class LidarPacketHandler { const std::vector& handlers, const std::string& timestamp_mode, int64_t ptp_utc_tai_offset) - : lidar_scan_handlers{handlers} { + : ring_buffer(LIDAR_SCAN_COUNT), lidar_scan_handlers{handlers} { // initialize lidar_scan processor and buffer scan_batcher = std::make_unique(info); - lidar_scan = std::make_unique( - info.format.columns_per_frame, info.format.pixels_per_column, - info.format.udp_profile_lidar); + + lidar_scans.resize(LIDAR_SCAN_COUNT); + mutexes.resize(LIDAR_SCAN_COUNT); + + for (size_t i = 0; i < lidar_scans.size(); ++i) { + lidar_scans[i] = std::make_unique( + info.format.columns_per_frame, info.format.pixels_per_column, + info.format.udp_profile_lidar); + mutexes[i] = std::make_shared(); // make_unique? + } + + lidar_scans_processing_thread = std::make_unique([this]() { + while (lidar_scans_processing_active) { + process_scans(); + } + NODELET_DEBUG("lidar_scans_processing_thread done."); + }); // initalize time handlers scan_col_ts_spacing_ns = compute_scan_col_ts_spacing_ns(info.mode); @@ -125,14 +142,37 @@ class LidarPacketHandler { info, handlers, timestamp_mode, ptp_utc_tai_offset); return [handler](const uint8_t* lidar_buf) { if (handler->lidar_packet_accumlator(lidar_buf)) { - for (auto h : handler->lidar_scan_handlers) { - h(*handler->lidar_scan, handler->lidar_scan_estimated_ts, - handler->lidar_scan_estimated_msg_ts); - } + handler->ring_buffer_has_elements.notify_one(); } }; } + const std::string getName() const { return "lidar_packet_hander"; } + + void process_scans() { + + { + std::unique_lock index_lock(ring_buffer_mutex); + ring_buffer_has_elements.wait(index_lock, [this] { + return !ring_buffer.empty(); + }); + } + + std::unique_lock lock(*mutexes[ring_buffer.read_head()]); + + for (auto h : lidar_scan_handlers) { + h(*lidar_scans[ring_buffer.read_head()], lidar_scan_estimated_ts, + lidar_scan_estimated_msg_ts); + } + + size_t read_step = 1; + if (ring_buffer.size() > 7) { + NODELET_WARN("lidar_scans full, THROTTLING"); + read_step = 2; + } + ring_buffer.read(read_step); + } + // time interpolation methods uint64_t impute_value(int last_scan_last_nonzero_idx, uint64_t last_scan_last_nonzero_value, @@ -221,23 +261,45 @@ class LidarPacketHandler { bool lidar_handler_sensor_time(const sensor::packet_format&, const uint8_t* lidar_buf) { - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp()); - lidar_scan_estimated_msg_ts = - impl::ts_to_ros_time(lidar_scan_estimated_ts); + + if (ring_buffer.full()) { + NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock(*(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp()); + lidar_scan_estimated_msg_ts = impl::ts_to_ros_time(lidar_scan_estimated_ts); + + ring_buffer.write(); + return true; } bool lidar_handler_sensor_time_ptp(const sensor::packet_format&, const uint8_t* lidar_buf, int64_t ptp_utc_tai_offset) { - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - auto ts_v = lidar_scan->timestamp(); + + if (ring_buffer.full()) { + NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock( + *(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + auto ts_v = lidar_scans[ring_buffer.write_head()]->timestamp(); for (int i = 0; i < ts_v.rows(); ++i) ts_v[i] = impl::ts_safe_offset_add(ts_v[i], ptp_utc_tai_offset); lidar_scan_estimated_ts = compute_scan_ts(ts_v); lidar_scan_estimated_msg_ts = impl::ts_to_ros_time(lidar_scan_estimated_ts); + + ring_buffer.write(); + return true; } @@ -249,12 +311,24 @@ class LidarPacketHandler { pf, lidar_buf, packet_receive_time); // first point cloud time lidar_handler_ros_time_frame_ts_initialized = true; } - if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false; - lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp()); + + if (ring_buffer.full()) { + NODELET_WARN("lidar_scans full, DROPPING PACKET"); + return false; + } + + std::unique_lock lock( + *(mutexes[ring_buffer.write_head()])); + + if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false; + lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp()); lidar_scan_estimated_msg_ts = lidar_handler_ros_time_frame_ts; // set time for next point cloud msg lidar_handler_ros_time_frame_ts = extrapolate_frame_ts(pf, lidar_buf, packet_receive_time); + + ring_buffer.write(); + return true; } @@ -267,7 +341,12 @@ class LidarPacketHandler { private: std::unique_ptr scan_batcher; - std::unique_ptr lidar_scan; + const int LIDAR_SCAN_COUNT = 10; + LockFreeRingBuffer ring_buffer; + std::mutex ring_buffer_mutex; + std::vector> lidar_scans; + std::vector> mutexes; + uint64_t lidar_scan_estimated_ts; ros::Time lidar_scan_estimated_msg_ts; @@ -286,6 +365,10 @@ class LidarPacketHandler { std::vector lidar_scan_handlers; LidarPacketAccumlator lidar_packet_accumlator; + + bool lidar_scans_processing_active = true; + std::unique_ptr lidar_scans_processing_thread; + std::condition_variable ring_buffer_has_elements; }; } // namespace ouster_ros \ No newline at end of file diff --git a/src/lock_free_ring_buffer.h b/src/lock_free_ring_buffer.h new file mode 100644 index 00000000..080f85fa --- /dev/null +++ b/src/lock_free_ring_buffer.h @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2018-2024, Ouster, Inc. + * All rights reserved. + * + * @file thread_safe_ring_buffer.h + * @brief File contains thread safe implementation of a ring buffer + */ + +#pragma once + +#include +#include +#include +#include + +/** + * @class LockFreeRingBuffer thread safe ring buffer. + * + * @remarks current implementation has effective (capacity-1) when writing elements + */ +class LockFreeRingBuffer { + public: + LockFreeRingBuffer(size_t capacity) + : capacity_(capacity), + write_idx_(0), + read_idx_(0) {} + + /** + * Gets the maximum number of items that this ring buffer can hold. + */ + size_t capacity() const { return capacity_; } + + /** + * Gets the number of item that currently occupy the ring buffer. This + * number would vary between 0 and the capacity(). + * + * @remarks + * if returned value was 0 or the value was equal to the buffer capacity(), + * this does not guarantee that a subsequent call to read() or write() + * wouldn't cause the calling thread to be blocked. + */ + size_t size() const { + return write_idx_ >= read_idx_ ? + write_idx_ - read_idx_ : + write_idx_ + capacity_ - read_idx_; + } + + size_t available() const { + return capacity_ - 1 - size(); + } + + /** + * Checks if the ring buffer is empty. + */ + bool empty() const { + return read_idx_ == write_idx_; + } + + /** + * Checks if the ring buffer is full. + */ + bool full() const { + return read_idx_ == (write_idx_ + 1) % capacity_; + } + + /** + */ + bool write(size_t count = 1) { + if (count > available()) return false; + write_idx_ = (write_idx_ + count) % capacity_; + return true; + } + + /** + */ + bool read(size_t count = 1) { + if (count > size()) return false; + read_idx_ = (read_idx_ + count) % capacity_; + return true; + } + + size_t write_head() const { return write_idx_; } + size_t read_head() const { return read_idx_; } + + private: + const size_t capacity_; + std::atomic write_idx_; + std::atomic read_idx_; +}; \ No newline at end of file diff --git a/tests/lock_free_ring_buffer_test.cpp b/tests/lock_free_ring_buffer_test.cpp new file mode 100644 index 00000000..b88d79e0 --- /dev/null +++ b/tests/lock_free_ring_buffer_test.cpp @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include "../src/lock_free_ring_buffer.h" + +using namespace std::chrono_literals; + +class LockFreeRingBufferTest : public ::testing::Test { + protected: + static constexpr int ITEM_COUNT = 3; // number of item the buffer could hold + + void SetUp() override { + buffer = std::make_unique(ITEM_COUNT); + } + + void TearDown() override { + buffer.reset(); + } + + std::unique_ptr buffer; +}; + + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferFullEmpty) { + + assert (ITEM_COUNT > 1 && "or this test can't run"); + + EXPECT_EQ(buffer->capacity(), ITEM_COUNT); + EXPECT_EQ(buffer->available(), ITEM_COUNT - 1); + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + buffer->write(); + } + + EXPECT_FALSE(buffer->empty()); + EXPECT_TRUE(buffer->full()); + + // remove one item + buffer->read(); + + EXPECT_FALSE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + for (int i = 1; i < ITEM_COUNT - 1; ++i) { + buffer->read(); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); +} + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferCheckReturn) { + + assert (ITEM_COUNT > 1 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + EXPECT_TRUE(buffer->write()); + } + + EXPECT_TRUE(buffer->full()); + EXPECT_FALSE(buffer->write()); + + // remove one item and re-write + EXPECT_TRUE(buffer->read()); + EXPECT_TRUE(buffer->write()); + + for (int i = 0; i < ITEM_COUNT - 1; ++i) { + EXPECT_TRUE(buffer->read()); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->read()); +} + + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferSizeAvailable) { + + assert (ITEM_COUNT == 3 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 1); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 2); + EXPECT_EQ(buffer->read_head(), 0); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 2); + EXPECT_EQ(buffer->read_head(), 1); + + EXPECT_TRUE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 1); + + // Next write should fail, so size shouldn't change + EXPECT_FALSE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 1); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 2); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); + + // Next read should fail, so size shouldn't change + EXPECT_FALSE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + EXPECT_EQ(buffer->write_head(), 0); + EXPECT_EQ(buffer->read_head(), 0); +} + +TEST_F(LockFreeRingBufferTest, ReadWriteToBufferAdvanceMultiple) { + + assert (ITEM_COUNT == 3 && "or this test can't run"); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + EXPECT_TRUE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + // This write should fail since we advance beyond capacity, so size shouldn't change + EXPECT_FALSE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 1); + EXPECT_EQ(buffer->available(), 1); + + EXPECT_TRUE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + EXPECT_TRUE(buffer->write(2)); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + // Any additional write will also fail, size shouldn't change + EXPECT_FALSE(buffer->write()); + EXPECT_EQ(buffer->size(), 2); + EXPECT_EQ(buffer->available(), 0); + + EXPECT_TRUE(buffer->read(2)); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + // This read should fail since we advance beyond available, so size shouldn't change + EXPECT_FALSE(buffer->read(2)); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); + + // Any subsequent read will also fail, size shouldn't change + EXPECT_FALSE(buffer->read()); + EXPECT_EQ(buffer->size(), 0); + EXPECT_EQ(buffer->available(), 2); +} diff --git a/tests/ring_buffer_test.cpp b/tests/ring_buffer_test.cpp index 8d8bd7f4..e68ab29a 100644 --- a/tests/ring_buffer_test.cpp +++ b/tests/ring_buffer_test.cpp @@ -8,8 +8,8 @@ using namespace std::chrono_literals; class ThreadSafeRingBufferTest : public ::testing::Test { protected: - static const int ITEM_SIZE = 4; // predefined size for all items used in - static const int ITEM_COUNT = 3; // number of item the buffer could hold + static constexpr int ITEM_SIZE = 4; // predefined size for all items used in + static constexpr int ITEM_COUNT = 3; // number of item the buffer could hold void SetUp() override { buffer = std::make_unique(ITEM_SIZE, ITEM_COUNT); @@ -173,9 +173,3 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { EXPECT_EQ(target[i], "0000"); } } - -int main(int argc, char** argv) -{ - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file diff --git a/tests/test_main.cpp b/tests/test_main.cpp new file mode 100644 index 00000000..073f2adc --- /dev/null +++ b/tests/test_main.cpp @@ -0,0 +1,7 @@ +#include + +int main(int argc, char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file