Skip to content

Commit

Permalink
[fix][broker] Fix ResourceGroup report local usage (apache#22340)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 0b2b6d5)
  • Loading branch information
nodece committed Mar 26, 2024
1 parent 99eb49a commit 2481cd8
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import java.util.HashMap;
import java.util.Set;
Expand Down Expand Up @@ -216,24 +217,28 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
resourceUsage.setOwner(this.getID());

p = resourceUsage.setPublish();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p);
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Publish, p)) {
resourceUsage.clearPublish();
}

p = resourceUsage.setDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);
if (!this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p)) {
resourceUsage.clearDispatch();
}

// Punt storage for now.
}

// Transport manager mandated op.
public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) {
NetworkUsage p;

p = resourceUsage.getPublish();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, p, broker);

p = resourceUsage.getDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);
if (resourceUsage.hasPublish()) {
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Publish, resourceUsage.getPublish(), broker);
}

if (resourceUsage.hasDispatch()) {
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, resourceUsage.getDispatch(),
broker);
}
// Punt storage for now.
}

Expand Down Expand Up @@ -449,19 +454,17 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas

bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;

monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;

monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();

if (sendReport) {
p.setBytesPerPeriod(bytesUsed);
p.setMessagesPerPeriod(messagesUsed);
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
} else {
numSuppressions = monEntity.numSuppressedUsageReports++;
}
Expand Down Expand Up @@ -594,6 +597,11 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
};
}

@VisibleForTesting
PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClass) {
return this.monitoringClassFields[monClass.ordinal()];
}

public final String resourceGroupName;

public PerMonitoringClassFields[] monitoringClassFields =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ protected void calculateQuotaForAllResourceGroups() {
timeUnitScale);
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
}
}

Expand All @@ -706,7 +706,7 @@ private void initialize() {
periodInSecs,
this.timeUnitScale);
maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTest {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}


@Test
public void testRgFillResourceUsage() throws Exception {
pulsar.getResourceGroupServiceManager().close();
AtomicBoolean needReport = new AtomicBoolean(false);
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.HOURS, null,
new ResourceQuotaCalculator() {
@Override
public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
long currentMessagesUsed, long lastReportedMessages,
long lastReportTimeMSecsSinceEpoch) {
return needReport.get();
}

@Override
public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
return 0;
}
});
String rgName = "rg-1";
ResourceGroup rgConfig = new ResourceGroup();
rgConfig.setPublishRateInBytes(1000L);
rgConfig.setPublishRateInMsgs(2000);
service.resourceGroupCreate(rgName, rgConfig);

org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
bytesAndMessagesCount.bytes = 20;
bytesAndMessagesCount.messages = 10;
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);
ResourceUsage resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertFalse(resourceUsage.hasDispatch());
assertFalse(resourceUsage.hasPublish());

PerMonitoringClassFields publishMonitoredEntity =
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);

needReport.set(true);
resourceGroup.rgFillResourceUsage(resourceUsage);
assertTrue(resourceUsage.hasDispatch());
assertTrue(resourceUsage.hasPublish());
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
}
}

0 comments on commit 2481cd8

Please sign in to comment.