Skip to content

Commit

Permalink
batch send support compression integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
iamgd67 committed Sep 29, 2024
1 parent 04ec5a1 commit db375b6
Showing 1 changed file with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,64 @@ public void testBatchSend_SysOuterBatch() throws Exception {
}
}

@Test
public void testBatchSend_CompressionBody() throws Exception {
Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore);
Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore);
Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore);

String batchTopic = UUID.randomUUID().toString();
IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));

DefaultMQProducer producer = ProducerFactory.getRMQProducer(NAMESRV_ADDR);
MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
int bodyCompressionThreshold = producer.getCompressMsgBodyOverHowmuch();

int bodyLen=bodyCompressionThreshold + 1;
int batchCount = 10;
int batchNum = 10;
for (int i = 0; i < batchCount; i++) {
List<Message> messageList = new ArrayList<>();
for (int j = 0; j < batchNum; j++) {
messageList.add(new Message(batchTopic, RandomUtils.getStringWithNumber(bodyLen).getBytes()));
}
SendResult sendResult = producer.send(messageList, messageQueue);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
Assert.assertEquals(messageQueue.getQueueId(), sendResult.getMessageQueue().getQueueId());
Assert.assertEquals(i * batchNum, sendResult.getQueueOffset());
Assert.assertEquals(10, sendResult.getMsgId().split(",").length);
}
Thread.sleep(300);
{
DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(NAMESRV_ADDR, "group");

long startOffset = 5;
PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", startOffset, batchCount * batchNum);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
Assert.assertEquals(0, pullResult.getMinOffset());
Assert.assertEquals(batchCount * batchNum, pullResult.getMaxOffset());
Assert.assertEquals(batchCount * batchNum - startOffset, pullResult.getMsgFoundList().size());
for (int i = 0; i < pullResult.getMsgFoundList().size(); i++) {
MessageExt messageExt = pullResult.getMsgFoundList().get(i);
Assert.assertEquals(i + startOffset, messageExt.getQueueOffset());
Assert.assertEquals(batchTopic, messageExt.getTopic());
Assert.assertEquals(messageQueue.getQueueId(), messageExt.getQueueId());
Assert.assertEquals(bodyLen, messageExt.getBody().length);
}
}
}



@Test
public void testBatchSend_CheckProperties() throws Exception {
List<Message> messageList = new ArrayList<>();
Expand Down

0 comments on commit db375b6

Please sign in to comment.