From 6aba481334546b1c8cc7b2073ccc2c9403915007 Mon Sep 17 00:00:00 2001 From: Imaniac230 <44968160+Imaniac230@users.noreply.github.com> Date: Mon, 19 Feb 2024 01:26:36 +0100 Subject: [PATCH] wip: Start adjusting and adding new tests. --- ouster-ros/test/ring_buffer_test.cpp | 379 +++++++++++++++++++++++++-- 1 file changed, 363 insertions(+), 16 deletions(-) diff --git a/ouster-ros/test/ring_buffer_test.cpp b/ouster-ros/test/ring_buffer_test.cpp index 8d8bd7f4..9c8a2936 100644 --- a/ouster-ros/test/ring_buffer_test.cpp +++ b/ouster-ros/test/ring_buffer_test.cpp @@ -8,8 +8,8 @@ using namespace std::chrono_literals; class ThreadSafeRingBufferTest : public ::testing::Test { protected: - static const int ITEM_SIZE = 4; // predefined size for all items used in - static const int ITEM_COUNT = 3; // number of item the buffer could hold + static constexpr int ITEM_SIZE = 4; // predefined size for all items used in + static constexpr int ITEM_COUNT = 3; // number of item the buffer could hold void SetUp() override { buffer = std::make_unique(ITEM_SIZE, ITEM_COUNT); @@ -47,6 +47,8 @@ class ThreadSafeRingBufferTest : public ::testing::Test { return output; } + void reset_writing() { buffer->reset_write_idx(); } + std::unique_ptr buffer; }; @@ -77,7 +79,12 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferSimple) { EXPECT_FALSE(buffer->empty()); EXPECT_FALSE(buffer->full()); + EXPECT_EQ(buffer->size(), static_cast(ITEM_COUNT - 1)); + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); for (int i = 1; i < ITEM_COUNT; ++i) { buffer->read([i, &target](uint8_t* buffer){ std::memcpy(&target[i][0], buffer, ITEM_SIZE); @@ -93,7 +100,7 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferSimple) { } } -TEST_F(ThreadSafeRingBufferTest, ReadWriteToBuffer) { +TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlocking) { const int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); @@ -111,11 +118,19 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBuffer) { }); std::thread consumer([this, &target]() { - for (int i = 0; i < TOTAL_ITEMS; ++i) { - buffer->read([i, &target](uint8_t* buffer){ - std::memcpy(&target[i][0], buffer, ITEM_SIZE); + int i = 0; + while (i < TOTAL_ITEMS - 1) { + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); }); } + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); }); producer.join(); @@ -123,7 +138,7 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBuffer) { for (int i = 0; i < TOTAL_ITEMS; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], source[i]); + EXPECT_EQ(target[i], source[i]); } } @@ -148,11 +163,14 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { // allowing sufficient time for the producer thread to be // completely done std::this_thread::sleep_for(1s); + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); std::thread consumer([this, &target]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->read_timeout([i, &target](uint8_t* buffer){ - if (buffer != nullptr) - std::memcpy(&target[i][0], buffer, ITEM_SIZE); + std::memcpy(&target[i][0], buffer, ITEM_SIZE); }, 1s); } }); @@ -160,18 +178,347 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { producer.join(); consumer.join(); - // Since our buffer can host only up to ITEM_COUNT simultanously only the + // Since our buffer can host only up to ITEM_COUNT simultaneously only the // last ITEM_COUNT items would have remained in the buffer by the time // the consumer started processing. - for (int i = 0; i < ITEM_COUNT; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], source[TOTAL_ITEMS-ITEM_COUNT+i]); + // If TOTAL_ITEMS is not divisible by ITEM_COUNT, the beginning of the buffer, + // will contain a section of ITEM_COUNT items with the latest overwritten data. + for (int i = 0; i < TOTAL_ITEMS % ITEM_COUNT; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], source[TOTAL_ITEMS - (TOTAL_ITEMS % ITEM_COUNT) + i]); } - + // If TOTAL_ITEMS is divisible by ITEM_COUNT, the whole buffer will contain + // exactly the last ITEM_COUNT items. Otherwise, the end of the buffer will + // contain a section of ITEM_COUNT items with older data. + for (int i = TOTAL_ITEMS % ITEM_COUNT; i < ITEM_COUNT; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], source[TOTAL_ITEMS - (TOTAL_ITEMS % ITEM_COUNT) - ITEM_COUNT + i]); + } + // The remaining part of the target will not have any new data, since the buffer, + // will now be completely read out. for (int i = ITEM_COUNT; i < TOTAL_ITEMS; ++i) { - std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - EXPECT_EQ(target[i], "0000"); + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], "0000"); + } +} + +TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblocking) { + + const int TOTAL_ITEMS = 10; // total items to process + const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); + std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + std::thread producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write_nonblock([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + } + }); + + // wait for 1 second before starting the consumer thread + // allowing sufficient time for the producer thread to be + // completely done + std::this_thread::sleep_for(1s); + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); + std::thread consumer([this, &target]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->read_nonblock([i, &target](uint8_t* buffer){ + std::memcpy(&target[i][0], buffer, ITEM_SIZE); + }); + } + }); + + producer.join(); + consumer.join(); + + // Since our buffer can host only up to ITEM_COUNT simultaneously only the + // first ITEM_COUNT items will be written into the buffer, with the rest being + // ignored. + for (int i = 0; i < ITEM_COUNT; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], source[i]); + } + // The remaining part of the target will not have any new data, since the buffer, + // will now be completely read out. + for (int i = ITEM_COUNT; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], "0000"); + } +} + +TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlockingThrottling) { + + const int TOTAL_ITEMS = 10; // total items to process + const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); + std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); + static constexpr std::chrono::milliseconds period(10); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + // First, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period); + } + }); + + std::thread slower_consumer([this, &target]() { + int i = 0; + while (i < TOTAL_ITEMS - 1) { + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); + std::this_thread::sleep_for(period * 4); + } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); + }); + + faster_producer.join(); + slower_consumer.join(); + + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Faster producer, slower consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], source[i]); + } + + target = known_vector_str(TOTAL_ITEMS, "0000"); + + // Then, then consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period * 4); + } + }); + + std::thread faster_consumer([this, &target]() { + int i = 0; + while (i < TOTAL_ITEMS - 1) { + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); + std::this_thread::sleep_for(period); + } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); + }); + + slower_producer.join(); + faster_consumer.join(); + + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Slower producer, faster consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; +// EXPECT_EQ(target[i], source[i]); + } + + //TODO: finish asserts + const bool finishedImplementing = false; + ASSERT_TRUE(finishedImplementing); +} + +TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwriteThrottling) { + + const int TOTAL_ITEMS = 10; // total items to process + const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); + std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); + static constexpr std::chrono::milliseconds period(10); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + // First, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write_overwrite([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period); + } + }); + + std::thread slower_consumer([this, &target, TOTAL_ITEMS]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->read_timeout([i, &target](uint8_t* buffer){ + std::memcpy(&target[i][0], buffer, ITEM_SIZE); + }, 1s); + std::this_thread::sleep_for(period * 4); + } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); + buffer->read_timeout([TOTAL_ITEMS, &target](uint8_t* buffer){ + std::memcpy(&target[TOTAL_ITEMS - 1][0], buffer, ITEM_SIZE); + }, 1s); + }); + + faster_producer.join(); + slower_consumer.join(); + + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Faster producer, slower consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; +// EXPECT_EQ(target[i], source[i]); + } + + target = known_vector_str(TOTAL_ITEMS, "0000"); + + // Then, then consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write_overwrite([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period * 4); + } + }); + + std::thread faster_consumer([this, &target]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->read_timeout([i, &target](uint8_t* buffer){ + std::memcpy(&target[i][0], buffer, ITEM_SIZE); + }, 1s); + std::this_thread::sleep_for(period); + } + }); + + slower_producer.join(); + faster_consumer.join(); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Slower producer, faster consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; +// EXPECT_EQ(target[i], source[i]); + } + + //TODO: finish asserts + const bool finishedImplementing = false; + ASSERT_TRUE(finishedImplementing); +} + +TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblockingThrottling) { + + const int TOTAL_ITEMS = 10; // total items to process + const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); + std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); + static constexpr std::chrono::milliseconds period(10); + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); + + // First, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write_nonblock([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period); + } + }); + + std::thread slower_consumer([this, &target, TOTAL_ITEMS]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->read_nonblock([i, &target](uint8_t* buffer){ + std::memcpy(&target[i][0], buffer, ITEM_SIZE); + }); + std::this_thread::sleep_for(period * 4); + } + + reset_writing(); + buffer->read_nonblock([TOTAL_ITEMS, &target](uint8_t* buffer){ + std::memcpy(&target[TOTAL_ITEMS - 1][0], buffer, ITEM_SIZE); + }); + }); + + faster_producer.join(); + slower_consumer.join(); + + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Faster producer, slower consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + // EXPECT_EQ(target[i], source[i]); + } + + target = known_vector_str(TOTAL_ITEMS, "0000"); + + // Then, then consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->write_nonblock([i, &source](uint8_t* buffer){ + std::memcpy(buffer, &source[i][0], ITEM_SIZE); + }); + std::this_thread::sleep_for(period * 4); } + }); + + std::thread faster_consumer([this, &target]() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + buffer->read_nonblock([i, &target](uint8_t* buffer){ + std::memcpy(&target[i][0], buffer, ITEM_SIZE); + }); + std::this_thread::sleep_for(period); + } + }); + + slower_producer.join(); + faster_consumer.join(); + + // Blocking read and write should be synchronized even if one thread is faster. + std::cout << "Slower producer, faster consumer:" << std::endl; + for (int i = 0; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + // EXPECT_EQ(target[i], source[i]); + } + + //TODO: finish asserts + const bool finishedImplementing = false; + ASSERT_TRUE(finishedImplementing); } int main(int argc, char** argv)