diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index ef40a18ab08ed..c61a8e5e72ba8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.resourcegroup; +import com.google.common.annotations.VisibleForTesting; import io.prometheus.client.Counter; import java.util.HashMap; import java.util.Set; @@ -216,24 +217,28 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) { resourceUsage.setOwner(this.getID()); p = resourceUsage.setPublish(); - this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p); + if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p)) { + resourceUsage.clearPublish(); + } p = resourceUsage.setDispatch(); - this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p); + if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p)) { + resourceUsage.clearDispatch(); + } // Punt storage for now. } // Transport manager mandated op. public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) { - NetworkUsage p; - - p = resourceUsage.getPublish(); - this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, p, broker); - - p = resourceUsage.getDispatch(); - this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker); + if (resourceUsage.hasPublish()) { + this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, resourceUsage.getPublish(), broker); + } + if (resourceUsage.hasDispatch()) { + this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, resourceUsage.getDispatch(), + broker); + } // Punt storage for now. } @@ -449,12 +454,6 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas bytesUsed = monEntity.usedLocallySinceLastReport.bytes; messagesUsed = monEntity.usedLocallySinceLastReport.messages; - monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; - - monEntity.totalUsedLocally.bytes += bytesUsed; - monEntity.totalUsedLocally.messages += messagesUsed; - - monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); if (sendReport) { p.setBytesPerPeriod(bytesUsed); @@ -462,6 +461,10 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas monEntity.lastReportedValues.bytes = bytesUsed; monEntity.lastReportedValues.messages = messagesUsed; monEntity.numSuppressedUsageReports = 0; + monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; + monEntity.totalUsedLocally.bytes += bytesUsed; + monEntity.totalUsedLocally.messages += messagesUsed; + monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); } else { numSuppressions = monEntity.numSuppressedUsageReports++; } @@ -594,6 +597,11 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { }; } + @VisibleForTesting + PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClass) { + return this.monitoringClassFields[monClass.ordinal()]; + } + public final String resourceGroupName; public PerMonitoringClassFields[] monitoringClassFields = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index d3f8eb7613a40..1f4b3f32cb902 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -687,7 +687,7 @@ protected void calculateQuotaForAllResourceGroups() { timeUnitScale); this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds; maxIntervalForSuppressingReportsMSecs = - this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds; + TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds; } } @@ -706,7 +706,7 @@ private void initialize() { periodInSecs, this.timeUnitScale); maxIntervalForSuppressingReportsMSecs = - this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds; + TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java new file mode 100644 index 0000000000000..658b7c94165d9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; +import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; +import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testRgFillResourceUsage() throws Exception { + pulsar.getResourceGroupServiceManager().close(); + AtomicBoolean needReport = new AtomicBoolean(false); + ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.HOURS, null, + new ResourceQuotaCalculator() { + @Override + public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes, + long currentMessagesUsed, long lastReportedMessages, + long lastReportTimeMSecsSinceEpoch) { + return needReport.get(); + } + + @Override + public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { + return 0; + } + }); + String rgName = "rg-1"; + ResourceGroup rgConfig = new ResourceGroup(); + rgConfig.setPublishRateInBytes(1000L); + rgConfig.setPublishRateInMsgs(2000); + service.resourceGroupCreate(rgName, rgConfig); + + org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); + BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount(); + bytesAndMessagesCount.bytes = 20; + bytesAndMessagesCount.messages = 10; + resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount); + ResourceUsage resourceUsage = new ResourceUsage(); + resourceGroup.rgFillResourceUsage(resourceUsage); + assertFalse(resourceUsage.hasDispatch()); + assertFalse(resourceUsage.hasPublish()); + + PerMonitoringClassFields publishMonitoredEntity = + resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes); + assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0); + assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0); + assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0); + assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0); + + needReport.set(true); + resourceGroup.rgFillResourceUsage(resourceUsage); + assertTrue(resourceUsage.hasDispatch()); + assertTrue(resourceUsage.hasPublish()); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0); + assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages); + assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes); + assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages); + assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes); + } +} \ No newline at end of file