diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java index 3bf0b430659..7a1e21cdb77 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java @@ -27,10 +27,6 @@ public abstract class AbsMetricItem { protected final String name; protected final AtomicLong value = new AtomicLong(0); - public AbsMetricItem(MetricType metricType, String name) { - this(metricType, MetricValueType.NORMAL, name, 0); - } - public AbsMetricItem(MetricType metricType, MetricValueType valueType, String name, long initialValue) { this.metricType = metricType; @@ -67,6 +63,10 @@ public long incrementAndGet() { return value.incrementAndGet(); } + public boolean compareAndSet(long expect, long update) { + return value.compareAndSet(expect, update); + } + public long decrementAndGet() { return value.decrementAndGet(); } diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java index 67f2c623b14..194dec8c81c 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java @@ -20,7 +20,7 @@ public class CountMetricItem extends AbsMetricItem { public CountMetricItem(String name) { - super(MetricType.COUNTER, name); + super(MetricType.COUNTER, MetricValueType.MAX, name, 0); } @Override diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java index 2005babc493..a2210471f68 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java @@ -20,7 +20,7 @@ public class GaugeNormMetricItem extends AbsMetricItem { public GaugeNormMetricItem(String name) { - super(MetricType.GAUGE, MetricValueType.MIN, name, 0); + super(MetricType.GAUGE, MetricValueType.NORMAL, name, 0); } @Override diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java index 4ff4aa48fe8..18e047744d5 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java @@ -17,14 +17,11 @@ package org.apache.inlong.tubemq.corebase.metric; -import java.beans.ConstructorProperties; - public class MetricValue { private final String type; private final String name; private final long value; - @ConstructorProperties({"type", "name", "value"}) public MetricValue(String type, String name, long value) { this.name = name; this.type = type; diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java index 64d972c3358..ded31b7a320 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java @@ -18,13 +18,14 @@ package org.apache.inlong.tubemq.corebase.metric; public enum MetricValueType { - NORMAL(0, "Normal"), - MIN(1, "Min"), - MAX(2, "Max"); + NORMAL(0, "Normal", "Current value"), + MIN(1, "Min", "Historical minimum value"), + MAX(2, "Max", "Historical maximum value"); - MetricValueType(int id, String name) { + MetricValueType(int id, String name, String desc) { this.id = id; this.name = name; + this.desc = desc; } public int getId() { @@ -35,6 +36,10 @@ public String getName() { return name; } + public String getDesc() { + return desc; + } + public static MetricValueType valueOf(int value) { for (MetricValueType valueType : MetricValueType.values()) { if (valueType.getId() == value) { @@ -46,4 +51,5 @@ public static MetricValueType valueOf(int value) { private final int id; private final String name; + private final String desc; } diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java index 34c771b6366..1ab380d8847 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java @@ -17,14 +17,12 @@ package org.apache.inlong.tubemq.corebase.metric; -import java.beans.ConstructorProperties; import java.util.Map; public class MetricValues { private final String lastResetTime; private final Map metricValues; - @ConstructorProperties({"lastResetTime", "metricValues"}) public MetricValues(String lastResetTime, Map metricValues) { this.lastResetTime = lastResetTime; this.metricValues = metricValues; diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java new file mode 100644 index 00000000000..e941b55181e --- /dev/null +++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.corebase.metric; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricItemTest { + private static final Logger logger = + LoggerFactory.getLogger(MetricItemTest.class); + + @Test + public void testMetricItem() { + try { + final CountMetricItem countMetricItem = + new CountMetricItem("CountMetricItem"); + final GaugeNormMetricItem gaugeNormMetricItem = + new GaugeNormMetricItem("GaugeNormMetricItem"); + final GaugeMaxMetricItem gaugeMaxMetricItem = + new GaugeMaxMetricItem("GaugeMaxMetricItem"); + final GaugeMinMetricItem gaugeMinMetricItem = + new GaugeMinMetricItem("GaugeMinMetricItem"); + + countMetricItem.incrementAndGet(); + countMetricItem.incrementAndGet(); + countMetricItem.incrementAndGet(); + countMetricItem.decrementAndGet(); + + gaugeNormMetricItem.update(1000); + gaugeNormMetricItem.update(2000); + gaugeNormMetricItem.update(500); + + gaugeMaxMetricItem.update(1000); + gaugeMaxMetricItem.update(5000); + gaugeMaxMetricItem.update(3000); + + gaugeMinMetricItem.update(1000); + gaugeMinMetricItem.update(1); + gaugeMinMetricItem.update(10000); + + Assert.assertEquals(2, countMetricItem.getValue()); + Assert.assertEquals(500, gaugeNormMetricItem.getValue()); + Assert.assertEquals(5000, gaugeMaxMetricItem.getValue()); + Assert.assertEquals(1, gaugeMinMetricItem.getValue()); + + countMetricItem.getAndSet(); + gaugeNormMetricItem.getAndSet(); + gaugeMaxMetricItem.getAndSet(); + gaugeMinMetricItem.getAndSet(); + + Assert.assertEquals(0, countMetricItem.getValue()); + Assert.assertEquals(500, gaugeNormMetricItem.getValue()); + Assert.assertEquals(0, gaugeMaxMetricItem.getValue()); + Assert.assertEquals(Long.MAX_VALUE, gaugeMinMetricItem.getValue()); + + Assert.assertEquals(MetricType.COUNTER.getId(), + countMetricItem.getMetricType().getId()); + Assert.assertEquals(MetricValueType.MAX.getId(), + countMetricItem.getMetricValueType().getId()); + Assert.assertEquals(MetricType.GAUGE.getId(), + gaugeNormMetricItem.getMetricType().getId()); + Assert.assertEquals(MetricValueType.NORMAL.getId(), + gaugeNormMetricItem.getMetricValueType().getId()); + Assert.assertEquals(MetricType.GAUGE.getId(), + gaugeMaxMetricItem.getMetricType().getId()); + Assert.assertEquals(MetricValueType.MAX.getId(), + gaugeMaxMetricItem.getMetricValueType().getId()); + Assert.assertEquals(MetricType.GAUGE.getId(), + gaugeMinMetricItem.getMetricType().getId()); + Assert.assertEquals(MetricValueType.MIN.getId(), + gaugeMinMetricItem.getMetricValueType().getId()); + } catch (Exception ex) { + logger.error("error happens" + ex); + } + } +} diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java index 6fe76f4558c..f6cfa46f2fe 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java @@ -845,7 +845,7 @@ private RegisterResponseB2C inProcessConsumerRegister(final String clientId, fin consumerNodeInfo = new ConsumerNodeInfo(storeManager, reqQryPriorityId, clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr); if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) { - BrokerMetricsHolder.METRICS.consumerOnlineCnt.incrementAndGet(); + BrokerMetricsHolder.incConsumerCnt(); } heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr); MessageStore dataStore = null; @@ -891,7 +891,7 @@ private RegisterResponseB2C inProcessConsumerRegister(final String clientId, fin heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(consumerId, partStr)); if (timeoutInfo == null || System.currentTimeMillis() >= timeoutInfo.getTimeoutTime()) { if (consumerRegisterMap.remove(partStr) != null) { - BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet(); + BrokerMetricsHolder.decConsumerCnt(true); } strBuffer.append("[Duplicated Register] Remove Invalid Consumer Register ") .append(consumerId).append(TokenConstants.SEGMENT_SEP).append(partStr); @@ -957,7 +957,7 @@ private RegisterResponseB2C inProcessConsumerUnregister(final String clientId, f .append(request.getPartitionId()).append(" updatedOffset:").append(updatedOffset).toString()); strBuffer.delete(0, strBuffer.length()); if (consumerRegisterMap.remove(partStr) != null) { - BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet(); + BrokerMetricsHolder.decConsumerCnt(false); } heartbeatManager.unRegConsumerNode( getHeartbeatNodeId(clientId, partStr)); @@ -1245,8 +1245,7 @@ public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) { } if (consumerNodeInfo.getConsumerId().equalsIgnoreCase(nodeInfo.getSecondKey())) { if (consumerRegisterMap.remove(nodeInfo.getThirdKey()) != null) { - BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet(); - BrokerMetricsHolder.METRICS.consumerTmoTotCnt.decrementAndGet(); + BrokerMetricsHolder.decConsumerCnt(true); } String[] groupTopicPart = consumerNodeInfo.getPartStr().split(TokenConstants.ATTR_SEP); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java index 474ddfbfcd7..6773264b643 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java @@ -218,8 +218,7 @@ public void run() { if (!response.getSuccess()) { isKeepAlive.set(false); if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) { - BrokerMetricsHolder.METRICS - .masterNoNodeCnt.incrementAndGet(); + BrokerMetricsHolder.incMasterNoNodeCnt(); register2Master(); heartbeatErrors.set(0); logger.info("Re-register to master successfully!"); @@ -234,7 +233,7 @@ public void run() { isKeepAlive.set(false); heartbeatErrors.incrementAndGet(); samplePrintCtrl.printExceptionCaught(t); - BrokerMetricsHolder.METRICS.hbExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incHBExceptionCnt(); } } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java index f8f0c383c81..30585563c48 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java @@ -20,14 +20,17 @@ import org.apache.inlong.tubemq.corebase.metric.MetricValues; /** - * BrokerMonitorMXBean - * Provide access interface of a metric item with JMX.
- * Decouple between metric item and monitor system, in particular scene,
- * inlong can depend on user-defined monitor system. + * BrokerMetricMXBean + * Broker's metric data access interface, including: + * the getMetric() that directly obtains data + * the getAndReSetMetrics() that can clear the values of + * the counter, maximum and minimum extremum Gauge data */ public interface BrokerMetricMXBean { + // get current metric data by viewing mode MetricValues getMetrics(); + // get current metric data and reset the Counter, maximum/minimum Gauge metric MetricValues getAndReSetMetrics(); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java index f7588c819d0..aa573c99fa6 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java @@ -33,31 +33,33 @@ public class BrokerMetrics implements BrokerMetricMXBean { private final AtomicLong lastResetTime = new AtomicLong(System.currentTimeMillis()); - public final AbsMetricItem syncDataDurMin = - new GaugeMinMetricItem("fSync_latency_min"); - public final AbsMetricItem syncDataDurMax = - new GaugeMaxMetricItem("fSync_latency_max"); - public final AbsMetricItem syncZkDurMin = - new GaugeMinMetricItem("zkSync_latency_min"); - public final AbsMetricItem syncZkDurMax = - new GaugeMaxMetricItem("zkSync_latency_max"); - public final AbsMetricItem zkExceptionCnt = + // Delay statistics for syncing data to files + protected final AbsMetricItem syncDataDurMin = + new GaugeMinMetricItem("fSync_duration_min"); + protected final AbsMetricItem syncDataDurMax = + new GaugeMaxMetricItem("fSync_duration_max"); + // Delay statistics for syncing data to Zookeeper + protected final AbsMetricItem syncZkDurMin = + new GaugeMinMetricItem("zkSync_duration_min"); + protected final AbsMetricItem syncZkDurMax = + new GaugeMaxMetricItem("zkSync_duration_max"); + // Zookeeper Exception statistics + protected final AbsMetricItem zkExceptionCnt = new CountMetricItem("zk_exception_cnt"); - public final AbsMetricItem masterNoNodeCnt = + // Broker 2 Master status statistics + protected final AbsMetricItem masterNoNodeCnt = new CountMetricItem("online_timeout_cnt"); - public final AbsMetricItem hbExceptionCnt = + protected final AbsMetricItem hbExceptionCnt = new CountMetricItem("hb_master_exception_cnt"); - public final AbsMetricItem ioExceptionCnt = + // Disk IO Exception statistics + protected final AbsMetricItem ioExceptionCnt = new CountMetricItem("io_exception_cnt"); - public final AbsMetricItem consumerOnlineCnt = + // Consumer client statistics + protected final AbsMetricItem consumerOnlineCnt = new GaugeNormMetricItem("consumer_online_cnt"); - public final AbsMetricItem consumerTmoTotCnt = + protected final AbsMetricItem consumerTmoTotCnt = new CountMetricItem("consumer_timeout_cnt"); - public BrokerMetrics() { - this.lastResetTime.set(System.currentTimeMillis()); - } - @Override public MetricValues getMetrics() { Map metricValues = new HashMap<>(); @@ -92,5 +94,49 @@ public MetricValues getAndReSetMetrics() { return new MetricValues( WebParameterUtils.date2yyyyMMddHHmmss(new Date(befTime)), metricValues); } + + public long getLastResetTime() { + return lastResetTime.get(); + } + + public AbsMetricItem getSyncDataDurMin() { + return syncDataDurMin; + } + + public AbsMetricItem getSyncDataDurMax() { + return syncDataDurMax; + } + + public AbsMetricItem getSyncZkDurMin() { + return syncZkDurMin; + } + + public AbsMetricItem getSyncZkDurMax() { + return syncZkDurMax; + } + + public AbsMetricItem getZkExceptionCnt() { + return zkExceptionCnt; + } + + public AbsMetricItem getMasterNoNodeCnt() { + return masterNoNodeCnt; + } + + public AbsMetricItem getHbExceptionCnt() { + return hbExceptionCnt; + } + + public AbsMetricItem getIoExceptionCnt() { + return ioExceptionCnt; + } + + public AbsMetricItem getConsumerOnlineCnt() { + return consumerOnlineCnt; + } + + public AbsMetricItem getConsumerTmoTotCnt() { + return consumerTmoTotCnt; + } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java index 845f14d37f3..b64c6785cad 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java @@ -27,9 +27,10 @@ public class BrokerMetricsHolder { private static final Logger logger = LoggerFactory.getLogger(BrokerMetricsHolder.class); - + // Registration status indicator private static final AtomicBoolean registered = new AtomicBoolean(false); - public static final BrokerMetrics METRICS = new BrokerMetrics(); + // broker metrics information + private static final BrokerMetrics statsInfo = new BrokerMetrics(); public static void registerMXBean() { if (!registered.compareAndSet(false, true)) { @@ -38,11 +39,52 @@ public static void registerMXBean() { try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxBeanName = - new ObjectName("org.apache.inlong.tubemq.server.broker:type=brokerMetrics"); - mbs.registerMBean(METRICS, mxBeanName); + new ObjectName("org.apache.inlong.tubemq.server.broker:type=BrokerMetrics"); + mbs.registerMBean(statsInfo, mxBeanName); } catch (Exception ex) { logger.error("Register BrokerMXBean error: ", ex); } } + + public static void incConsumerCnt() { + statsInfo.consumerOnlineCnt.incrementAndGet(); + } + + public static void decConsumerCnt(boolean isTimeout) { + statsInfo.consumerOnlineCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.consumerTmoTotCnt.incrementAndGet(); + } + } + + public static void incMasterNoNodeCnt() { + statsInfo.masterNoNodeCnt.incrementAndGet(); + } + + public static void incHBExceptionCnt() { + statsInfo.hbExceptionCnt.incrementAndGet(); + } + + public static void incIOExceptionCnt() { + statsInfo.ioExceptionCnt.incrementAndGet(); + } + + public static void incZKExceptionCnt() { + statsInfo.zkExceptionCnt.incrementAndGet(); + } + + public static void updSyncDataDurations(long dltTime) { + statsInfo.syncDataDurMin.update(dltTime); + statsInfo.syncDataDurMax.update(dltTime); + } + + public static void updSyncZKDurations(long dltTime) { + statsInfo.syncZkDurMin.update(dltTime); + statsInfo.syncZkDurMax.update(dltTime); + } + + public static BrokerMetrics getStatsInfo() { + return statsInfo; + } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java index a6202cf7937..42580bde48b 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java @@ -115,7 +115,7 @@ private FileSegment(final long start, final File file, } catch (final Exception e) { if (e instanceof IOException) { ServiceStatusHolder.addReadIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } if (this.segmentType == SegmentType.DATA) { logger.error("[File Store] Set DATA Segment cachedSize error", e); @@ -140,7 +140,7 @@ public void close() { } catch (Throwable ee) { if (ee instanceof IOException) { ServiceStatusHolder.addReadIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } logger.error(new StringBuilder(512).append("[File Store] Close ") .append(this.file.getAbsoluteFile().toString()) @@ -163,7 +163,7 @@ public void deleteFile() { } catch (Throwable e1) { if (e1 instanceof IOException) { ServiceStatusHolder.addReadIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } logger.error("[File Store] failure to close channel ", e1); } @@ -175,7 +175,7 @@ public void deleteFile() { } catch (Throwable ee) { if (ee instanceof IOException) { ServiceStatusHolder.addReadIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } logger.error("[File Store] failure to delete file ", ee); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java index 2b116dd8f68..33495b4d657 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java @@ -197,7 +197,7 @@ public void batchAppendMsg(final StringBuilder sb, final int msgCnt, // print abnormal information if (inIndexOffset != indexOffset || inDataOffset != dataOffset) { ServiceStatusHolder.addWriteIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); logger.error(sb.append("[File Store]: appendMsg data Error, storekey=") .append(this.storeKey).append(",msgCnt=").append(msgCnt) .append(",indexSize=").append(indexSize) @@ -211,7 +211,7 @@ public void batchAppendMsg(final StringBuilder sb, final int msgCnt, } catch (Throwable e) { if (!closed.get()) { ServiceStatusHolder.addWriteIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } samplePrintCtrl.printExceptionCaught(e); } finally { @@ -327,7 +327,7 @@ public GetMessageResult getMessages(final int partitionId, final long lastRdOffs } catch (Throwable e2) { if (e2 instanceof IOException) { ServiceStatusHolder.addReadIOErrCnt(); - BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incIOExceptionCnt(); } samplePrintCtrl.printExceptionCaught(e2, messageStore.getStoreKey(), String.valueOf(partitionId)); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java index f37dfa2bbb7..e66211295b2 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java @@ -272,12 +272,10 @@ public boolean batchFlush(MsgFileStore msgFileStore, final ByteBuffer tmpDataReadBuf = this.cacheDataSegment.asReadOnlyBuffer(); tmpIndexBuffer.flip(); tmpDataReadBuf.flip(); - long tmpValue = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); msgFileStore.batchAppendMsg(strBuffer, curMessageCount.get(), cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(), tmpDataReadBuf); - long dltTime = System.currentTimeMillis() - tmpValue; - BrokerMetricsHolder.METRICS.syncDataDurMin.update(dltTime); - BrokerMetricsHolder.METRICS.syncDataDurMax.update(dltTime); + BrokerMetricsHolder.updSyncDataDurations(System.currentTimeMillis() - startTime); return true; } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java index a4f1b7a9dd0..1e74bb783a6 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -614,7 +614,7 @@ private void commitTmpOffsets() { } private void commitCfmOffsets(boolean retryable) { - long tmpValue = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); for (Map.Entry> entry : cfmOffsetMap.entrySet()) { if (TStringUtils.isBlank(entry.getKey()) || entry.getValue() == null || entry.getValue().isEmpty()) { @@ -622,9 +622,7 @@ private void commitCfmOffsets(boolean retryable) { } zkOffsetStorage.commitOffset(entry.getKey(), entry.getValue().values(), retryable); } - long dltTime = System.currentTimeMillis() - tmpValue; - BrokerMetricsHolder.METRICS.syncZkDurMin.update(dltTime); - BrokerMetricsHolder.METRICS.syncZkDurMax.update(dltTime); + BrokerMetricsHolder.updSyncZKDurations(System.currentTimeMillis() - startTime); } /*** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java index 30eb1d127ef..7d23ba12986 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java @@ -79,7 +79,7 @@ public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) try { this.zkw = new ZooKeeperWatcher(zkConfig); } catch (Throwable e) { - BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incZKExceptionCnt(); logger.error(new StringBuilder(256) .append("[ZkOffsetStorage] Failed to connect ZooKeeper server (") .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e); @@ -143,7 +143,7 @@ public OffsetStorageInfo loadOffset(final String group, final String topic, int try { offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode); } catch (KeeperException e) { - BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incZKExceptionCnt(); logger.error("KeeperException during load offsets from ZooKeeper", e); return null; } @@ -183,7 +183,7 @@ private void cfmOffset(final StringBuilder sb, final String group, try { ZKUtil.updatePersistentPath(this.zkw, offsetPath, offsetData); } catch (final Throwable t) { - BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incZKExceptionCnt(); logger.error("Exception during commit offsets to ZooKeeper", t); throw new OffsetStoreException(t); } @@ -224,7 +224,7 @@ public Map queryGroupOffsetInfo(String group, String topic, offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1])); } } catch (Throwable e) { - BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet(); + BrokerMetricsHolder.incZKExceptionCnt(); offsetMap.put(partitionId, null); } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index 189518ba67d..8a08f2be598 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -110,7 +110,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder; import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager; import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager; @@ -164,7 +164,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { private AtomicInteger curCltBalanceParal = new AtomicInteger(0); private Sleeper stopSleeper = new Sleeper(1000, this); private SimpleVisitTokenManager visitTokenManager; - private final MasterMetric masterMetrics; /** * constructor @@ -179,7 +178,8 @@ public TMaster(MasterConfig masterConfig) throws Exception { this.checkAndCreateBdbDataPath(); this.masterAddInfo = new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort()); - this.masterMetrics = MasterMetric.create(); + // register metric bean + MasterMetricsHolder.registerMXBean(); this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel()); this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel()); this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig); @@ -189,9 +189,9 @@ public TMaster(MasterConfig masterConfig) throws Exception { false, TBaseConstants.META_VALUE_UNDEFINED); this.producerHolder = new ProducerInfoHolder(); this.consumerHolder = new ConsumerInfoHolder(this); - this.consumerEventManager = new ConsumerEventManager(consumerHolder, masterMetrics); + this.consumerEventManager = new ConsumerEventManager(consumerHolder); this.topicPSInfoManager = new TopicPSInfoManager(this); - this.loadBalancer = new DefaultLoadBalancer(masterMetrics); + this.loadBalancer = new DefaultLoadBalancer(); heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(), new TimeoutListener() { @Override @@ -283,10 +283,6 @@ public BrokerRunManager getBrokerRunManager() { return brokerRunManager; } - public MasterMetric getMasterMetrics() { - return masterMetrics; - } - /** * Producer register request to master * @@ -350,10 +346,8 @@ public RegisterResponseM2P producerRegisterP2M(RegisterRequestP2M request, } final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : ""; heartbeatManager.regProducerNode(producerId); - if (producerHolder.setProducerInfo(producerId, - new HashSet<>(transTopicSet), hostName, overtls)) { - masterMetrics.producerCnt.incrementAndGet(); - } + producerHolder.setProducerInfo(producerId, + new HashSet<>(transTopicSet), hostName, overtls); Tuple2> brokerStaticInfo = brokerRunManager.getBrokerStaticInfo(overtls); builder.setBrokerCheckSum(brokerStaticInfo.getF0()); @@ -1697,11 +1691,8 @@ private void processServerBalance(TMaster tMaster, final List subGroups = groupsNeedToBalance.subList(startIndex, endIndex); if (subGroups.isEmpty()) { if (curSvrBalanceParal.decrementAndGet() == 0) { - long durTime = System.currentTimeMillis() - startBalanceTime; - masterMetrics.svrBalLatency.set(durTime); - if (durTime > masterMetrics.svrBalLatencyMax.get()) { - masterMetrics.svrBalLatencyMax.set(durTime); - } + MasterMetricsHolder.updSvrBalanceDurations( + System.currentTimeMillis() - startBalanceTime); } continue; } @@ -1742,11 +1733,8 @@ public void run() { logger.warn("[Svr-Balance processor] Error during process", e); } finally { if (curSvrBalanceParal.decrementAndGet() == 0) { - long durTime = System.currentTimeMillis() - startBalanceTime; - masterMetrics.svrBalLatency.set(durTime); - if (durTime > masterMetrics.svrBalLatencyMax.get()) { - masterMetrics.svrBalLatencyMax.set(durTime); - } + MasterMetricsHolder.updSvrBalanceDurations( + System.currentTimeMillis() - startBalanceTime); } } } @@ -2570,24 +2558,12 @@ void run(String arg, boolean isTimeout) { try { lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true); - ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId); + ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId, isTimeout); currentSubInfo.remove(consumerId); consumerEventManager.removeAll(consumerId); if (info != null) { if (consumerHolder.isConsumeGroupEmpty(group)) { topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet()); - // metric - masterMetrics.consumeGroupCnt.decrementAndGet(); - if (info.getConsumeType() == ConsumeType.CONSUME_CLIENT_REB) { - masterMetrics.cltBalConsumeGroupCnt.decrementAndGet(); - } - if (isTimeout) { - masterMetrics.consumeGroupTmoTotCnt.incrementAndGet(); - } - } - masterMetrics.consumerCnt.decrementAndGet(); - if (isTimeout) { - masterMetrics.consumerTmoTotCnt.incrementAndGet(); } } } catch (IOException e) { @@ -2604,13 +2580,9 @@ private class ReleaseProducer extends AbstractReleaseRunner { @Override void run(String clientId, boolean isTimeout) { if (clientId != null) { - ProducerInfo info = producerHolder.removeProducer(clientId); + ProducerInfo info = producerHolder.removeProducer(clientId, isTimeout); if (info != null) { topicPSInfoManager.rmvProducerTopicPubInfo(clientId, info.getTopicSet()); - masterMetrics.producerCnt.decrementAndGet(); - if (isTimeout) { - masterMetrics.producerTmoTotCnt.incrementAndGet(); - } } } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java index 501f45ecdc8..93568ed0f61 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java @@ -34,7 +34,7 @@ import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage; import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager; import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo; import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo; @@ -48,10 +48,9 @@ public class DefaultLoadBalancer implements LoadBalancer { private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class); private static final Random RANDOM = new Random(System.currentTimeMillis()); - private final MasterMetric masterMetrics; - public DefaultLoadBalancer(MasterMetric masterMetrics) { - this.masterMetrics = masterMetrics; + public DefaultLoadBalancer() { + // initial information } /** @@ -663,13 +662,8 @@ private Map>> inReBalanceCluster( } } if (consumeGroupInfo.addAllocatedTimes() > 0) { - long durTime = System.currentTimeMillis() - consumeGroupInfo.getCreateTime(); - if (durTime < masterMetrics.svrBalResetDurMin.get()) { - masterMetrics.svrBalResetDurMin.set(durTime); - } - if (durTime > masterMetrics.svrBalResetDurMax.get()) { - masterMetrics.svrBalResetDurMax.set(durTime); - } + MasterMetricsHolder.updSvrBalResetDurations( + System.currentTimeMillis() - consumeGroupInfo.getCreateTime()); } } return finalSubInfoMap; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java deleted file mode 100644 index 20f93efaeb8..00000000000 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.tubemq.server.master.metrics; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.inlong.commons.config.metrics.CountMetric; -import org.apache.inlong.commons.config.metrics.Dimension; -import org.apache.inlong.commons.config.metrics.GaugeMetric; -import org.apache.inlong.commons.config.metrics.MetricDomain; -import org.apache.inlong.commons.config.metrics.MetricItem; -import org.apache.inlong.commons.config.metrics.MetricRegister; - -@MetricDomain(name = "master_metrics") -public class MasterMetric extends MetricItem { - - private static final MasterMetric MASTER_METRICS = new MasterMetric(); - private static final AtomicBoolean REGISTER_ONCE = - new AtomicBoolean(false); - private static final String METRIC_NAME = "master_metrics"; - - @Dimension - public String tagName; - - @GaugeMetric - public AtomicLong consumeGroupCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong cltBalConsumeGroupCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong consumeGroupTmoTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong consumerCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong consumerTmoTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong producerCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong producerTmoTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong brokerConfigCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong brokerOnlineCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong brokerAbnTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong brokerAbnCurCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong brokerFbdTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong brokerFbdCurCnt = new AtomicLong(0); - - @CountMetric - public AtomicLong brokerTmoTotCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong svrBalLatency = new AtomicLong(0); - - @GaugeMetric - public AtomicLong svrBalLatencyMax = new AtomicLong(0); - - @GaugeMetric - public AtomicLong svrBalResetDurMin = new AtomicLong(Long.MAX_VALUE); - - @GaugeMetric - public AtomicLong svrBalResetDurMax = new AtomicLong(0); - - @GaugeMetric - public AtomicLong svrBalConEventConsumerCnt = new AtomicLong(0); - - @GaugeMetric - public AtomicLong svrBalDisConEventConsumerCnt = new AtomicLong(0); - - public static MasterMetric create() { - if (REGISTER_ONCE.compareAndSet(false, true)) { - MASTER_METRICS.tagName = METRIC_NAME; - MetricRegister.register(MASTER_METRICS); - } - return MASTER_METRICS; - } -} - diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java new file mode 100644 index 00000000000..fa98f2d5efc --- /dev/null +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.server.master.metrics; + +import org.apache.inlong.tubemq.corebase.metric.MetricValues; + +/** + * MasterMetricMXBean + * Master's metric data access interface, including: + * the getMetric() that directly obtains data + * the getAndReSetMetrics() that can clear the values of + * the counter, maximum and minimum extremum Gauge data + */ +public interface MasterMetricMXBean { + + // get current metric data by viewing mode + MetricValues getMetrics(); + + // get current metric data and reset the Counter, maximum/minimum Gauge metric + MetricValues getAndReSetMetrics(); +} diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java new file mode 100644 index 00000000000..0c0f2d29d18 --- /dev/null +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.server.master.metrics; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.inlong.tubemq.corebase.metric.AbsMetricItem; +import org.apache.inlong.tubemq.corebase.metric.CountMetricItem; +import org.apache.inlong.tubemq.corebase.metric.GaugeMaxMetricItem; +import org.apache.inlong.tubemq.corebase.metric.GaugeMinMetricItem; +import org.apache.inlong.tubemq.corebase.metric.GaugeNormMetricItem; +import org.apache.inlong.tubemq.corebase.metric.MetricValues; +import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils; + +public class MasterMetrics implements MasterMetricMXBean { + + // statistics time since last reset + private final AtomicLong lastResetTime = + new AtomicLong(System.currentTimeMillis()); + // consume group statistics + protected final AbsMetricItem consumeGroupCnt = + new GaugeNormMetricItem("consume_group_cnt"); + protected final AbsMetricItem consumeGroupTmoTotCnt = + new CountMetricItem("consume_group_timeout_cnt"); + protected final AbsMetricItem cltBalConsumeGroupCnt = + new GaugeNormMetricItem("client_balance_group_cnt"); + protected final AbsMetricItem cltBalGroupTmototCnt = + new CountMetricItem("clt_balance_timeout_cnt"); + // consumer client statistics + protected final AbsMetricItem consumerOnlineCnt = + new GaugeNormMetricItem("consumer_online_cnt"); + protected final AbsMetricItem consumerTmoTotCnt = + new CountMetricItem("consumer_timeout_cnt"); + // producer client statistics + protected final AbsMetricItem producerOnlineCnt = + new GaugeNormMetricItem("producer_online_cnt"); + protected final AbsMetricItem producerTmoTotCnt = + new CountMetricItem("producer_timeout_cnt"); + // broker node statistics + protected final AbsMetricItem brokerConfigCnt = + new GaugeNormMetricItem("broker_configure_cnt"); + protected final AbsMetricItem brokerOnlineCnt = + new GaugeNormMetricItem("broker_online_cnt"); + protected final AbsMetricItem brokerTmoTotCnt = + new CountMetricItem("broker_timeout_cnt"); + protected final AbsMetricItem brokerAbnCurCnt = + new GaugeNormMetricItem("broker_abn_current_cnt"); + protected final AbsMetricItem brokerAbnTotCnt = + new CountMetricItem("broker_abn_total_cnt"); + protected final AbsMetricItem brokerFbdCurCnt = + new GaugeNormMetricItem("broker_fbd_current_cnt"); + protected final AbsMetricItem brokerFbdTotCnt = + new CountMetricItem("broker_fbd_total_cnt"); + // server balance statistics + protected final AbsMetricItem svrBalDuration = + new GaugeNormMetricItem("svrbalance_duration"); + protected final AbsMetricItem svrBalDurationMin = + new GaugeMinMetricItem("svrbalance_duration_min"); + protected final AbsMetricItem svrBalDurationMax = + new GaugeMaxMetricItem("svrbalance_duration_max"); + protected final AbsMetricItem svrBalResetDurMin = + new GaugeMinMetricItem("svrbal_reset_duration_min"); + protected final AbsMetricItem svrBalResetDurMax = + new GaugeMaxMetricItem("svrbal_reset_duration_max"); + protected final AbsMetricItem svrBalConEventConsumerCnt = + new GaugeNormMetricItem("svrbal_con_consumer_cnt"); + protected final AbsMetricItem svrBalDisConEventConsumerCnt = + new GaugeNormMetricItem("svrbal_discon_consumer_cnt"); + + @Override + public MetricValues getMetrics() { + Map metricValues = new HashMap<>(); + metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getValue()); + metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getValue()); + metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getValue()); + metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getValue()); + metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getValue()); + metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getValue()); + metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getValue()); + metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getValue()); + metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getValue()); + metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getValue()); + metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getValue()); + metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getValue()); + metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getValue()); + metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getValue()); + metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getValue()); + metricValues.put(svrBalDuration.getName(), svrBalDuration.getValue()); + metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getValue()); + metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getValue()); + metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getValue()); + metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getValue()); + metricValues.put(svrBalConEventConsumerCnt.getName(), + svrBalConEventConsumerCnt.getValue()); + metricValues.put(svrBalDisConEventConsumerCnt.getName(), + svrBalDisConEventConsumerCnt.getValue()); + return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss( + new Date(lastResetTime.get())), metricValues); + } + + @Override + public MetricValues getAndReSetMetrics() { + Map metricValues = new HashMap<>(); + metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getAndSet()); + metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getAndSet()); + metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getAndSet()); + metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getAndSet()); + metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getAndSet()); + metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getAndSet()); + metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getAndSet()); + metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getAndSet()); + metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getAndSet()); + metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getAndSet()); + metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getAndSet()); + metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getAndSet()); + metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getAndSet()); + metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getAndSet()); + metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getAndSet()); + metricValues.put(svrBalDuration.getName(), svrBalDuration.getAndSet()); + metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getAndSet()); + metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getAndSet()); + metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getAndSet()); + metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getAndSet()); + metricValues.put(svrBalConEventConsumerCnt.getName(), + svrBalConEventConsumerCnt.getAndSet()); + metricValues.put(svrBalDisConEventConsumerCnt.getName(), + svrBalDisConEventConsumerCnt.getAndSet()); + alignBrokerFbdMetrics(); + alignBrokerAbnMetrics(); + long befTime = lastResetTime.getAndSet(System.currentTimeMillis()); + return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss( + new Date(befTime)), metricValues); + } + + public long getLastResetTime() { + return lastResetTime.get(); + } + + public AbsMetricItem getConsumeGroupCnt() { + return consumeGroupCnt; + } + + public AbsMetricItem getConsumeGroupTmoTotCnt() { + return consumeGroupTmoTotCnt; + } + + public AbsMetricItem getCltBalConsumeGroupCnt() { + return cltBalConsumeGroupCnt; + } + + public AbsMetricItem getCltBalGroupTmototCnt() { + return cltBalGroupTmototCnt; + } + + public AbsMetricItem getConsumerOnlineCnt() { + return consumerOnlineCnt; + } + + public AbsMetricItem getConsumerTmoTotCnt() { + return consumerTmoTotCnt; + } + + public AbsMetricItem getProducerOnlineCnt() { + return producerOnlineCnt; + } + + public AbsMetricItem getProducerTmoTotCnt() { + return producerTmoTotCnt; + } + + public AbsMetricItem getBrokerConfigCnt() { + return brokerConfigCnt; + } + + public AbsMetricItem getBrokerOnlineCnt() { + return brokerOnlineCnt; + } + + public AbsMetricItem getBrokerTmoTotCnt() { + return brokerTmoTotCnt; + } + + public AbsMetricItem getBrokerAbnCurCnt() { + return brokerAbnCurCnt; + } + + public AbsMetricItem getBrokerAbnTotCnt() { + return brokerAbnTotCnt; + } + + public AbsMetricItem getBrokerFbdCurCnt() { + return brokerFbdCurCnt; + } + + public AbsMetricItem getBrokerFbdTotCnt() { + return brokerFbdTotCnt; + } + + public AbsMetricItem getSvrBalDuration() { + return svrBalDuration; + } + + public AbsMetricItem getSvrBalDurationMin() { + return svrBalDurationMin; + } + + public AbsMetricItem getSvrBalDurationMax() { + return svrBalDurationMax; + } + + public AbsMetricItem getSvrBalResetDurMin() { + return svrBalResetDurMin; + } + + public AbsMetricItem getSvrBalResetDurMax() { + return svrBalResetDurMax; + } + + public AbsMetricItem getSvrBalConEventConsumerCnt() { + return svrBalConEventConsumerCnt; + } + + public AbsMetricItem getSvrBalDisConEventConsumerCnt() { + return svrBalDisConEventConsumerCnt; + } + + private void alignBrokerFbdMetrics() { + // Notice: the minimum value of the brokerFbdTotCnt metric value is + // the current value of brokerFbdCurCnt, so the metric value + // needs to be aligned after reset + long curCnt = brokerFbdCurCnt.getValue(); + long totalCnt = brokerFbdTotCnt.getValue(); + while (curCnt > totalCnt) { + if (brokerFbdTotCnt.compareAndSet(totalCnt, curCnt)) { + break; + } + curCnt = brokerFbdCurCnt.getValue(); + totalCnt = brokerFbdTotCnt.getValue(); + } + } + + private void alignBrokerAbnMetrics() { + // Notice: the minimum value of the brokerAbnTotCnt metric value is + // the current value of brokerAbnCurCnt, so the metric value + // needs to be aligned after reset + long curCnt = brokerAbnCurCnt.getValue(); + long totalCnt = brokerAbnTotCnt.getValue(); + while (curCnt > totalCnt) { + if (brokerAbnTotCnt.compareAndSet(totalCnt, curCnt)) { + break; + } + curCnt = brokerAbnCurCnt.getValue(); + totalCnt = brokerAbnTotCnt.getValue(); + } + } +} + diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java new file mode 100644 index 00000000000..416420daa8b --- /dev/null +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.server.master.metrics; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MasterMetricsHolder { + private static final Logger logger = + LoggerFactory.getLogger(MasterMetricsHolder.class); + // Registration status indicator + private static final AtomicBoolean registered = + new AtomicBoolean(false); + // master metrics information + private static final MasterMetrics statsInfo = new MasterMetrics(); + + public static void registerMXBean() { + if (!registered.compareAndSet(false, true)) { + return; + } + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxBeanName = + new ObjectName("org.apache.inlong.tubemq.server.master:type=MasterMetrics"); + mbs.registerMBean(statsInfo, mxBeanName); + } catch (Exception ex) { + logger.error("Register MasterMXBean error: ", ex); + } + } + + public static void incConsumerCnt(boolean isGroupEmpty, boolean isCltBal) { + statsInfo.consumerOnlineCnt.incrementAndGet(); + if (isGroupEmpty) { + statsInfo.consumeGroupCnt.incrementAndGet(); + if (isCltBal) { + statsInfo.cltBalConsumeGroupCnt.incrementAndGet(); + } + } + } + + public static void decConsumerCnt(boolean isTimeout, + boolean isGroupEmpty, + boolean isCltBal) { + statsInfo.consumerOnlineCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.consumerTmoTotCnt.incrementAndGet(); + } + if (isGroupEmpty) { + decConsumeGroupCnt(isTimeout, isCltBal); + } + } + + public static void decConsumeGroupCnt(boolean isTimeout, boolean isCltBal) { + statsInfo.consumeGroupCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.consumeGroupTmoTotCnt.incrementAndGet(); + } + if (isCltBal) { + statsInfo.cltBalConsumeGroupCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.cltBalGroupTmototCnt.incrementAndGet(); + } + } + } + + public static void incProducerCnt() { + statsInfo.producerOnlineCnt.incrementAndGet(); + } + + public static void decProducerCnt(boolean isTimeout) { + statsInfo.producerOnlineCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.producerTmoTotCnt.incrementAndGet(); + } + } + + public static void incSvrBalDisConConsumerCnt() { + statsInfo.svrBalDisConEventConsumerCnt.incrementAndGet(); + } + + public static void decSvrBalDisConConsumerCnt() { + statsInfo.svrBalDisConEventConsumerCnt.decrementAndGet(); + } + + public static void incSvrBalConEventConsumerCnt() { + statsInfo.svrBalConEventConsumerCnt.incrementAndGet(); + } + + public static void decSvrBalConEventConsumerCnt() { + statsInfo.svrBalConEventConsumerCnt.decrementAndGet(); + } + + public static void incBrokerConfigCnt() { + statsInfo.brokerConfigCnt.incrementAndGet(); + } + + public static void decBrokerConfigCnt() { + statsInfo.brokerConfigCnt.decrementAndGet(); + } + + public static void incBrokerOnlineCnt() { + statsInfo.brokerOnlineCnt.incrementAndGet(); + } + + public static void decBrokerOnlineCnt(boolean isTimeout) { + statsInfo.brokerOnlineCnt.decrementAndGet(); + if (isTimeout) { + statsInfo.brokerTmoTotCnt.incrementAndGet(); + } + } + + public static void incBrokerAbnormalCnt() { + statsInfo.brokerAbnCurCnt.incrementAndGet(); + statsInfo.brokerAbnTotCnt.incrementAndGet(); + } + + public static void decBrokerAbnormalCnt() { + statsInfo.brokerAbnCurCnt.decrementAndGet(); + } + + public static void incBrokerForbiddenCnt() { + statsInfo.brokerFbdCurCnt.incrementAndGet(); + statsInfo.brokerFbdTotCnt.incrementAndGet(); + } + + public static void decBrokerForbiddenCnt() { + statsInfo.brokerFbdCurCnt.decrementAndGet(); + } + + public static void updSvrBalanceDurations(long dltTime) { + statsInfo.svrBalDuration.update(dltTime); + statsInfo.svrBalDurationMin.update(dltTime); + statsInfo.svrBalDurationMax.update(dltTime); + } + + public static void updSvrBalResetDurations(long dltTime) { + statsInfo.svrBalResetDurMin.update(dltTime); + statsInfo.svrBalResetDurMax.update(dltTime); + } + + public static MasterMetrics getStatsInfo() { + return statsInfo; + } +} + diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java index 2987225773c..e22ad844d97 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java @@ -33,7 +33,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,15 +51,11 @@ public class BrokerAbnHolder { private final MetaDataManager metaDataManager; private final AtomicInteger brokerForbiddenCount = new AtomicInteger(0); - // master metrics - private final MasterMetric masterMetrics; public BrokerAbnHolder(final int maxAutoForbiddenCnt, - final MetaDataManager metaDataManager, - final MasterMetric masterMetrics) { + final MetaDataManager metaDataManager) { this.maxAutoForbiddenCnt = maxAutoForbiddenCnt; this.metaDataManager = metaDataManager; - this.masterMetrics = masterMetrics; } /** @@ -79,7 +75,7 @@ public void updateBrokerReportStatus(int brokerId, if (brokerForbiddenMap.get(brokerId) == null) { brokerAbnInfo = brokerAbnormalMap.remove(brokerId); if (brokerAbnInfo != null) { - masterMetrics.brokerAbnCurCnt.decrementAndGet(); + MasterMetricsHolder.decBrokerAbnormalCnt(); logger.warn(sBuffer.append("[Broker AutoForbidden] broker ") .append(brokerId).append(" return to normal!").toString()); sBuffer.delete(0, sBuffer.length()); @@ -104,8 +100,7 @@ public void updateBrokerReportStatus(int brokerId, if (brokerAbnInfo == null) { if (brokerAbnormalMap.putIfAbsent(brokerId, new BrokerAbnInfo(brokerId, reportReadStatus, reportWriteStatus)) == null) { - masterMetrics.brokerAbnTotCnt.incrementAndGet(); - masterMetrics.brokerAbnCurCnt.incrementAndGet(); + MasterMetricsHolder.incBrokerAbnormalCnt(); logger.warn(sBuffer.append("[Broker AutoForbidden] broker report abnormal, ") .append(brokerId).append("'s reportReadStatus=") .append(reportReadStatus).append(", reportWriteStatus=") @@ -124,8 +119,7 @@ public void updateBrokerReportStatus(int brokerId, if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) { if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) { brokerForbiddenCount.incrementAndGet(); - masterMetrics.brokerFbdTotCnt.incrementAndGet(); - masterMetrics.brokerFbdCurCnt.incrementAndGet(); + MasterMetricsHolder.incBrokerForbiddenCnt(); logger.warn(sBuffer .append("[Broker AutoForbidden] master add missing forbidden broker, ") .append(brokerId).append("'s manage status to ") @@ -143,7 +137,7 @@ public void updateBrokerReportStatus(int brokerId, brokerForbiddenCount.decrementAndGet(); return; } - masterMetrics.brokerFbdCurCnt.incrementAndGet(); + MasterMetricsHolder.incBrokerForbiddenCnt(); logger.warn(sBuffer .append("[Broker AutoForbidden] master auto forbidden broker, ") .append(brokerId).append("'s manage status to ") @@ -185,12 +179,12 @@ public Tuple2 getBrokerAutoFbdStatus(int brokerId) { public void removeBroker(Integer brokerId) { BrokerAbnInfo abnInfo = brokerAbnormalMap.remove(brokerId); if (abnInfo != null) { - masterMetrics.brokerAbnCurCnt.decrementAndGet(); + MasterMetricsHolder.decBrokerAbnormalCnt(); } BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId); if (brokerFbdInfo != null) { this.brokerForbiddenCount.decrementAndGet(); - masterMetrics.brokerFbdCurCnt.decrementAndGet(); + MasterMetricsHolder.decBrokerForbiddenCnt(); } } @@ -275,10 +269,10 @@ public void relAutoForbiddenBrokerInfo(Set brokerIdSet, String reason) brokerFbdInfos.add(fbdInfo); BrokerAbnInfo abnInfo = this.brokerAbnormalMap.remove(brokerId); if (abnInfo != null) { - masterMetrics.brokerAbnCurCnt.decrementAndGet(); + MasterMetricsHolder.decBrokerAbnormalCnt(); } this.brokerForbiddenCount.decrementAndGet(); - masterMetrics.brokerFbdCurCnt.decrementAndGet(); + MasterMetricsHolder.decBrokerForbiddenCnt(); } } if (!brokerFbdInfos.isEmpty()) { diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java index e06fce42cfd..e636531c77e 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java @@ -45,7 +45,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager; import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +76,6 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver { private final BrokerAbnHolder brokerAbnHolder; // broker topic configure for consumer and producer private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder(); - // master metrics - private final MasterMetric masterMetrics; /** * Constructor by TMaster @@ -85,13 +83,11 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver { * @param tMaster the initial TMaster object */ public DefBrokerRunManager(TMaster tMaster) { - this.masterMetrics = tMaster.getMasterMetrics(); this.metaDataManager = tMaster.getDefMetaDataManager(); this.heartbeatManager = tMaster.getHeartbeatManager(); MasterConfig masterConfig = tMaster.getMasterConfig(); this.brokerAbnHolder = - new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(), - this.metaDataManager, this.masterMetrics); + new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(), this.metaDataManager); heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(), new TimeoutListener() { @Override @@ -150,7 +146,7 @@ public void updBrokerStaticInfo(BrokerConfEntity entity) { || !brokerReg.equals(entity.getSimpleBrokerInfo()) || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) { if (brokerReg == null) { - masterMetrics.brokerConfigCnt.incrementAndGet(); + MasterMetricsHolder.incBrokerConfigCnt(); } else { if (!brokerReg.equals(entity.getSimpleBrokerInfo())) { this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo()); @@ -248,7 +244,7 @@ public boolean brokerRegister2M(String clientId, BrokerInfo brokerInfo, brokerInfo.getBrokerId(), tmpRunStatusInfo); if (runStatusInfo == null) { brokerTotalCount.incrementAndGet(); - masterMetrics.brokerOnlineCnt.incrementAndGet(); + MasterMetricsHolder.incBrokerOnlineCnt(); runStatusInfo = tmpRunStatusInfo; } } else { @@ -479,10 +475,7 @@ public boolean releaseBrokerRunInfo(int brokerId, String blockId, boolean isTime if (runStatusInfo == null) { return false; } - masterMetrics.brokerOnlineCnt.decrementAndGet(); - if (isTimeout) { - masterMetrics.brokerTmoTotCnt.incrementAndGet(); - } + MasterMetricsHolder.decBrokerOnlineCnt(isTimeout); brokerTotalCount.decrementAndGet(); brokerAbnHolder.removeBroker(brokerId); brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java index f4fa33575a1..19664806f2d 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +42,9 @@ public class ConsumerEventManager { new ConcurrentHashMap<>(); private final ConsumerInfoHolder consumerHolder; - private final MasterMetric masterMetrics; - public ConsumerEventManager(ConsumerInfoHolder consumerHolder, - MasterMetric masterMetrics) { + public ConsumerEventManager(ConsumerInfoHolder consumerHolder) { this.consumerHolder = consumerHolder; - this.masterMetrics = masterMetrics; } public boolean addDisconnectEvent(String consumerId, @@ -59,7 +56,7 @@ public boolean addDisconnectEvent(String consumerId, LinkedList tmptList = disconnectEventMap.putIfAbsent(consumerId, eventList); if (tmptList == null) { - masterMetrics.svrBalDisConEventConsumerCnt.incrementAndGet(); + MasterMetricsHolder.incSvrBalDisConConsumerCnt(); } else { eventList = tmptList; } @@ -78,7 +75,7 @@ public boolean addConnectEvent(String consumerId, LinkedList tmptList = connectEventMap.putIfAbsent(consumerId, eventList); if (tmptList == null) { - masterMetrics.svrBalConEventConsumerCnt.incrementAndGet(); + MasterMetricsHolder.incSvrBalConEventConsumerCnt(); } else { eventList = tmptList; } @@ -138,9 +135,9 @@ public ConsumerEvent removeFirst(String consumerId) { if (eventList.isEmpty()) { currentEventMap.remove(consumerId); if (selDisConnMap) { - masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet(); + MasterMetricsHolder.decSvrBalDisConConsumerCnt(); } else { - masterMetrics.svrBalConEventConsumerCnt.decrementAndGet(); + MasterMetricsHolder.decSvrBalConEventConsumerCnt(); } } } @@ -199,11 +196,11 @@ public void removeAll(String consumerId) { LinkedList eventInfos = disconnectEventMap.remove(consumerId); if (eventInfos != null) { - masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet(); + MasterMetricsHolder.decSvrBalDisConConsumerCnt(); } eventInfos = connectEventMap.remove(consumerId); if (eventInfos != null) { - masterMetrics.svrBalConEventConsumerCnt.decrementAndGet(); + MasterMetricsHolder.decSvrBalConEventConsumerCnt(); } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java index 122f6683a1f..0fce762136d 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java @@ -30,7 +30,7 @@ import org.apache.inlong.tubemq.server.common.utils.RowLock; import org.apache.inlong.tubemq.server.master.MasterConfig; import org.apache.inlong.tubemq.server.master.TMaster; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,6 @@ public class ConsumerInfoHolder { private static final Logger logger = LoggerFactory.getLogger(ConsumerInfoHolder.class); private final MasterConfig masterConfig; // master configure - private final MasterMetric masterMetrics; private final RowLock groupRowLock; //lock private final ConcurrentHashMap groupInfoMap = new ConcurrentHashMap<>(); @@ -51,7 +50,6 @@ public class ConsumerInfoHolder { new ConcurrentHashSet<>(); public ConsumerInfoHolder(TMaster tMasterr) { - this.masterMetrics = tMasterr.getMasterMetrics(); this.masterConfig = tMasterr.getMasterConfig(); this.groupRowLock = new RowLock("Group-RowLock", this.masterConfig.getRowLockWaitDurMs()); @@ -355,19 +353,19 @@ public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated, consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo); if (consumeGroupInfo == null) { consumeGroupInfo = tmpGroupInfo; - masterMetrics.consumeGroupCnt.incrementAndGet(); if (tmpGroupInfo.isClientBalance()) { clientBalanceGroupSet.add(group); - masterMetrics.cltBalConsumeGroupCnt.incrementAndGet(); } else { serverBalanceGroupSet.add(group); } + MasterMetricsHolder.incConsumerCnt(true, + consumeGroupInfo.isClientBalance()); } } if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) { - Boolean isNewAdd = (Boolean) result.checkData; - if (isNewAdd) { - masterMetrics.consumerCnt.incrementAndGet(); + if ((Boolean) result.checkData) { + MasterMetricsHolder.incConsumerCnt(false, + consumeGroupInfo.isClientBalance()); } if (!isNotAllocated) { consumeGroupInfo.settAllocated(); @@ -391,12 +389,15 @@ public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated, * * @param group group name of consumer * @param consumerId consumer id + * @param isTimeout if timeout * @return ConsumerInfo */ - public ConsumerInfo removeConsumer(String group, String consumerId) { + public ConsumerInfo removeConsumer(String group, String consumerId, boolean isTimeout) { if (group == null || consumerId == null) { return null; } + boolean isCltBal = false; + boolean rmvGroup = false; ConsumerInfo consumer = null; Integer lid = null; try { @@ -406,11 +407,23 @@ public ConsumerInfo removeConsumer(String group, String consumerId) { if (consumeGroupInfo != null) { consumer = consumeGroupInfo.removeConsumer(consumerId); if (consumeGroupInfo.isGroupEmpty()) { - groupInfoMap.remove(group); + rmvGroup = (groupInfoMap.remove(group) != null); if (consumeGroupInfo.isClientBalance()) { - clientBalanceGroupSet.add(group); + isCltBal = true; + clientBalanceGroupSet.remove(group); } else { - serverBalanceGroupSet.add(group); + serverBalanceGroupSet.remove(group); + } + if (rmvGroup) { + if (consumer == null) { + MasterMetricsHolder.decConsumeGroupCnt(isTimeout, isCltBal); + } else { + MasterMetricsHolder.decConsumerCnt(isTimeout, true, isCltBal); + } + } + } else { + if (consumer != null) { + MasterMetricsHolder.decConsumerCnt(isTimeout, false, false); } } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java index 6acd0d0598d..22a578cef22 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; public class ProducerInfoHolder { @@ -30,12 +31,13 @@ public ProducerInfo getProducerInfo(String producerId) { return producerInfoMap.get(producerId); } - public boolean setProducerInfo(String producerId, + public void setProducerInfo(String producerId, Set topicSet, String host, boolean overTLS) { - ProducerInfo oldObj = producerInfoMap.put(producerId, - new ProducerInfo(producerId, topicSet, host, overTLS)); - return (oldObj == null); + if (producerInfoMap.put(producerId, + new ProducerInfo(producerId, topicSet, host, overTLS)) == null) { + MasterMetricsHolder.incProducerCnt(); + } } public void updateProducerInfo(String producerId, @@ -51,8 +53,12 @@ public void updateProducerInfo(String producerId, } } - public ProducerInfo removeProducer(String producerId) { - return producerInfoMap.remove(producerId); + public ProducerInfo removeProducer(String producerId, boolean isTimeout) { + ProducerInfo info = producerInfoMap.remove(producerId); + if (info != null) { + MasterMetricsHolder.decProducerCnt(isTimeout); + } + return info; } public void clear() { diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java index be7640c0b17..5f84d597c18 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java @@ -19,6 +19,7 @@ import org.apache.inlong.tubemq.corebase.metric.MetricValues; import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetrics; +import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetricsHolder; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -29,45 +30,199 @@ public class BrokerMetricsTest { LoggerFactory.getLogger(BrokerMetricsTest.class); @Test - public void testAgentMetrics() { + public void testBrokerMetrics() { try { BrokerMetrics metrics = new BrokerMetrics(); - metrics.zkExceptionCnt.incrementAndGet(); - metrics.consumerTmoTotCnt.incrementAndGet(); - metrics.syncDataDurMax.update(10000); - metrics.syncDataDurMin.update(2000); - metrics.syncDataDurMax.update(20000); - metrics.syncDataDurMin.update(1000); - metrics.syncDataDurMin.update(3000); - metrics.syncDataDurMax.update(30000); + // test case 1, set data + metrics.getIoExceptionCnt().incrementAndGet(); + metrics.getZkExceptionCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerTmoTotCnt().incrementAndGet(); + metrics.getHbExceptionCnt().incrementAndGet(); + metrics.getMasterNoNodeCnt().incrementAndGet(); + + metrics.getSyncDataDurMax().update(20000); + metrics.getSyncDataDurMax().update(10000); + metrics.getSyncDataDurMax().update(30000); + metrics.getSyncDataDurMin().update(2000); + metrics.getSyncDataDurMin().update(1000); + metrics.getSyncDataDurMin().update(3000); + + metrics.getSyncZkDurMax().update(20000); + metrics.getSyncZkDurMax().update(1000); + metrics.getSyncZkDurMax().update(30000); + + metrics.getSyncZkDurMin().update(2000); + metrics.getSyncZkDurMin().update(100); + metrics.getSyncZkDurMin().update(3000); + // get metric and compare data MetricValues result1 = metrics.getMetrics(); - Assert.assertEquals(Long.valueOf(1000), - result1.getMetricValues().get(metrics.syncDataDurMin.getName())); - Assert.assertEquals(Long.valueOf(30000), - result1.getMetricValues().get(metrics.syncDataDurMax.getName())); Assert.assertEquals(Long.valueOf(1), - result1.getMetricValues().get(metrics.zkExceptionCnt.getName())); + result1.getMetricValues().get(metrics.getIoExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getZkExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); Assert.assertEquals(Long.valueOf(1), - result1.getMetricValues().get(metrics.consumerTmoTotCnt.getName())); - // get and reset value + result1.getMetricValues().get(metrics.getHbExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName())); + Assert.assertEquals(Long.valueOf(30000), + result1.getMetricValues().get(metrics.getSyncDataDurMax().getName())); + Assert.assertEquals(Long.valueOf(1000), + result1.getMetricValues().get(metrics.getSyncDataDurMin().getName())); + Assert.assertEquals(Long.valueOf(30000), + result1.getMetricValues().get(metrics.getSyncZkDurMax().getName())); + Assert.assertEquals(Long.valueOf(100), + result1.getMetricValues().get(metrics.getSyncZkDurMin().getName())); + // get and reset value 2 final MetricValues result2 = metrics.getAndReSetMetrics(); - metrics.zkExceptionCnt.incrementAndGet(); - metrics.zkExceptionCnt.getAndSet(); - metrics.consumerTmoTotCnt.incrementAndGet(); - metrics.consumerTmoTotCnt.update(10); - metrics.syncDataDurMax.update(20000); - metrics.syncDataDurMin.update(2000); + // update metric data to 3 + metrics.getIoExceptionCnt().incrementAndGet(); + metrics.getZkExceptionCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().decrementAndGet(); + metrics.getConsumerTmoTotCnt().incrementAndGet(); + metrics.getHbExceptionCnt().incrementAndGet(); + metrics.getMasterNoNodeCnt().incrementAndGet(); + + metrics.getSyncDataDurMax().update(10); + metrics.getSyncDataDurMax().update(10000); + metrics.getSyncDataDurMax().update(20000); + metrics.getSyncDataDurMin().update(10); + metrics.getSyncDataDurMin().update(1000); + metrics.getSyncDataDurMin().update(5000); + + metrics.getSyncZkDurMax().update(10); + metrics.getSyncZkDurMax().update(1000); + metrics.getSyncZkDurMax().update(2000); + + metrics.getSyncZkDurMin().update(3000); + metrics.getSyncZkDurMin().update(10); + metrics.getSyncZkDurMin().update(6000); + MetricValues result3 = metrics.getMetrics(); Assert.assertEquals(result1.getLastResetTime(), result2.getLastResetTime()); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getIoExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getZkExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getHbExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName())); + Assert.assertEquals(Long.valueOf(20000), + result3.getMetricValues().get(metrics.getSyncDataDurMax().getName())); + Assert.assertEquals(Long.valueOf(10), + result3.getMetricValues().get(metrics.getSyncDataDurMin().getName())); Assert.assertEquals(Long.valueOf(2000), - result3.getMetricValues().get(metrics.syncDataDurMin.getName())); + result3.getMetricValues().get(metrics.getSyncZkDurMax().getName())); + Assert.assertEquals(Long.valueOf(10), + result3.getMetricValues().get(metrics.getSyncZkDurMin().getName())); + } catch (Exception ex) { + logger.error("error happens" + ex); + } + } + + @Test + public void testBrokerMetricsHolder() { + try { + // case 1, set data + BrokerMetricsHolder.incConsumerCnt(); + BrokerMetricsHolder.decConsumerCnt(false); + BrokerMetricsHolder.incConsumerCnt(); + BrokerMetricsHolder.decConsumerCnt(true); + BrokerMetricsHolder.incConsumerCnt(); + + BrokerMetricsHolder.incZKExceptionCnt(); + BrokerMetricsHolder.incZKExceptionCnt(); + + BrokerMetricsHolder.incMasterNoNodeCnt(); + BrokerMetricsHolder.incHBExceptionCnt(); + BrokerMetricsHolder.incIOExceptionCnt(); + BrokerMetricsHolder.incZKExceptionCnt(); + + BrokerMetricsHolder.updSyncDataDurations(10000); + BrokerMetricsHolder.updSyncDataDurations(2000); + BrokerMetricsHolder.updSyncDataDurations(20000); + + BrokerMetricsHolder.updSyncZKDurations(1000); + BrokerMetricsHolder.updSyncZKDurations(30); + BrokerMetricsHolder.updSyncZKDurations(30000); + // get data and check + BrokerMetrics metrics = BrokerMetricsHolder.getStatsInfo(); + MetricValues result1 = metrics.getMetrics(); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get(metrics.getZkExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getHbExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getIoExceptionCnt().getName())); Assert.assertEquals(Long.valueOf(20000), - result3.getMetricValues().get(metrics.syncDataDurMax.getName())); + result1.getMetricValues().get(metrics.getSyncDataDurMax().getName())); + Assert.assertEquals(Long.valueOf(2000), + result1.getMetricValues().get(metrics.getSyncDataDurMin().getName())); + Assert.assertEquals(Long.valueOf(30000), + result1.getMetricValues().get(metrics.getSyncZkDurMax().getName())); + Assert.assertEquals(Long.valueOf(30), + result1.getMetricValues().get(metrics.getSyncZkDurMin().getName())); + + // get and reset value 2 + final MetricValues result2 = metrics.getAndReSetMetrics(); + BrokerMetricsHolder.incConsumerCnt(); + BrokerMetricsHolder.incConsumerCnt(); + BrokerMetricsHolder.incConsumerCnt(); + BrokerMetricsHolder.decConsumerCnt(false); + BrokerMetricsHolder.decConsumerCnt(true); + + BrokerMetricsHolder.incZKExceptionCnt(); + BrokerMetricsHolder.incZKExceptionCnt(); + + BrokerMetricsHolder.updSyncDataDurations(1); + BrokerMetricsHolder.updSyncDataDurations(5000); + BrokerMetricsHolder.updSyncDataDurations(30000); + + BrokerMetricsHolder.updSyncZKDurations(100); + BrokerMetricsHolder.updSyncZKDurations(10); + BrokerMetricsHolder.updSyncZKDurations(5000); + // get and check 3 + MetricValues result3 = metrics.getMetrics(); + Assert.assertEquals(result1.getLastResetTime(), + result2.getLastResetTime()); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getZkExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName())); Assert.assertEquals(Long.valueOf(0), - result3.getMetricValues().get(metrics.zkExceptionCnt.getName())); + result3.getMetricValues().get(metrics.getHbExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getIoExceptionCnt().getName())); + Assert.assertEquals(Long.valueOf(30000), + result3.getMetricValues().get(metrics.getSyncDataDurMax().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getSyncDataDurMin().getName())); + Assert.assertEquals(Long.valueOf(5000), + result3.getMetricValues().get(metrics.getSyncZkDurMax().getName())); Assert.assertEquals(Long.valueOf(10), - result3.getMetricValues().get(metrics.consumerTmoTotCnt.getName())); + result3.getMetricValues().get(metrics.getSyncZkDurMin().getName())); } catch (Exception ex) { logger.error("error happens" + ex); } diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java index 193bebeb749..a3186835e4e 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java @@ -17,26 +17,471 @@ package org.apache.inlong.tubemq.server.master; -import org.apache.inlong.commons.config.metrics.MetricValue; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; +import org.apache.inlong.tubemq.corebase.metric.MetricValues; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetrics; +import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public class MasterMetricsTest { private static final Logger logger = LoggerFactory.getLogger(MasterMetricsTest.class); @Test - public void testAgentMetrics() { + public void testMasterMetrics() { + try { + MasterMetrics metrics = new MasterMetrics(); + // test case 1, set data + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerTmoTotCnt().incrementAndGet(); + metrics.getConsumerTmoTotCnt().incrementAndGet(); + metrics.getConsumeGroupCnt().incrementAndGet(); + metrics.getConsumeGroupCnt().incrementAndGet(); + metrics.getConsumeGroupTmoTotCnt().incrementAndGet(); + metrics.getCltBalConsumeGroupCnt().incrementAndGet(); + metrics.getCltBalGroupTmototCnt().incrementAndGet(); + + metrics.getProducerOnlineCnt().incrementAndGet(); + metrics.getProducerOnlineCnt().incrementAndGet(); + metrics.getProducerTmoTotCnt().incrementAndGet(); + metrics.getProducerTmoTotCnt().incrementAndGet(); + + metrics.getBrokerConfigCnt().incrementAndGet(); + metrics.getBrokerConfigCnt().incrementAndGet(); + metrics.getBrokerOnlineCnt().incrementAndGet(); + metrics.getBrokerOnlineCnt().incrementAndGet(); + metrics.getBrokerTmoTotCnt().incrementAndGet(); + + metrics.getBrokerAbnCurCnt().incrementAndGet(); + metrics.getBrokerAbnCurCnt().incrementAndGet(); + metrics.getBrokerAbnTotCnt().incrementAndGet(); + metrics.getBrokerAbnTotCnt().incrementAndGet(); + metrics.getBrokerFbdCurCnt().incrementAndGet(); + metrics.getBrokerFbdCurCnt().incrementAndGet(); + metrics.getBrokerFbdTotCnt().incrementAndGet(); + metrics.getBrokerFbdTotCnt().incrementAndGet(); + metrics.getBrokerFbdTotCnt().incrementAndGet(); + + metrics.getSvrBalDuration().update(100); + metrics.getSvrBalDuration().update(500); + metrics.getSvrBalDuration().update(300); + + metrics.getSvrBalDurationMin().update(700); + metrics.getSvrBalDurationMin().update(200); + metrics.getSvrBalDurationMin().update(300); + + metrics.getSvrBalDurationMax().update(700); + metrics.getSvrBalDurationMax().update(1000); + metrics.getSvrBalDurationMax().update(300); + + metrics.getSvrBalResetDurMin().update(700); + metrics.getSvrBalResetDurMin().update(200); + metrics.getSvrBalResetDurMin().update(300); + + metrics.getSvrBalResetDurMax().update(700); + metrics.getSvrBalResetDurMax().update(1000); + metrics.getSvrBalResetDurMax().update(300); + + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + + metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet(); + // get metric and compare data + MetricValues result1 = metrics.getMetrics(); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName())); + + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(300), + result1.getMetricValues().get(metrics.getSvrBalDuration().getName())); + Assert.assertEquals(Long.valueOf(200), + result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName())); + Assert.assertEquals(Long.valueOf(1000), + result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName())); + Assert.assertEquals(Long.valueOf(200), + result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName())); + Assert.assertEquals(Long.valueOf(1000), + result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName())); + + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get( + metrics.getSvrBalConEventConsumerCnt().getName())); + + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get( + metrics.getSvrBalDisConEventConsumerCnt().getName())); + + // get and reset value 2 + final MetricValues result2 = metrics.getAndReSetMetrics(); + // update metric data to 3 + metrics.getConsumerOnlineCnt().incrementAndGet(); + metrics.getConsumerOnlineCnt().decrementAndGet(); + metrics.getConsumeGroupCnt().incrementAndGet(); + metrics.getConsumeGroupTmoTotCnt().incrementAndGet(); + metrics.getCltBalConsumeGroupCnt().incrementAndGet(); + metrics.getCltBalGroupTmototCnt().incrementAndGet(); + + metrics.getProducerOnlineCnt().incrementAndGet(); + metrics.getProducerOnlineCnt().incrementAndGet(); + + metrics.getBrokerConfigCnt().incrementAndGet(); + metrics.getBrokerConfigCnt().incrementAndGet(); + metrics.getBrokerOnlineCnt().decrementAndGet(); + metrics.getBrokerOnlineCnt().decrementAndGet(); + metrics.getBrokerTmoTotCnt().incrementAndGet(); + + metrics.getBrokerAbnCurCnt().incrementAndGet(); + metrics.getBrokerAbnCurCnt().incrementAndGet(); + metrics.getBrokerFbdCurCnt().incrementAndGet(); + metrics.getBrokerFbdCurCnt().incrementAndGet(); + metrics.getBrokerFbdTotCnt().incrementAndGet(); + + metrics.getSvrBalDuration().update(100); + metrics.getSvrBalDuration().update(700); + metrics.getSvrBalDuration().update(20); + + metrics.getSvrBalDurationMin().update(1000); + metrics.getSvrBalDurationMin().update(50); + metrics.getSvrBalDurationMin().update(3000); + + metrics.getSvrBalDurationMax().update(700); + metrics.getSvrBalDurationMax().update(800); + metrics.getSvrBalDurationMax().update(300); + + metrics.getSvrBalResetDurMin().update(700); + metrics.getSvrBalResetDurMin().update(10); + metrics.getSvrBalResetDurMin().update(300); + + metrics.getSvrBalResetDurMax().update(700); + metrics.getSvrBalResetDurMax().update(2000); + metrics.getSvrBalResetDurMax().update(300); + + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalConEventConsumerCnt().incrementAndGet(); + + metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet(); + metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet(); + + // get metric and compare data + MetricValues result3 = metrics.getMetrics(); + Assert.assertEquals(result1.getLastResetTime(), + result2.getLastResetTime()); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName())); + + Assert.assertEquals(Long.valueOf(4), + result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(4), + result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(4), + result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName())); + Assert.assertEquals(Long.valueOf(4), + result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(20), + result3.getMetricValues().get(metrics.getSvrBalDuration().getName())); + Assert.assertEquals(Long.valueOf(50), + result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName())); + Assert.assertEquals(Long.valueOf(800), + result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName())); + Assert.assertEquals(Long.valueOf(10), + result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName())); + Assert.assertEquals(Long.valueOf(2000), + result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName())); + + Assert.assertEquals(Long.valueOf(6), + result3.getMetricValues().get( + metrics.getSvrBalConEventConsumerCnt().getName())); + Assert.assertEquals(Long.valueOf(5), + result3.getMetricValues().get( + metrics.getSvrBalDisConEventConsumerCnt().getName())); + + } catch (Exception ex) { + logger.error("error happens" + ex); + } + } + + @Test + public void testMasterMetricsHolder() { try { - MasterMetric taskMetrics = MasterMetric.create(); - taskMetrics.svrBalLatency.incrementAndGet(); - Map result = taskMetrics.snapshot(); - Assert.assertEquals(1, taskMetrics.svrBalLatency.get()); + // test case 1, set data + // add 12 consumer, 8 group, 4 client balance + MasterMetricsHolder.incConsumerCnt(false, false); + MasterMetricsHolder.incConsumerCnt(false, true); + MasterMetricsHolder.incConsumerCnt(false, false); + MasterMetricsHolder.incConsumerCnt(false, true); + MasterMetricsHolder.incConsumerCnt(true, false); + MasterMetricsHolder.incConsumerCnt(true, false); + MasterMetricsHolder.incConsumerCnt(true, true); + MasterMetricsHolder.incConsumerCnt(true, true); + MasterMetricsHolder.incConsumerCnt(true, false); + MasterMetricsHolder.incConsumerCnt(true, false); + MasterMetricsHolder.incConsumerCnt(true, true); + MasterMetricsHolder.incConsumerCnt(true, true); + // dec 8 consumer, add 4 timeout consumer, + // dec 4 group, add 2 timeout group, dec 2 client balance group + MasterMetricsHolder.decConsumerCnt(false, false, false); + MasterMetricsHolder.decConsumerCnt(false, false, true); + MasterMetricsHolder.decConsumerCnt(false, true, false); + MasterMetricsHolder.decConsumerCnt(false, true, true); + MasterMetricsHolder.decConsumerCnt(true, false, false); + MasterMetricsHolder.decConsumerCnt(true, false, true); + MasterMetricsHolder.decConsumerCnt(true, true, false); + MasterMetricsHolder.decConsumerCnt(true, true, true); + // dec 4 group, add 2 timeout group, dec 2 client balance group + MasterMetricsHolder.decConsumeGroupCnt(false, false); + MasterMetricsHolder.decConsumeGroupCnt(false, true); + MasterMetricsHolder.decConsumeGroupCnt(true, false); + MasterMetricsHolder.decConsumeGroupCnt(true, true); + // add 3 producer + // dec 3 producer, 2 timeout producer + MasterMetricsHolder.incProducerCnt(); + MasterMetricsHolder.incProducerCnt(); + MasterMetricsHolder.incProducerCnt(); + MasterMetricsHolder.decProducerCnt(false); + MasterMetricsHolder.decProducerCnt(true); + MasterMetricsHolder.decProducerCnt(true); + // add 3 disconcnt + // dec 2 disconcnt + MasterMetricsHolder.incSvrBalDisConConsumerCnt(); + MasterMetricsHolder.incSvrBalDisConConsumerCnt(); + MasterMetricsHolder.incSvrBalDisConConsumerCnt(); + MasterMetricsHolder.decSvrBalDisConConsumerCnt(); + MasterMetricsHolder.decSvrBalDisConConsumerCnt(); + // add 3 concnt + // dec 2 concnt + MasterMetricsHolder.incSvrBalConEventConsumerCnt(); + MasterMetricsHolder.incSvrBalConEventConsumerCnt(); + MasterMetricsHolder.incSvrBalConEventConsumerCnt(); + MasterMetricsHolder.decSvrBalConEventConsumerCnt(); + MasterMetricsHolder.decSvrBalConEventConsumerCnt(); + // add 3 broker configure count + // dec 2 broker configure count + MasterMetricsHolder.incBrokerConfigCnt(); + MasterMetricsHolder.incBrokerConfigCnt(); + MasterMetricsHolder.incBrokerConfigCnt(); + MasterMetricsHolder.decBrokerConfigCnt(); + MasterMetricsHolder.decBrokerConfigCnt(); + // add 3 broker online count + // dec 2 broker online count, 1 timeout count + MasterMetricsHolder.incBrokerOnlineCnt(); + MasterMetricsHolder.incBrokerOnlineCnt(); + MasterMetricsHolder.incBrokerOnlineCnt(); + MasterMetricsHolder.decBrokerOnlineCnt(false); + MasterMetricsHolder.decBrokerOnlineCnt(true); + // add 3 broker abnormal count, 3 total abnormal count + // dec 1 broker abnormal count + MasterMetricsHolder.incBrokerAbnormalCnt(); + MasterMetricsHolder.decBrokerAbnormalCnt(); + MasterMetricsHolder.incBrokerAbnormalCnt(); + MasterMetricsHolder.incBrokerAbnormalCnt(); + // add 4 broker forbidden count, 4 total forbidden count + // dec 1 broker forbidden count + MasterMetricsHolder.incBrokerForbiddenCnt(); + MasterMetricsHolder.decBrokerForbiddenCnt(); + MasterMetricsHolder.incBrokerForbiddenCnt(); + MasterMetricsHolder.incBrokerForbiddenCnt(); + MasterMetricsHolder.incBrokerForbiddenCnt(); + // max: 1000, min 100 + MasterMetricsHolder.updSvrBalanceDurations(300); + MasterMetricsHolder.updSvrBalanceDurations(500); + MasterMetricsHolder.updSvrBalanceDurations(100); + MasterMetricsHolder.updSvrBalanceDurations(1000); + // max: 5000, min 500 + MasterMetricsHolder.updSvrBalResetDurations(3000); + MasterMetricsHolder.updSvrBalResetDurations(500); + MasterMetricsHolder.updSvrBalResetDurations(1000); + MasterMetricsHolder.updSvrBalResetDurations(5000); + // get metric and compare data + MasterMetrics metrics = MasterMetricsHolder.getStatsInfo(); + MetricValues result1 = metrics.getMetrics(); + Assert.assertEquals(Long.valueOf(4), + result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(4), + result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName())); + + Assert.assertEquals(Long.valueOf(0), + result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(2), + result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName())); + Assert.assertEquals(Long.valueOf(4), + result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(1000), + result1.getMetricValues().get(metrics.getSvrBalDuration().getName())); + Assert.assertEquals(Long.valueOf(100), + result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName())); + Assert.assertEquals(Long.valueOf(1000), + result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName())); + Assert.assertEquals(Long.valueOf(500), + result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName())); + Assert.assertEquals(Long.valueOf(5000), + result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName())); + + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get( + metrics.getSvrBalConEventConsumerCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result1.getMetricValues().get( + metrics.getSvrBalDisConEventConsumerCnt().getName())); + + // get and reset value 2 + final MetricValues result2 = metrics.getAndReSetMetrics(); + // update metric data to 3 + // test case 3, set data + // add 3 consumer, 2 group, 1 client balance + MasterMetricsHolder.incConsumerCnt(false, false); + MasterMetricsHolder.incConsumerCnt(true, false); + MasterMetricsHolder.incConsumerCnt(true, true); + // dec 2 consumer, add 1 timeout consumer, + // dec 1 group, add 1 timeout group + MasterMetricsHolder.decConsumerCnt(true, true, true); + MasterMetricsHolder.decConsumerCnt(false, false, true); + // dec 1 group, add 1 timeout group + MasterMetricsHolder.decConsumeGroupCnt(true, false); + // add 2 producer + // dec 1 producer + MasterMetricsHolder.incProducerCnt(); + MasterMetricsHolder.incProducerCnt(); + MasterMetricsHolder.decProducerCnt(false); + // add 1 abnormal ,dec 1 abnormal + MasterMetricsHolder.incBrokerAbnormalCnt(); + MasterMetricsHolder.decBrokerAbnormalCnt(); + + // max: 1000, min 100 + MasterMetricsHolder.updSvrBalanceDurations(5000); + MasterMetricsHolder.updSvrBalanceDurations(500); + MasterMetricsHolder.updSvrBalanceDurations(100); + MasterMetricsHolder.updSvrBalanceDurations(8000); + // max: 5000, min 500 + MasterMetricsHolder.updSvrBalResetDurations(2000); + MasterMetricsHolder.updSvrBalResetDurations(100); + MasterMetricsHolder.updSvrBalResetDurations(1000); + MasterMetricsHolder.updSvrBalResetDurations(4000); + + // get metric and compare data + MetricValues result3 = metrics.getMetrics(); + Assert.assertEquals(result1.getLastResetTime(), + result2.getLastResetTime()); + Assert.assertEquals(Long.valueOf(5), + result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName())); + + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName())); + Assert.assertEquals(Long.valueOf(0), + result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(2), + result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName())); + Assert.assertEquals(Long.valueOf(3), + result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName())); + + Assert.assertEquals(Long.valueOf(8000), + result3.getMetricValues().get(metrics.getSvrBalDuration().getName())); + Assert.assertEquals(Long.valueOf(100), + result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName())); + Assert.assertEquals(Long.valueOf(8000), + result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName())); + Assert.assertEquals(Long.valueOf(100), + result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName())); + Assert.assertEquals(Long.valueOf(4000), + result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get( + metrics.getSvrBalConEventConsumerCnt().getName())); + Assert.assertEquals(Long.valueOf(1), + result3.getMetricValues().get( + metrics.getSvrBalDisConEventConsumerCnt().getName())); } catch (Exception ex) { logger.error("error happens" + ex); } diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java index 8d93ed00337..24d3777c4f9 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java @@ -19,7 +19,6 @@ import static org.mockito.Mockito.mock; import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent; -import org.apache.inlong.tubemq.server.master.metrics.MasterMetric; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -28,14 +27,11 @@ public class ConsumerEventManagerTest { private ConsumerEventManager consumerEventManager; private ConsumerInfoHolder consumerInfoHolder; - private MasterMetric masterMetrics; @Before public void setUp() throws Exception { consumerInfoHolder = mock(ConsumerInfoHolder.class); - masterMetrics = MasterMetric.create(); - consumerEventManager = - new ConsumerEventManager(consumerInfoHolder, masterMetrics); + consumerEventManager = new ConsumerEventManager(consumerInfoHolder); } @After