Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7464] Polish the pop logger format #7465

Merged
merged 9 commits into from
Oct 16, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
(System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
// offset self amend
while (true) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
Expand Down Expand Up @@ -351,7 +351,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
noMsgCount = 0;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}
for (MessageExt messageExt : messageExts) {
Expand All @@ -373,7 +373,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
POP_LOGGER.info("reviveQueueId={}, find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
Expand Down Expand Up @@ -465,15 +465,15 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {

protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
POP_LOGGER.info("reviveQueueId={},ck listSize={}", queueId, sortList.size());
POP_LOGGER.info("reviveQueueId={}, ck listSize={}", queueId, sortList.size());
if (sortList.size() != 0) {
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={} ; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={}; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset());
}
long newOffset = consumeReviveObj.oldOffset;
for (PopCheckPoint popCheckPoint : sortList) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
Expand All @@ -483,12 +483,12 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
// check normal topic, skip ck , if normal topic is not exist
String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
POP_LOGGER.warn("reviveQueueId={}, can not get normal topic {}, then continue", queueId, popCheckPoint.getTopic());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
POP_LOGGER.warn("reviveQueueId={}, can not get cid {}, then continue", queueId, popCheckPoint.getCId());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
Expand Down Expand Up @@ -520,7 +520,7 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
Expand Down Expand Up @@ -646,7 +646,7 @@ public void run() {
consumeReviveMessage(consumeReviveObj);

if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
continue;
}

Expand All @@ -662,7 +662,7 @@ public void run() {
currentReviveMessageTimestamp = System.currentTimeMillis();
}

POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={} ",
POP_LOGGER.info("reviveQueueId={}, revive finish,old offset is {}, new offset is {}, ckDelay={} ",
queueId, consumeReviveObj.oldOffset, consumeReviveObj.newOffset, delay);

if (sortList == null || sortList.isEmpty()) {
Expand Down
Loading