Skip to content

Commit

Permalink
RavenDB-21551 Fix race between deletion request and applying that on …
Browse files Browse the repository at this point in the history
…the relevant node
  • Loading branch information
karmeli87 committed Nov 5, 2023
1 parent 8ac8a15 commit cfe0206
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public WaitForRaftIndexCommand(long index)
_index = index;
}

public WaitForRaftIndexCommand(long index, string node) : this(index)
{
SelectedNodeTag = node;
}

public override HttpRequestMessage CreateRequest(JsonOperationContext ctx, ServerNode node, out string url)
{
url = $"{node.Url}/rachis/waitfor?index={_index}";
Expand Down
2 changes: 1 addition & 1 deletion src/Raven.Server/Extensions/HttpExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static string GetFullUrl(this HttpRequest request)
throw new InvalidOperationException("Missing Host");

string path = (request.PathBase.HasValue || request.Path.HasValue) ? (request.PathBase + request.Path).ToString() : "/";
return request.Scheme + "://" + request.Host + path + request.Query;
return request.Scheme + "://" + request.Host + path + request.QueryString;
}

public static string ExtractNodeUrlFromRequest(HttpRequest request)
Expand Down
87 changes: 39 additions & 48 deletions src/Raven.Server/Web/RequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -198,74 +199,64 @@ protected async Task WaitForExecutionOnRelevantNodes(JsonOperationContext contex
if (members.Count == 0)
throw new InvalidOperationException("Cannot wait for execution when there are no nodes to execute ON.");

var executors = new List<ClusterRequestExecutor>();

try
using (var cts = CreateHttpRequestBoundTimeLimitedOperationToken(ServerStore.Configuration.Cluster.OperationTimeout.AsTimeSpan))
using (var requestExecutor = ClusterRequestExecutor.Create(clusterTopology.Members.Values.ToArray(), ServerStore.Server.Certificate.Certificate,
DocumentConventions.DefaultForServer))
{
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ServerStore.ServerShutdown))
{
cts.CancelAfter(ServerStore.Configuration.Cluster.OperationTimeout.AsTimeSpan);

var waitingTasks = new List<Task<Exception>>();
List<Exception> exceptions = null;
var waitingTasks = new List<Task<Exception>>();
List<Exception> exceptions = null;

foreach (var member in members)
{
var url = clusterTopology.GetUrlFromTag(member);
var executor = ClusterRequestExecutor.CreateForSingleNode(url, ServerStore.Server.Certificate.Certificate, DocumentConventions.DefaultForServer);
executors.Add(executor);
waitingTasks.Add(ExecuteTask(executor, member, cts.Token));
}
foreach (var member in members)
{
var url = clusterTopology.GetUrlFromTag(member);
waitingTasks.Add(ExecuteTask(requestExecutor, member, cts.Token));
}

while (waitingTasks.Count > 0)
{
var task = await Task.WhenAny(waitingTasks);
waitingTasks.Remove(task);
while (waitingTasks.Count > 0)
{
var task = await Task.WhenAny(waitingTasks);
waitingTasks.Remove(task);

if (task.Result == null)
continue;
if (task.Result == null)
continue;

var exception = task.Result.ExtractSingleInnerException();
var exception = task.Result.ExtractSingleInnerException();

if (exceptions == null)
exceptions = new List<Exception>();
if (exceptions == null)
exceptions = new List<Exception>();

exceptions.Add(exception);
}
exceptions.Add(exception);
}

if (exceptions != null)
if (exceptions != null)
{
var allTimeouts = true;
foreach (var exception in exceptions)
{
var allTimeouts = true;
foreach (var exception in exceptions)
{
if (exception is OperationCanceledException)
continue;
if (exception is OperationCanceledException)
continue;

allTimeouts = false;
}
allTimeouts = false;
}

var aggregateException = new AggregateException(exceptions);
var aggregateException = new AggregateException(exceptions);

if (allTimeouts)
throw new TimeoutException($"Waited too long for the raft command (number {index}) to be executed on any of the relevant nodes to this command.", aggregateException);
if (allTimeouts)
throw new TimeoutException($"Waited too long for the raft command (number {index}) to be executed on any of the relevant nodes to this command.",
aggregateException);

throw new InvalidDataException($"The database '{database}' was created but is not accessible, because all of the nodes on which this database was supposed to reside on, threw an exception.", aggregateException);
}
}
}
finally
{
foreach (var executor in executors)
{
executor.Dispose();
throw new InvalidDataException(
$"The database '{database}' was created but is not accessible, because all of the nodes on which this database was supposed to reside on, threw an exception.",
aggregateException);
}
}

async Task<Exception> ExecuteTask(RequestExecutor executor, string nodeTag, CancellationToken token)
{
try
{
await executor.ExecuteAsync(new WaitForRaftIndexCommand(index), context, token: token);
var cmd = new WaitForRaftIndexCommand(index, nodeTag);
await executor.ExecuteAsync(cmd, context, token: token);
return null;
}
catch (RavenException re) when (re.InnerException is HttpRequestException)
Expand Down
47 changes: 35 additions & 12 deletions src/Raven.Server/Web/System/AdminDatabasesHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ public async Task Delete()
{
await ServerStore.EnsureNotPassiveAsync();

var waitOnRecordDeletion = new List<string>();
var waitOnDeletion = new List<string>();
var pendingDeletes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var databasesToDelete = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

Expand Down Expand Up @@ -752,14 +752,9 @@ public async Task Delete()
pendingDeletes.Add(node);
topology.RemoveFromTopology(node);
}

if (topology.Count == 0)
waitOnRecordDeletion.Add(databaseName);

continue;
}

waitOnRecordDeletion.Add(databaseName);
waitOnDeletion.Add(databaseName);
}
}

Expand All @@ -779,23 +774,46 @@ public async Task Delete()
}

await ServerStore.Cluster.WaitForIndexNotification(index);

long actualDeletionIndex = index;

var timeToWaitForConfirmation = parameters.TimeToWaitForConfirmation ?? TimeSpan.FromSeconds(15);

var sp = Stopwatch.StartNew();
int databaseIndex = 0;
while (waitOnRecordDeletion.Count > databaseIndex)


while (waitOnDeletion.Count > databaseIndex)
{
var databaseName = waitOnRecordDeletion[databaseIndex];
var databaseName = waitOnDeletion[databaseIndex];
using (context.OpenReadTransaction())
using (var raw = ServerStore.Cluster.ReadRawDatabaseRecord(context, databaseName))
{
if (ServerStore.Cluster.DatabaseExists(context, databaseName) == false)
if (raw == null)
{
waitOnRecordDeletion.RemoveAt(databaseIndex);
waitOnDeletion.RemoveAt(databaseIndex);
continue;
}

if (parameters.FromNodes != null && parameters.FromNodes.Length > 0)
{
{
var allNodesDeleted = true;
foreach (var node in parameters.FromNodes)
{
if (raw.DeletionInProgress.ContainsKey(node) == false)
continue;

allNodesDeleted = false;
break;
}

if (allNodesDeleted)
{
waitOnDeletion.RemoveAt(databaseIndex);
continue;
}
}
}
}
// we'll now wait for the _next_ operation in the cluster
// since deletion involve multiple operations in the cluster
Expand All @@ -820,6 +838,11 @@ public async Task Delete()
}
}

if (parameters.FromNodes != null && parameters.FromNodes.Length > 0)
{
await WaitForExecutionOnRelevantNodes(context, "server", ServerStore.GetClusterTopology(), parameters.FromNodes.ToList(), actualDeletionIndex);
}

await using (var writer = new AsyncBlittableJsonTextWriter(context, ResponseBodyStream()))
{
context.Write(writer, new DynamicJsonValue
Expand Down

0 comments on commit cfe0206

Please sign in to comment.