Skip to content

Commit

Permalink
Add queryIds and cpuUsage to ResourceGroupInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
miniway authored and cberner committed Nov 7, 2016
1 parent 0fca220 commit 7d98b8e
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
Expand Down Expand Up @@ -125,14 +126,19 @@ public ResourceGroupInfo getInfo()
List<ResourceGroupInfo> infos = subGroups.values().stream()
.map(InternalResourceGroup::getInfo)
.collect(Collectors.toList());
Set<QueryId> queryIds = runningQueries.stream()
.map(QueryExecution::getQueryId)
.collect(Collectors.toSet());
return new ResourceGroupInfo(
id,
new DataSize(softMemoryLimitBytes, BYTE),
maxRunningQueries,
maxQueuedQueries,
runningQueries.size() + descendantRunningQueries,
queuedQueries.size() + descendantQueuedQueries,
cpuUsageMillis,
new DataSize(cachedMemoryUsageBytes, BYTE),
queryIds,
infos);
}
}
Expand All @@ -159,6 +165,22 @@ public int getQueuedQueries()
}
}

@Managed
public long getMemoryUsageBytes()
{
synchronized (root) {
return cachedMemoryUsageBytes;
}
}

@Managed
public long getCpuUsageMillis()
{
synchronized (root) {
return cpuUsageMillis;
}
}

@Override
public DataSize getSoftMemoryLimit()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
*/
package com.facebook.presto.execution.resourceGroups;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;

import java.util.List;
import java.util.Set;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,7 +33,9 @@ public class ResourceGroupInfo
private final int maxQueuedQueries;
private final int runningQueries;
private final int queuedQueries;
private final long cpuUsage;
private final DataSize memoryUsage;
private final Set<QueryId> queryIds;
private final List<ResourceGroupInfo> subGroups;

public ResourceGroupInfo(
Expand All @@ -40,7 +45,9 @@ public ResourceGroupInfo(
int maxQueuedQueries,
int runningQueries,
int queuedQueries,
long cpuUsage,
DataSize memoryUsage,
Set<QueryId> queryIds,
List<ResourceGroupInfo> subGroups)
{
this.id = id;
Expand All @@ -49,7 +56,9 @@ public ResourceGroupInfo(
this.maxQueuedQueries = maxQueuedQueries;
this.runningQueries = runningQueries;
this.queuedQueries = queuedQueries;
this.cpuUsage = cpuUsage;
this.memoryUsage = requireNonNull(memoryUsage, "memoryUsage is null");
this.queryIds = ImmutableSet.copyOf(requireNonNull(queryIds, "queryIds is null"));
this.subGroups = ImmutableList.copyOf(requireNonNull(subGroups, "subGroups is null"));
}

Expand Down Expand Up @@ -88,8 +97,18 @@ public int getQueuedQueries()
return queuedQueries;
}

public long getCpuUsage()
{
return cpuUsage;
}

public DataSize getMemoryUsage()
{
return memoryUsage;
}

public Set<QueryId> getQueryIds()
{
return queryIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,27 @@
*/
package com.facebook.presto.execution.resourceGroups;

import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.resourceGroups.ResourceGroupManagerPlugin;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.tests.tpch.TpchQueryRunner.createQueryRunner;
import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.airlift.testing.Assertions.assertLessThan;
import static io.airlift.units.Duration.nanosSince;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestResourceGroupIntegration
{
Expand All @@ -38,7 +49,7 @@ public void testMemoryFraction()
long startTime = System.nanoTime();
while (true) {
SECONDS.sleep(1);
ResourceGroupInfo global = queryRunner.getCoordinator().getResourceGroupManager().get().getResourceGroupInfo(new ResourceGroupId("global"));
ResourceGroupInfo global = getResourceGroupInfo(queryRunner, new ResourceGroupId("global"));
if (global.getSoftMemoryLimit().toBytes() > 0) {
break;
}
Expand All @@ -47,8 +58,46 @@ public void testMemoryFraction()
}
}

@Test
public void testResourceGroupInfo()
throws Exception
{
try (DistributedQueryRunner queryRunner = createQueryRunner(ImmutableMap.of(), ImmutableMap.of("experimental.resource-groups-enabled", "true"))) {
queryRunner.installPlugin(new ResourceGroupManagerPlugin());
queryRunner.getCoordinator().getResourceGroupManager().get().setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_info.json")));

CompletableFuture<MaterializedResult> future = runAsync(queryRunner, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
while (true) {
List<QueryInfo> queries = queryRunner.getCoordinator().getQueryManager().getAllQueryInfo();
if (!queries.isEmpty() && queries.get(0).getState().isDone()) {
break;
}
if (queries.isEmpty() || !queries.get(0).getState().equals(QueryState.RUNNING)) {
MILLISECONDS.sleep(1);
continue;
}
ResourceGroupInfo global = getResourceGroupInfo(queryRunner, new ResourceGroupId("global"));
assertEquals(global.getQueryIds(), ImmutableSet.of(queries.get(0).getQueryId()));
}
ResourceGroupInfo global = getResourceGroupInfo(queryRunner, new ResourceGroupId("global"));
assertGreaterThan(global.getCpuUsage(), 0L);
future.get();
assertTrue(future.isDone());
}
}

private String getResourceFilePath(String fileName)
{
return this.getClass().getClassLoader().getResource(fileName).getPath();
}

private static CompletableFuture<MaterializedResult> runAsync(DistributedQueryRunner queryRunner, String query)
{
return CompletableFuture.supplyAsync(() -> queryRunner.execute(query));
}

private static ResourceGroupInfo getResourceGroupInfo(DistributedQueryRunner queryRunner, ResourceGroupId id)
{
return queryRunner.getCoordinator().getResourceGroupManager().get().getResourceGroupInfo(id);
}
}
21 changes: 21 additions & 0 deletions presto-tests/src/test/resources/resource_groups_info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"rootGroups": [
{
"name": "global",
"softMemoryLimit": "50%",
"hardCpuLimit": "5m",
"softCpuLimit": "30s",
"maxRunning": 100,
"maxQueued": 1000,
"subGroups": [
]
}
],
"selectors": [
{
"group": "global"
}
],
"cpuQuotaPeriod": "30s"
}

0 comments on commit 7d98b8e

Please sign in to comment.