From ad556dd6491a73d720a95fbd1feeca8314d31e7e Mon Sep 17 00:00:00 2001 From: Imaniac230 <44968160+Imaniac230@users.noreply.github.com> Date: Tue, 5 Mar 2024 19:12:17 +0100 Subject: [PATCH] fix-squash: Better push/pop methods - using only true atomic operations for handling the active items counter, remove pointless thresholding - the writer should never overtake the reader with a full buffer if operations are handled correctly. --- ouster-ros/src/thread_safe_ring_buffer.h | 47 +++------- ouster-ros/test/ring_buffer_test.cpp | 110 ----------------------- 2 files changed, 11 insertions(+), 146 deletions(-) diff --git a/ouster-ros/src/thread_safe_ring_buffer.h b/ouster-ros/src/thread_safe_ring_buffer.h index 510d8f11..1a8a4a4b 100644 --- a/ouster-ros/src/thread_safe_ring_buffer.h +++ b/ouster-ros/src/thread_safe_ring_buffer.h @@ -25,8 +25,6 @@ class ThreadSafeRingBuffer { active_items_count(0), write_idx(SIZE_MAX), read_idx(SIZE_MAX), - dropped_reads(0), - should_always_drop_reads(true), new_data_lock(mutex, std::defer_lock), free_space_lock(mutex, std::defer_lock) {} @@ -72,7 +70,6 @@ class ThreadSafeRingBuffer { */ template void write(BufferWriteFn&& buffer_write) { - should_always_drop_reads = false; free_space_lock.lock(); free_space_condition.wait(free_space_lock, [this] { return !full(); }); free_space_lock.unlock(); @@ -85,7 +82,6 @@ class ThreadSafeRingBuffer { */ template void write_overwrite(BufferWriteFn&& buffer_write) { - should_always_drop_reads = true; perform_write(buffer_write); } @@ -95,7 +91,6 @@ class ThreadSafeRingBuffer { */ template void write_nonblock(BufferWriteFn&& buffer_write) { - should_always_drop_reads = false; if (!full()) perform_write(buffer_write); } @@ -153,16 +148,6 @@ class ThreadSafeRingBuffer { */ void reset_read_idx() { read_idx = SIZE_MAX; } - /** - * Gets the max_allowed_read_drops value. - * @return The statically set max allowed number of reading drops. - * @remarks - * Should be mostly used by tests. - */ - static constexpr uint32_t get_max_allowed_read_drops() { - return MAX_ALLOWED_READ_DROPS; - } - private: /** * Performs the actual sequence of operations for writing. @@ -182,23 +167,15 @@ class ThreadSafeRingBuffer { * @param buffer_read * @remarks * If this function attempts to read using an index currently held by the - * writer, it will not perform the operations. However, if allowed, it will - * not keep dropping more than the MAX_ALLOWED_READ_DROPS, after which a - * single read is performed regardless. + * writer, it will not perform the operations. */ template void perform_read(BufferReadFn&& buffer_read) { - if ((incremented_with_capacity(read_idx.load()) == write_idx.load()) - && (should_always_drop_reads.load() || - (dropped_reads.load() < MAX_ALLOWED_READ_DROPS))) { - ++dropped_reads; - return; + if (incremented_with_capacity(read_idx.load()) != write_idx.load()) { + buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]); + pop(); + free_space_condition.notify_one(); } - - dropped_reads = 0; - buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]); - pop(); - free_space_condition.notify_one(); } /** @@ -229,19 +206,20 @@ class ThreadSafeRingBuffer { * buffer capacity. */ void push() { - active_items_count = std::min(active_items_count.load() + 1, capacity()); + size_t overflow = capacity() + 1; + ++active_items_count; + active_items_count.compare_exchange_strong(overflow, capacity()); } /** * Atomically decrements the buffer active elements count, clamping at zero. */ void pop() { - active_items_count = static_cast(std::max( - static_cast(active_items_count.load() - 1), 0)); + size_t overflow = SIZE_MAX; + --active_items_count; + active_items_count.compare_exchange_strong(overflow, 0); } - static constexpr uint32_t MAX_ALLOWED_READ_DROPS = UINT16_MAX * 6; - std::vector buffer; const size_t item_size; @@ -251,9 +229,6 @@ class ThreadSafeRingBuffer { std::atomic_size_t write_idx; std::atomic_size_t read_idx; - std::atomic_uint32_t dropped_reads; - std::atomic_bool should_always_drop_reads; - std::mutex mutex; std::condition_variable new_data_condition; std::unique_lock new_data_lock; diff --git a/ouster-ros/test/ring_buffer_test.cpp b/ouster-ros/test/ring_buffer_test.cpp index f140a76c..dba4793d 100644 --- a/ouster-ros/test/ring_buffer_test.cpp +++ b/ouster-ros/test/ring_buffer_test.cpp @@ -51,10 +51,6 @@ class ThreadSafeRingBufferTest : public ::testing::Test { void reset_reading() { buffer->reset_read_idx(); } - [[nodiscard]] uint32_t max_dropped_reads() const { - return buffer->get_max_allowed_read_drops(); - } - std::unique_ptr buffer; }; @@ -639,112 +635,6 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblockingThrottling) { EXPECT_FALSE(buffer->full()); } -TEST_F(ThreadSafeRingBufferTest, GracefulReadingBlockingWithTimeout) { - - static constexpr int TOTAL_ITEMS = 10; // total items to process - const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); - std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); - - EXPECT_TRUE(buffer->empty()); - EXPECT_FALSE(buffer->full()); - - std::thread producer([this, &source]() { - for (int i = 0; i < TOTAL_ITEMS; ++i) { - buffer->write_nonblock([i, &source](uint8_t* buffer){ - std::memcpy(buffer, &source[i][0], ITEM_SIZE); - }); - } - - //We're not resetting the writing index on purpose. - }); - - // wait for 1 second before starting the consumer thread - // allowing sufficient time for the producer thread to be - // completely done - std::this_thread::sleep_for(1s); - std::thread consumer([this, &target]() { - unsigned idx = 0; - for (unsigned i = 0; i < ITEM_COUNT + max_dropped_reads(); ++i) { - buffer->read_timeout([&idx, &target](uint8_t* buffer){ - std::memcpy(&target[idx++][0], buffer, ITEM_SIZE); - }, 1s); - } - }); - - producer.join(); - consumer.join(); - - // The final writing index remained at ITEM_COUNT - 1, so the consumer will - // keep dropping reads until it reaches the maximum dropping threshold, and - // the final item will eventually be filled. - for (int i = 0; i < ITEM_COUNT; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], source[i]); - } - // Since the buffer can only hold upto ITEM_COUNT items, the buffer is completely - // read out. The remaining target items should be empty. - for (int i = ITEM_COUNT + 1; i < TOTAL_ITEMS; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], "0000"); - } - - EXPECT_TRUE(buffer->empty()); - EXPECT_FALSE(buffer->full()); -} - -TEST_F(ThreadSafeRingBufferTest, GracefulReadingNonblocking) { - - static constexpr int TOTAL_ITEMS = 10; // total items to process - const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); - std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); - - EXPECT_TRUE(buffer->empty()); - EXPECT_FALSE(buffer->full()); - - std::thread producer([this, &source]() { - for (int i = 0; i < TOTAL_ITEMS; ++i) { - buffer->write_nonblock([i, &source](uint8_t* buffer){ - std::memcpy(buffer, &source[i][0], ITEM_SIZE); - }); - } - - //We're not resetting the writing index on purpose. - }); - - // wait for 1 second before starting the consumer thread - // allowing sufficient time for the producer thread to be - // completely done - std::this_thread::sleep_for(1s); - std::thread consumer([this, &target]() { - unsigned idx = 0; - for (unsigned i = 0; i < ITEM_COUNT + max_dropped_reads(); ++i) { - buffer->read_nonblock([&idx, &target](uint8_t* buffer){ - std::memcpy(&target[idx++][0], buffer, ITEM_SIZE); - }); - } - }); - - producer.join(); - consumer.join(); - - // The final writing index remained at ITEM_COUNT - 1, so the consumer will - // keep dropping reads until it reaches the maximum dropping threshold, and - // the final item will eventually be filled. - for (int i = 0; i < ITEM_COUNT; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], source[i]); - } - // Since the buffer can only hold upto ITEM_COUNT items, the buffer is completely - // read out. The remaining target items should be empty. - for (int i = ITEM_COUNT + 1; i < TOTAL_ITEMS; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], "0000"); - } - - EXPECT_TRUE(buffer->empty()); - EXPECT_FALSE(buffer->full()); -} - int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv);