Skip to content

Commit

Permalink
Lighten the load on Cassandra backend with less repair_run scans
Browse files Browse the repository at this point in the history
Reaper checks the running and paused repair runs every 10 to 30 seconds to perform maintenance operations.
When the repair history starts growing, Reaper will scan them all to look for running/paused repairs because the repair_run_by_cluster table doesn't contain the repair run state.
This commit adds this column, which allows to filter runs early in the process instead of scanning them all in the repair_run table.
Filter runs by state in the repair_run_by_state table to avoid reading them all from the repair_run table.
Push limit from the UI to the DAO to avoid reading more repair runs from the database than required.
  • Loading branch information
adejanovski committed Aug 11, 2020
1 parent fca00c4 commit 2a676b0
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,11 @@ public Response abortRepairRunSegment(@PathParam("id") UUID repairRunId, @PathPa
@GET
@Path("/cluster/{cluster_name}")
public Response getRepairRunsForCluster(
@PathParam("cluster_name") String clusterName) {
@PathParam("cluster_name") String clusterName,
@QueryParam("limit") Optional<Integer> limit) {

LOG.debug("get repair run for cluster called with: cluster_name = {}", clusterName);
final Collection<RepairRun> repairRuns = context.storage.getRepairRunsForCluster(clusterName, Optional.empty());
final Collection<RepairRun> repairRuns = context.storage.getRepairRunsForCluster(clusterName, limit);
final Collection<RepairRunStatus> repairRunViews = new ArrayList<>();
for (final RepairRun repairRun : repairRuns) {
repairRunViews.add(getRepairRunStatus(repairRun));
Expand Down Expand Up @@ -656,7 +657,8 @@ private static URI buildRepairRunUri(UriInfo uriInfo, RepairRun repairRun) {
public Response listRepairRuns(
@QueryParam("state") Optional<String> state,
@QueryParam("cluster_name") Optional<String> cluster,
@QueryParam("keyspace_name") Optional<String> keyspace) {
@QueryParam("keyspace_name") Optional<String> keyspace,
@QueryParam("limit") Optional<Integer> limit) {

try {
final Set desiredStates = splitStateParam(state);
Expand All @@ -670,7 +672,7 @@ public Response listRepairRuns(

List<RepairRunStatus> runStatuses = Lists.newArrayList();
for (final Cluster clstr : clusters) {
Collection<RepairRun> runs = context.storage.getRepairRunsForCluster(clstr.getName(), Optional.empty());
Collection<RepairRun> runs = context.storage.getRepairRunsForCluster(clstr.getName(), limit);

runStatuses.addAll(
(List<RepairRunStatus>) getRunStatuses(runs, desiredStates)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -342,7 +343,7 @@ public RepairRun startRepairRun(RepairRun runToBeStarted) throws ReaperException

public RepairRun updateRepairRunIntensity(RepairRun repairRun, Double intensity) throws ReaperException {
RepairRun updatedRun = repairRun.with().intensity(intensity).build(repairRun.getId());
if (!context.storage.updateRepairRun(updatedRun)) {
if (!context.storage.updateRepairRun(updatedRun, Optional.of(false))) {
throw new ReaperException("failed updating repair run " + updatedRun.getId());
}
return updatedRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,9 @@ void updateLastEvent(String newEvent) {
"Will not update lastEvent of run that has already terminated. The message was: " + "\"{}\"",
newEvent);
} else {
context.storage.updateRepairRun(repairRun.get().with().lastEvent(newEvent).build(repairRunId));
context.storage.updateRepairRun(
repairRun.get().with().lastEvent(newEvent).build(repairRunId),
Optional.of(false));
LOG.info(newEvent);
}
}
Expand Down Expand Up @@ -644,7 +646,7 @@ private RepairRun fixMissingRepairRunTables(RepairRun repairRun, RepairUnit repa
.tables(RepairUnitService.create(context).getTablesToRepair(cluster, repairUnit))
.build(repairRun.getId());

context.storage.updateRepairRun(newRepairRun);
context.storage.updateRepairRun(newRepairRun, Optional.of(false));
return newRepairRun;
}
return repairRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.cassandrareaper.storage.cassandra.Migration016;
import io.cassandrareaper.storage.cassandra.Migration021;
import io.cassandrareaper.storage.cassandra.Migration024;
import io.cassandrareaper.storage.cassandra.Migration025;

import java.io.IOException;
import java.math.BigInteger;
Expand Down Expand Up @@ -116,6 +117,7 @@ public final class CassandraStorage implements IStorage, IDistributedStorage {

private static final int METRICS_PARTITIONING_TIME_MINS = 10;
private static final int LEAD_DURATION = 600;
private static final int MAX_RETURNED_REPAIR_RUNS = 1000;
/* Simple stmts */
private static final String SELECT_CLUSTER = "SELECT * FROM cluster";
private static final String SELECT_REPAIR_SCHEDULE = "SELECT * FROM repair_schedule_v1";
Expand Down Expand Up @@ -148,6 +150,7 @@ public RepairUnit load(UUID repairUnitId) throws Exception {
private PreparedStatement getClusterPrepStmt;
private PreparedStatement deleteClusterPrepStmt;
private PreparedStatement insertRepairRunPrepStmt;
private PreparedStatement insertRepairRunNoStatePrepStmt;
private PreparedStatement insertRepairRunClusterIndexPrepStmt;
private PreparedStatement insertRepairRunUnitIndexPrepStmt;
private PreparedStatement getRepairRunPrepStmt;
Expand Down Expand Up @@ -279,6 +282,9 @@ private static void initializeAndUpgradeSchema(
Migration021.migrate(session, config.getCassandraFactory().getKeyspace());
// Switch metrics table to TWCS if possible, this is intentionally executed every startup
Migration024.migrate(session, config.getCassandraFactory().getKeyspace());
if (database.getVersion() == 25) {
Migration025.migrate(session, config.getCassandraFactory().getKeyspace());
}
} else {
LOG.info(
String.format("Keyspace %s already at schema version %d", session.getLoggedKeyspace(), currentVersion));
Expand Down Expand Up @@ -339,8 +345,14 @@ private void prepareStatements() {
+ "start_time, end_time, pause_time, intensity, last_event, segment_count, repair_parallelism,tables) "
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.QUORUM);
insertRepairRunNoStatePrepStmt = session
.prepare(
"INSERT INTO repair_run(id, cluster_name, repair_unit_id, cause, owner, creation_time, "
+ "start_time, end_time, pause_time, intensity, last_event, segment_count, repair_parallelism,tables) "
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.QUORUM);
insertRepairRunClusterIndexPrepStmt
= session.prepare("INSERT INTO repair_run_by_cluster(cluster_name, id) values(?, ?)");
= session.prepare("INSERT INTO repair_run_by_cluster_v2(cluster_name, id, repair_run_state) values(?, ?, ?)");
insertRepairRunUnitIndexPrepStmt
= session.prepare("INSERT INTO repair_run_by_unit(repair_unit_id, id) values(?, ?)");
getRepairRunPrepStmt = session
Expand All @@ -349,13 +361,14 @@ private void prepareStatements() {
+ "pause_time,intensity,last_event,segment_count,repair_parallelism,tables "
+ "FROM repair_run WHERE id = ? LIMIT 1")
.setConsistencyLevel(ConsistencyLevel.QUORUM);
getRepairRunForClusterPrepStmt = session.prepare("SELECT * FROM repair_run_by_cluster WHERE cluster_name = ?");
getRepairRunForClusterPrepStmt = session.prepare(
"SELECT * FROM repair_run_by_cluster_v2 WHERE cluster_name = ? limit ?");
getRepairRunForUnitPrepStmt = session.prepare("SELECT * FROM repair_run_by_unit WHERE repair_unit_id = ?");
deleteRepairRunPrepStmt = session.prepare("DELETE FROM repair_run WHERE id = ?");
deleteRepairRunByClusterPrepStmt
= session.prepare("DELETE FROM repair_run_by_cluster WHERE cluster_name = ?");
= session.prepare("DELETE FROM repair_run_by_cluster_v2 WHERE cluster_name = ?");
deleteRepairRunByClusterByIdPrepStmt
= session.prepare("DELETE FROM repair_run_by_cluster WHERE id = ? and cluster_name = ?");
= session.prepare("DELETE FROM repair_run_by_cluster_v2 WHERE id = ? and cluster_name = ?");
deleteRepairRunByUnitPrepStmt = session.prepare("DELETE FROM repair_run_by_unit "
+ "WHERE id = ? and repair_unit_id= ?");
insertRepairUnitPrepStmt = session
Expand Down Expand Up @@ -729,7 +742,10 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
futures.add(session.executeAsync(repairRunBatch));
futures.add(
session.executeAsync(
insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId())));
insertRepairRunClusterIndexPrepStmt.bind(
newRepairRun.getClusterName(),
newRepairRun.getId(),
newRepairRun.getRunState().toString())));
futures.add(
session.executeAsync(
insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId())));
Expand All @@ -744,7 +760,20 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde

@Override
public boolean updateRepairRun(RepairRun repairRun) {
session.execute(
return updateRepairRun(repairRun, Optional.of(true));
}

@Override
public boolean updateRepairRun(RepairRun repairRun, Optional<Boolean> updateRepairState) {
if (updateRepairState.orElse(true)) {
BatchStatement updateRepairRunBatch = new BatchStatement(BatchStatement.Type.LOGGED);
// Updates of the last event impact the repair state.
// We want to limit overwrites in this case.
updateRepairRunBatch.add(
insertRepairRunClusterIndexPrepStmt.bind(
repairRun.getClusterName(), repairRun.getId(), repairRun.getRunState().toString()));
// Repair state will be updated
updateRepairRunBatch.add(
insertRepairRunPrepStmt.bind(
repairRun.getId(),
repairRun.getClusterName(),
Expand All @@ -761,6 +790,27 @@ public boolean updateRepairRun(RepairRun repairRun) {
repairRun.getSegmentCount(),
repairRun.getRepairParallelism().toString(),
repairRun.getTables()));
session.execute(updateRepairRunBatch);
} else {
session.execute(
insertRepairRunNoStatePrepStmt.bind(
repairRun.getId(),
repairRun.getClusterName(),
repairRun.getRepairUnitId(),
repairRun.getCause(),
repairRun.getOwner(),
repairRun.getCreationTime(),
repairRun.getStartTime(),
repairRun.getEndTime(),
repairRun.getPauseTime(),
repairRun.getIntensity(),
repairRun.getLastEvent(),
repairRun.getSegmentCount(),
repairRun.getRepairParallelism().toString(),
repairRun.getTables()));
}


return true;
}

Expand All @@ -783,7 +833,7 @@ public Collection<RepairRun> getRepairRunsForCluster(String clusterName, Optiona
List<ResultSetFuture> repairRunFutures = Lists.<ResultSetFuture>newArrayList();

// Grab all ids for the given cluster name
Collection<UUID> repairRunIds = getRepairRunIdsForCluster(clusterName);
Collection<UUID> repairRunIds = getRepairRunIdsForCluster(clusterName, limit);
// Grab repair runs asynchronously for all the ids returned by the index table
for (UUID repairRunId : repairRunIds) {
repairRunFutures.add(session.executeAsync(getRepairRunPrepStmt.bind(repairRunId)));
Expand Down Expand Up @@ -835,7 +885,7 @@ public Collection<RepairRun> getRepairRunsWithState(RunState runState) {
List<Collection<UUID>> repairRunIds = getClusters()
.stream()
// Grab all ids for the given cluster name
.map(cluster -> getRepairRunIdsForCluster(cluster.getName()))
.map(cluster -> getRepairRunIdsForClusterWithState(cluster.getName(), runState))
.collect(Collectors.toList());

for (Collection<UUID> clusterRepairRunIds : repairRunIds) {
Expand Down Expand Up @@ -1147,9 +1197,10 @@ public Collection<RepairParameters> getOngoingRepairsInCluster(String clusterNam
}

@Override
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName) {
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName, Optional<Integer> limit) {
SortedSet<UUID> repairRunIds = Sets.newTreeSet((u0, u1) -> (int)(u0.timestamp() - u1.timestamp()));
ResultSet results = session.execute(getRepairRunForClusterPrepStmt.bind(clusterName));
ResultSet results = session.execute(getRepairRunForClusterPrepStmt.bind(clusterName, limit.orElse(
MAX_RETURNED_REPAIR_RUNS)));
for (Row result : results) {
repairRunIds.add(result.getUUID("id"));
}
Expand All @@ -1158,6 +1209,20 @@ public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName) {
return repairRunIds;
}

private SortedSet<UUID> getRepairRunIdsForClusterWithState(String clusterName, RunState runState) {
SortedSet<UUID> repairRunIds = Sets.newTreeSet((u0, u1) -> (int)(u0.timestamp() - u1.timestamp()));
ResultSet results = session.execute(getRepairRunForClusterPrepStmt.bind(clusterName, MAX_RETURNED_REPAIR_RUNS));
results.all()
.stream()
.filter(run -> run.getString("repair_run_state").equals(runState.toString()))
.map(run -> run.getUUID("id"))
.forEach(runId -> repairRunIds.add(runId));


LOG.trace("repairRunIds : {}", repairRunIds);
return repairRunIds;
}

@Override
public int getSegmentAmountForRepairRun(UUID runId) {
return (int) session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public interface IStorage {

RepairRun addRepairRun(RepairRun.Builder repairRun, Collection<RepairSegment.Builder> newSegments);

boolean updateRepairRun(RepairRun repairRun, Optional<Boolean> updateRepairState);

boolean updateRepairRun(RepairRun repairRun);

Optional<RepairRun> getRepairRun(UUID id);
Expand Down Expand Up @@ -104,7 +106,7 @@ public interface IStorage {

Collection<RepairParameters> getOngoingRepairsInCluster(String clusterName);

SortedSet<UUID> getRepairRunIdsForCluster(String clusterName);
SortedSet<UUID> getRepairRunIdsForCluster(String clusterName, Optional<Integer> limit);

int getSegmentAmountForRepairRun(UUID runId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public Cluster getCluster(String clusterName) {
@Override
public Cluster deleteCluster(String clusterName) {
getRepairSchedulesForCluster(clusterName).forEach(schedule -> deleteRepairSchedule(schedule.getId()));
getRepairRunIdsForCluster(clusterName).forEach(runId -> deleteRepairRun(runId));
getRepairRunIdsForCluster(clusterName, Optional.empty()).forEach(runId -> deleteRepairRun(runId));

getEventSubscriptions(clusterName)
.stream()
Expand Down Expand Up @@ -153,6 +153,11 @@ public RepairRun addRepairRun(RepairRun.Builder repairRun, Collection<RepairSegm

@Override
public boolean updateRepairRun(RepairRun repairRun) {
return updateRepairRun(repairRun, Optional.of(true));
}

@Override
public boolean updateRepairRun(RepairRun repairRun, Optional<Boolean> updateRepairState) {
if (!getRepairRun(repairRun.getId()).isPresent()) {
return false;
} else {
Expand Down Expand Up @@ -368,7 +373,7 @@ public Collection<RepairParameters> getOngoingRepairsInCluster(String clusterNam
}

@Override
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName) {
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName, Optional<Integer> limit) {
SortedSet<UUID> repairRunIds = Sets.newTreeSet((u0, u1) -> (int)(u0.timestamp() - u1.timestamp()));
for (RepairRun repairRun : repairRuns.values()) {
if (repairRun.getClusterName().equalsIgnoreCase(clusterName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Cluster getCluster(String clusterName) {
@Override
public Cluster deleteCluster(String clusterName) {
getRepairSchedulesForCluster(clusterName).forEach(schedule -> deleteRepairSchedule(schedule.getId()));
getRepairRunIdsForCluster(clusterName).forEach(runId -> deleteRepairRun(runId));
getRepairRunIdsForCluster(clusterName, Optional.empty()).forEach(runId -> deleteRepairRun(runId));

getEventSubscriptions(clusterName)
.stream()
Expand Down Expand Up @@ -345,6 +345,11 @@ public RepairRun addRepairRun(RepairRun.Builder newRepairRun, Collection<RepairS

@Override
public boolean updateRepairRun(RepairRun repairRun) {
return updateRepairRun(repairRun, Optional.of(true));
}

@Override
public boolean updateRepairRun(RepairRun repairRun, Optional<Boolean> updateRepairState) {
boolean result = false;
try (Handle h = jdbi.open()) {
int rowsAdded = getPostgresStorage(h).updateRepairRun(repairRun);
Expand Down Expand Up @@ -480,7 +485,7 @@ public Collection<RepairParameters> getOngoingRepairsInCluster(String clusterNam
}

@Override
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName) {
public SortedSet<UUID> getRepairRunIdsForCluster(String clusterName, Optional<Integer> limit) {
SortedSet<UUID> result = Sets.newTreeSet(Collections.reverseOrder());
try (Handle h = jdbi.open()) {
for (Long l : getPostgresStorage(h).getRepairRunIdsForCluster(clusterName)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2019-2019 The Last Pickle Ltd
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.storage.cassandra;


import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Migration025 {

private static final Logger LOG = LoggerFactory.getLogger(Migration025.class);
private static final String V1_TABLE = "repair_run_by_cluster";
private static final String V2_TABLE = "repair_run_by_cluster_v2";
private static PreparedStatement v2_insert;

private Migration025() {
}

/**
* Switch to v2 of repair_run_by_cluster
*/
public static void migrate(Session session, String keyspace) {

try {
if (session.getCluster().getMetadata().getKeyspace(keyspace).getTable(V1_TABLE) != null) {
v2_insert = session.prepare(
"INSERT INTO " + V2_TABLE + "(cluster_name, id, repair_run_state) values (?, ?, ?)");
LOG.info("Converting {} table...", V1_TABLE);
ResultSet results = session.execute("SELECT * FROM " + V1_TABLE);
for (Row row:results) {
String state = session.execute("SELECT distinct state from repair_run where id = " + row.getUUID("id")).one()
.getString("state");
session.execute(v2_insert.bind(row.getString("cluster_name"), row.getUUID("id"), state));
}
session.execute("DROP TABLE " + V1_TABLE);
}
} catch (RuntimeException e) {
LOG.error("Failed transferring rows to " + V2_TABLE, e);
}
}

}
Loading

0 comments on commit 2a676b0

Please sign in to comment.