diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java index 38176c83ede..263441dac08 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -214,6 +214,64 @@ public void testBatchSend_SysOuterBatch() throws Exception { } } + @Test + public void testBatchSend_CompressionBody() throws Exception { + Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore); + Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore); + Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore); + + String batchTopic = UUID.randomUUID().toString(); + IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ); + Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(NAMESRV_ADDR); + MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next(); + int bodyCompressionThreshold = producer.getCompressMsgBodyOverHowmuch(); + + int bodyLen = bodyCompressionThreshold + 1; + int batchCount = 10; + int batchNum = 10; + for (int i = 0; i < batchCount; i++) { + List messageList = new ArrayList<>(); + for (int j = 0; j < batchNum; j++) { + messageList.add(new Message(batchTopic, RandomUtils.getStringWithNumber(bodyLen).getBytes())); + } + SendResult sendResult = producer.send(messageList, messageQueue); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + Assert.assertEquals(messageQueue.getQueueId(), sendResult.getMessageQueue().getQueueId()); + Assert.assertEquals(i * batchNum, sendResult.getQueueOffset()); + Assert.assertEquals(10, sendResult.getMsgId().split(",").length); + } + Thread.sleep(300); + { + DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(NAMESRV_ADDR, "group"); + + long startOffset = 5; + PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", startOffset, batchCount * batchNum); + Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus()); + Assert.assertEquals(0, pullResult.getMinOffset()); + Assert.assertEquals(batchCount * batchNum, pullResult.getMaxOffset()); + Assert.assertEquals(batchCount * batchNum - startOffset, pullResult.getMsgFoundList().size()); + for (int i = 0; i < pullResult.getMsgFoundList().size(); i++) { + MessageExt messageExt = pullResult.getMsgFoundList().get(i); + Assert.assertEquals(i + startOffset, messageExt.getQueueOffset()); + Assert.assertEquals(batchTopic, messageExt.getTopic()); + Assert.assertEquals(messageQueue.getQueueId(), messageExt.getQueueId()); + Assert.assertEquals(bodyLen, messageExt.getBody().length); + } + } + } + + + @Test public void testBatchSend_CheckProperties() throws Exception { List messageList = new ArrayList<>();