From 07a59acbb2103b249fe236d4823762585196b423 Mon Sep 17 00:00:00 2001
From: Natan Silnitsky
Date: Thu, 21 Jan 2021 10:06:13 +0200
Subject: [PATCH 1/3] Update DispatcherTest.scala

---
 .../consumer/dispatcher/DispatcherTest.scala | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
index f0785629..c8be4360 100644
--- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
+++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
@@ -62,21 +62,21 @@ class DispatcherTest extends BaseTest[Env with TestClock with TestMetrics] {
         } yield (result1 must equalTo(SubmitResult.Rejected)) or (result2 must equalTo(SubmitResult.Rejected)))
     }
 
-    "resume paused partitions" in new ctx(lowWatermark = 3, highWatermark = 7) {
-      run(
-        for {
-          queue <- Queue.bounded[Record](1)
-          dispatcher <- Dispatcher.make[Clock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"queue.offer result: ${result}"))), lowWatermark, highWatermark)
-          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
-            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
-          }
-          _ <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
-          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_ == Set(TopicPartition(topic, partition)))
-        } yield ok
-      )
-    }
+//    "resume paused partitions" in new ctx(lowWatermark = 3, highWatermark = 7) {
+//      run(
+//        for {
+//          queue <- Queue.bounded[Record](1)
+//          dispatcher <- Dispatcher.make[Clock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"queue.offer result: ${result}"))), lowWatermark, highWatermark)
+//          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
+//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+//          }
+//          _ <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+//          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
+//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
+//          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_ == Set(TopicPartition(topic, partition)))
+//        } yield ok
+//      )
+//    }
 
     "block resume paused partitions" in new ctx(lowWatermark = 30, highWatermark = 34) {
       run(

From 9d2624a1893c69176dec41cfa0bd07ff58f2cdb0 Mon Sep 17 00:00:00 2001
From: Natan Silnitsky
Date: Thu, 21 Jan 2021 10:25:10 +0200
Subject: [PATCH 2/3] another test commented out

---
 .../consumer/dispatcher/DispatcherTest.scala | 88 +++++++++----------
 1 file changed, 44 insertions(+), 44 deletions(-)

diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
index c8be4360..ae35f4ef 100644
--- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
+++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
@@ -62,59 +62,59 @@ class DispatcherTest extends BaseTest[Env with TestClock with TestMetrics] {
         } yield (result1 must equalTo(SubmitResult.Rejected)) or (result2 must equalTo(SubmitResult.Rejected)))
     }
 
-//    "resume paused partitions" in new ctx(lowWatermark = 3, highWatermark = 7) {
-//      run(
-//        for {
-//          queue <- Queue.bounded[Record](1)
-//          dispatcher <- Dispatcher.make[Clock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"queue.offer result: ${result}"))), lowWatermark, highWatermark)
-//          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
-//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
-//          }
-//          _ <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-//          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
-//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-//          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_ == Set(TopicPartition(topic, partition)))
-//        } yield ok
-//      )
-//    }
-
-    "block resume paused partitions" in new ctx(lowWatermark = 30, highWatermark = 34) {
+    "resume paused partitions" in new ctx(lowWatermark = 3, highWatermark = 7) {
       run(
         for {
           queue <- Queue.bounded[Record](1)
-          dispatcher <- Dispatcher.make[TestClock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"block resume paused partitions -queue.offer result: ${result}"))),
-            lowWatermark, highWatermark, 6500)
+          dispatcher <- Dispatcher.make[Clock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"queue.offer result: ${result}"))), lowWatermark, highWatermark)
           _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
-            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
           }
-          overCapacitySubmitResult <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-          resumeablePartitionsWhenInHighWatermark <- dispatcher.resumeablePartitions(Set(topicPartition))
+          _ <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
           _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-          _ <- TestClock.adjust(1.second)
-          resumablePartitionDuringBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
-          _ <- TestClock.adjust(6.second)
-          resumablePartitionAfterBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
-          _ <- ZIO.foreach_(0 to 3) { offset =>
-            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
-          }
-          overCapacitySubmitResult2 <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-          _ <- TestClock.adjust(1.second)
-          // test clearPausedPartitionDuration
-          resumablePartitionDuringBlockPeriod2 <- dispatcher.resumeablePartitions(Set(topicPartition))
-        } yield (resumeablePartitionsWhenInHighWatermark aka "resumeablePartitionsWhenInHighWatermark" must beEmpty) and
-          (resumablePartitionDuringBlockPeriod aka "resumablePartitionDuringBlockPeriod" must beEmpty) and
-          (resumablePartitionAfterBlockPeriod aka "resumablePartitionAfterBlockPeriod" mustEqual Set(TopicPartition(topic, partition))) and
-          (overCapacitySubmitResult aka "overCapacitySubmitResult" mustEqual Rejected) and
-          (overCapacitySubmitResult2 aka "overCapacitySubmitResult2" mustEqual Rejected) and
-          (resumablePartitionDuringBlockPeriod2 aka "resumablePartitionDuringBlockPeriod2" must beEmpty)
+          _ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_ == Set(TopicPartition(topic, partition)))
+        } yield ok
       )
+    }
+//    "block resume paused partitions" in new ctx(lowWatermark = 30, highWatermark = 34) {
+//      run(
+//        for {
+//          queue <- Queue.bounded[Record](1)
+//          dispatcher <- Dispatcher.make[TestClock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"block resume paused partitions -queue.offer result: ${result}"))),
+//            lowWatermark, highWatermark, 6500)
+//          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
+//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+//          }
+//          overCapacitySubmitResult <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+//          resumeablePartitionsWhenInHighWatermark <- dispatcher.resumeablePartitions(Set(topicPartition))
+//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
+//          _ <- TestClock.adjust(1.second)
+//          resumablePartitionDuringBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
+//          _ <- TestClock.adjust(6.second)
+//          resumablePartitionAfterBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
+//          _ <- ZIO.foreach_(0 to 3) { offset =>
+//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+//          }
+//          overCapacitySubmitResult2 <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
+//          _ <- TestClock.adjust(1.second)
+//          // test clearPausedPartitionDuration
+//          resumablePartitionDuringBlockPeriod2 <- dispatcher.resumeablePartitions(Set(topicPartition))
+//        } yield (resumeablePartitionsWhenInHighWatermark aka "resumeablePartitionsWhenInHighWatermark" must beEmpty) and
+//          (resumablePartitionDuringBlockPeriod aka "resumablePartitionDuringBlockPeriod" must beEmpty) and
+//          (resumablePartitionAfterBlockPeriod aka "resumablePartitionAfterBlockPeriod" mustEqual Set(TopicPartition(topic, partition))) and
+//          (overCapacitySubmitResult aka "overCapacitySubmitResult" mustEqual Rejected) and
+//          (overCapacitySubmitResult2 aka "overCapacitySubmitResult2" mustEqual Rejected) and
+//          (resumablePartitionDuringBlockPeriod2 aka "resumablePartitionDuringBlockPeriod2" must beEmpty)
+//      )
 
-    private def submit(dispatcher: Dispatcher[TestClock], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]]): URIO[TestClock with Env, SubmitResult] = {
-      dispatcher.submit(record).tap(_ => TestClock.adjust(10.millis))
-    }
-  }
+
+//    private def submit(dispatcher: Dispatcher[TestClock], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]]): URIO[TestClock with Env, SubmitResult] = {
+//      dispatcher.submit(record).tap(_ => TestClock.adjust(10.millis))
+//    }
+//  }
 
     "pause handling" in new ctx() {
       run(for {

From 5792db4c68dbf7353b8d8c37b7d798fe0870f40b Mon Sep 17 00:00:00 2001
From: Natan Silnitsky
Date: Thu, 21 Jan 2021 10:31:12 +0200
Subject: [PATCH 3/3] sequential with all tests

---
 .../consumer/dispatcher/DispatcherTest.scala | 75 ++++++++++---------
 1 file changed, 38 insertions(+), 37 deletions(-)

diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
index ae35f4ef..3a99c43e 100644
--- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
+++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/dispatcher/DispatcherTest.scala
@@ -16,6 +16,7 @@ import zio.test.environment.TestClock
 import zio.{test, _}
 
 class DispatcherTest extends BaseTest[Env with TestClock with TestMetrics] {
+  sequential
 
   override def env: UManaged[Env with TestClock with TestMetrics] =
     for {
@@ -78,43 +79,43 @@ class DispatcherTest extends BaseTest[Env with TestClock with TestMetrics] {
         } yield ok
       )
     }
-//    "block resume paused partitions" in new ctx(lowWatermark = 30, highWatermark = 34) {
-//      run(
-//        for {
-//          queue <- Queue.bounded[Record](1)
-//          dispatcher <- Dispatcher.make[TestClock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"block resume paused partitions -queue.offer result: ${result}"))),
-//            lowWatermark, highWatermark, 6500)
-//          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
-//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
-//          }
-//          overCapacitySubmitResult <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-//          resumeablePartitionsWhenInHighWatermark <- dispatcher.resumeablePartitions(Set(topicPartition))
-//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-//          _ <- TestClock.adjust(1.second)
-//          resumablePartitionDuringBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
-//          _ <- TestClock.adjust(6.second)
-//          resumablePartitionAfterBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
-//          _ <- ZIO.foreach_(0 to 3) { offset =>
-//            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
-//          }
-//          overCapacitySubmitResult2 <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
-//          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
-//          _ <- TestClock.adjust(1.second)
-//          // test clearPausedPartitionDuration
-//          resumablePartitionDuringBlockPeriod2 <- dispatcher.resumeablePartitions(Set(topicPartition))
-//        } yield (resumeablePartitionsWhenInHighWatermark aka "resumeablePartitionsWhenInHighWatermark" must beEmpty) and
-//          (resumablePartitionDuringBlockPeriod aka "resumablePartitionDuringBlockPeriod" must beEmpty) and
-//          (resumablePartitionAfterBlockPeriod aka "resumablePartitionAfterBlockPeriod" mustEqual Set(TopicPartition(topic, partition))) and
-//          (overCapacitySubmitResult aka "overCapacitySubmitResult" mustEqual Rejected) and
-//          (overCapacitySubmitResult2 aka "overCapacitySubmitResult2" mustEqual Rejected) and
-//          (resumablePartitionDuringBlockPeriod2 aka "resumablePartitionDuringBlockPeriod2" must beEmpty)
-//      )
-
-
-//    private def submit(dispatcher: Dispatcher[TestClock], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]]): URIO[TestClock with Env, SubmitResult] = {
-//      dispatcher.submit(record).tap(_ => TestClock.adjust(10.millis))
-//    }
-//  }
+    "block resume paused partitions" in new ctx(lowWatermark = 30, highWatermark = 34) {
+      run(
+        for {
+          queue <- Queue.bounded[Record](1)
+          dispatcher <- Dispatcher.make[TestClock]("group", "clientId", (record) => queue.offer(record).flatMap(result => UIO(println(s"block resume paused partitions -queue.offer result: ${result}"))),
+            lowWatermark, highWatermark, 6500)
+          _ <- ZIO.foreach_(0 to (highWatermark + 1)) { offset =>
+            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+          }
+          overCapacitySubmitResult <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+          resumeablePartitionsWhenInHighWatermark <- dispatcher.resumeablePartitions(Set(topicPartition))
+          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
+          _ <- TestClock.adjust(1.second)
+          resumablePartitionDuringBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
+          _ <- TestClock.adjust(6.second)
+          resumablePartitionAfterBlockPeriod <- dispatcher.resumeablePartitions(Set(topicPartition))
+          _ <- ZIO.foreach_(0 to 3) { offset =>
+            submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L))
+          }
+          overCapacitySubmitResult2 <- submit(dispatcher, ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)) // Will be dropped
+          _ <- ZIO.foreach_(1 to 4 )(_ => queue.take)
+          _ <- TestClock.adjust(1.second)
+          // test clearPausedPartitionDuration
+          resumablePartitionDuringBlockPeriod2 <- dispatcher.resumeablePartitions(Set(topicPartition))
+        } yield (resumeablePartitionsWhenInHighWatermark aka "resumeablePartitionsWhenInHighWatermark" must beEmpty) and
+          (resumablePartitionDuringBlockPeriod aka "resumablePartitionDuringBlockPeriod" must beEmpty) and
+          (resumablePartitionAfterBlockPeriod aka "resumablePartitionAfterBlockPeriod" mustEqual Set(TopicPartition(topic, partition))) and
+          (overCapacitySubmitResult aka "overCapacitySubmitResult" mustEqual Rejected) and
+          (overCapacitySubmitResult2 aka "overCapacitySubmitResult2" mustEqual Rejected) and
+          (resumablePartitionDuringBlockPeriod2 aka "resumablePartitionDuringBlockPeriod2" must beEmpty)
+      )
+
+
+    private def submit(dispatcher: Dispatcher[TestClock], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]]): URIO[TestClock with Env, SubmitResult] = {
+      dispatcher.submit(record).tap(_ => TestClock.adjust(10.millis))
+    }
+  }
 
     "pause handling" in new ctx() {
       run(for {