Skip to content

Commit

Permalink
Change snapshot threadpool for deletion
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Aug 6, 2024
1 parent 256041c commit 94f0e51
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@ private void executeStaleShardDelete(
final int basePathLen = basePath.length();
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
try {
// filtering files for which remote store lock release and cleanup succeeded,
// remaining files for which it failed will be retried in next snapshot delete run.
Expand Down Expand Up @@ -1548,7 +1548,7 @@ private void cleanupStaleBlobs(
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
Expand Down Expand Up @@ -1780,7 +1780,7 @@ private void executeOneStaleIndexDelete(
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> {
DeleteResult deleteResult = DeleteResult.ZERO;
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
public static class Names {
public static final String SAME = "same";
public static final String GENERIC = "generic";
public static final String SNAPSHOT_DELETION = "snapshot_deletion";
@Deprecated
public static final String LISTENER = "listener";
public static final String GET = "get";
Expand Down Expand Up @@ -165,6 +166,7 @@ public static ThreadPoolType fromType(String type) {
HashMap<String, ThreadPoolType> map = new HashMap<>();
map.put(Names.SAME, ThreadPoolType.DIRECT);
map.put(Names.GENERIC, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING);
map.put(Names.LISTENER, ThreadPoolType.FIXED);
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
Expand Down Expand Up @@ -234,6 +236,7 @@ public ThreadPool(
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.SNAPSHOT_DELETION, new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, 500, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
Expand Down

0 comments on commit 94f0e51

Please sign in to comment.