Skip to content

Commit

Permalink
[segment replication] decouple the rateLimiter of segrep and recovery (
Browse files Browse the repository at this point in the history
…opensearch-project#12939)

add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative

Signed-off-by: maxliu <[email protected]>
  • Loading branch information
Ferrari248 committed Apr 10, 2024
1 parent cf5ecd1 commit 25936fc
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
private final Consumer<Long> onSourceThrottle;
private final Supplier<RateLimiter> rateLimiterProvider;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final String action;

public RemoteSegmentFileChunkWriter(
Expand All @@ -53,15 +52,14 @@ public RemoteSegmentFileChunkWriter(
String action,
AtomicLong requestSeqNoGenerator,
Consumer<Long> onSourceThrottle,
Supplier<RateLimiter> rateLimiterProvider
Supplier<RateLimiter> rateLimiterSupplier
) {
this.replicationId = replicationId;
this.recoverySettings = recoverySettings;
this.retryableTransportClient = retryableTransportClient;
this.shardId = shardId;
this.requestSeqNoGenerator = requestSeqNoGenerator;
this.onSourceThrottle = onSourceThrottle;
this.rateLimiterProvider = rateLimiterProvider;
this.rateLimiterSupplier = rateLimiterSupplier;
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
Expand All @@ -82,7 +80,7 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = rateLimiterProvider.get();
final RateLimiter rl = rateLimiterSupplier.get();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down

0 comments on commit 25936fc

Please sign in to comment.