Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/Abhijeetmishr/rocketmq i…
Browse files Browse the repository at this point in the history
…nto iron
  • Loading branch information
Abhijeetmishr committed Apr 13, 2023
2 parents d43d59d + d9a7315 commit 0f89d3e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static Logger log = LoggerFactory.getLogger(AsyncTraceDispatcher.class);
private final static AtomicInteger COUNTER = new AtomicInteger();
private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
Expand Down Expand Up @@ -315,6 +316,7 @@ private String getTraceTopicName(String regionId) {
class TraceDataSegment {
private long firstBeanAddTime;
private int currentMsgSize;
private int currentMsgKeySize;
private final String traceTopicName;
private final String regionId;
private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
Expand All @@ -328,13 +330,14 @@ public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
initFirstBeanAddTime();
this.traceTransferBeanList.add(traceTransferBean);
this.currentMsgSize += traceTransferBean.getTransData().length();
if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000) {

this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
.reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), Integer::sum);
if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);

this.clear();

}
}

Expand All @@ -358,11 +361,11 @@ private void initFirstBeanAddTime() {
private void clear() {
this.firstBeanAddTime = 0;
this.currentMsgSize = 0;
this.currentMsgKeySize = 0;
this.traceTransferBeanList.clear();
}
}


class AsyncDataSendTask implements Runnable {
private final String traceTopicName;
private final String regionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
public interface AppendMessageCallback {

/**
* After message serialization, write MapedByteBuffer
* After message serialization, write MappedByteBuffer
*
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext);

/**
* After batched message serialization, write MapedByteBuffer
* After batched message serialization, write MappedByteBuffer
*
* @param messageExtBatch, backed up by a byte array
* @return How many bytes to write
Expand Down

0 comments on commit 0f89d3e

Please sign in to comment.