Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix] Fixed implicit conversions from long -> int (apache#22055)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 15, 2024
1 parent 7e73967 commit fc2e314
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class BrokerLoadData {
private double msgThroughputOut; // bytes/sec
private double msgRateIn; // messages/sec
private double msgRateOut; // messages/sec
private int bundleCount;
private int topics;
private long bundleCount;
private long topics;

// Load data features computed from the above resources.
private double maxResourceUsage; // max of resource usages
Expand Down Expand Up @@ -115,8 +115,8 @@ public void update(final SystemResourceUsage usage,
double msgThroughputOut,
double msgRateIn,
double msgRateOut,
int bundleCount,
int topics,
long bundleCount,
long topics,
ServiceConfiguration conf) {
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
this.msgThroughputIn = msgThroughputIn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
// TODO: The broker load data might be delayed, so the max topic check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public long calculateRank() {
percentageUsage = (entry.getValue().usage / entry.getValue().limit) * 100;
}
// give equal weight to each resource
double resourceWeight = weight * percentageUsage;
int resourceWeight = (int) (weight * percentageUsage);
// any resource usage over 75% doubles the whole weight per resource
if (percentageUsage > throttle) {
final int i = resourcesWithHighUsage++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ResourceUnit findBrokerForPlacement(Multimap<Long, ResourceUnit> finalCan
}
int weightedSelector = rand.nextInt(totalAvailability);
log.debug("Generated Weighted Selector Number - [{}] ", weightedSelector);
int weightRangeSoFar = 0;
long weightRangeSoFar = 0;
for (Map.Entry<Long, ResourceUnit> candidateOwner : finalCandidates.entries()) {
weightRangeSoFar += candidateOwner.getKey();
if (weightedSelector < weightRangeSoFar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ protected static double getRgRemoteUsageMessageCount (String rgName, String monC
}

// Visibility for unit testing
protected static double getRgUsageReportedCount (String rgName, String monClassName) {
return rgLocalUsageReportCount.labels(rgName, monClassName).get();
protected static long getRgUsageReportedCount (String rgName, String monClassName) {
return (long) rgLocalUsageReportCount.labels(rgName, monClassName).get();
}

// Visibility for unit testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,48 +483,48 @@ protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws Pu
}

// Visibility for testing.
protected static double getRgQuotaByteCount (String rgName, String monClassName) {
return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
protected static long getRgQuotaByteCount (String rgName, String monClassName) {
return (long) rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessageCount (String rgName, String monClassName) {
return rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
protected static long getRgQuotaMessageCount (String rgName, String monClassName) {
return (long) rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgLocalUsageByteCount (String rgName, String monClassName) {
return rgLocalUsageBytes.labels(rgName, monClassName).get();
protected static long getRgLocalUsageByteCount (String rgName, String monClassName) {
return (long) rgLocalUsageBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgLocalUsageMessageCount (String rgName, String monClassName) {
return rgLocalUsageMessages.labels(rgName, monClassName).get();
protected static long getRgLocalUsageMessageCount (String rgName, String monClassName) {
return (long) rgLocalUsageMessages.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgUpdatesCount (String rgName) {
return rgUpdates.labels(rgName).get();
protected static long getRgUpdatesCount (String rgName) {
return (long) rgUpdates.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgTenantRegistersCount (String rgName) {
return rgTenantRegisters.labels(rgName).get();
protected static long getRgTenantRegistersCount (String rgName) {
return (long) rgTenantRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgTenantUnRegistersCount (String rgName) {
return rgTenantUnRegisters.labels(rgName).get();
protected static long getRgTenantUnRegistersCount (String rgName) {
return (long) rgTenantUnRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgNamespaceRegistersCount (String rgName) {
return rgNamespaceRegisters.labels(rgName).get();
protected static long getRgNamespaceRegistersCount (String rgName) {
return (long) rgNamespaceRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgNamespaceUnRegistersCount (String rgName) {
return rgNamespaceUnRegisters.labels(rgName).get();
protected static long getRgNamespaceUnRegistersCount (String rgName) {
return (long) rgNamespaceUnRegisters.labels(rgName).get();
}

// Visibility for testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public class BrokerService implements Closeable {
// Keep track of topics and partitions served by this broker for fast lookup.
@Getter
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> owningTopics;
private int numberOfNamespaceBundles = 0;
private long numberOfNamespaceBundles = 0;

private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
Expand Down Expand Up @@ -2309,7 +2309,7 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS);
}

public int getNumberOfNamespaceBundles() {
public long getNumberOfNamespaceBundles() {
this.numberOfNamespaceBundles = 0;
this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> {
this.numberOfNamespaceBundles += bundles.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

public class BrokerStats extends NamespaceStats {

public int bundleCount;
public int topics;
public long bundleCount;
public long topics;
public BrokerStats(int ratePeriodInSeconds) {
super(ratePeriodInSeconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class NamespaceStats {
public int consumerCount;
public int producerCount;
public int replicatorCount;
public int subsCount;
public long subsCount;
public static final String BRK_ADD_ENTRY_LATENCY_PREFIX = "brk_AddEntryLatencyBuckets";
public long[] addLatencyBucket = new long[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
public static final String[] ADD_LATENCY_BUCKET_KEYS = new String[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,17 +682,11 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) {
String mcName = mc.name();
int mcIndex = mc.ordinal();
double quotaBytes = ResourceGroupService.getRgQuotaByteCount(rgName, mcName);
totalQuotaBytes[mcIndex] += quotaBytes;
double quotaMesgs = ResourceGroupService.getRgQuotaMessageCount(rgName, mcName);
totalQuotaMessages[mcIndex] += quotaMesgs;
double usedBytes = ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName);
totalUsedBytes[mcIndex] += usedBytes;
double usedMesgs = ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName);
totalUsedMessages[mcIndex] += usedMesgs;

double usageReportedCount = ResourceGroup.getRgUsageReportedCount(rgName, mcName);
totalUsageReportCounts[mcIndex] += usageReportedCount;
totalQuotaBytes[mcIndex] += ResourceGroupService.getRgQuotaByteCount(rgName, mcName);
totalQuotaMessages[mcIndex] += ResourceGroupService.getRgQuotaMessageCount(rgName, mcName);
totalUsedBytes[mcIndex] += ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName);
totalUsedMessages[mcIndex] += ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName);
totalUsageReportCounts[mcIndex] += ResourceGroup.getRgUsageReportedCount(rgName, mcName);
}

totalTenantRegisters += ResourceGroupService.getRgTenantRegistersCount(rgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface LoadManagerReport extends ServiceLookupData {

Map<String, NamespaceBundleStats> getBundleStats();

int getNumTopics();
long getNumTopics();

int getNumBundles();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class LoadReport implements LoadManagerReport {
private long timestamp;
private double msgRateIn;
private double msgRateOut;
private int numTopics;
private long numTopics;
private int numConsumers;
private int numProducers;
private int numBundles;
Expand Down Expand Up @@ -205,7 +205,7 @@ public String getLoadReportType() {
}

@Override
public int getNumTopics() {
public long getNumTopics() {
numTopics = 0;
if (this.bundleStats != null) {
this.bundleStats.forEach((bundle, stats) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void setLastStats(Map<String, NamespaceBundleStats> lastStats) {
}

@Override
public int getNumTopics() {
public long getNumTopics() {
return numTopics;
}

Expand Down

0 comments on commit fc2e314

Please sign in to comment.