Skip to content

Commit

Permalink
RavenDB-23020 unify the database topology creation
Browse files Browse the repository at this point in the history
  • Loading branch information
karmeli87 committed Oct 14, 2024
1 parent 8abd28e commit 42f932e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/Raven.Server/Web/System/DatabaseHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private static void SetReplicationFactor(DatabaseTopology databaseTopology, Clus
databaseTopology.ReplicationFactor = Math.Min(databaseTopology.ReplicationFactor, clusterTopology.AllNodes.Count);
}

private static void InitializeDatabaseTopology(ServerStore server, DatabaseTopology databaseTopology, ClusterTopology clusterTopology, int replicationFactor,
public static void InitializeDatabaseTopology(ServerStore server, DatabaseTopology databaseTopology, ClusterTopology clusterTopology, int replicationFactor,
string clusterTransactionId)
{
Debug.Assert(databaseTopology != null);
Expand Down
72 changes: 34 additions & 38 deletions src/Raven.Server/Web/System/ShardedAdminDatabaseHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,46 +102,30 @@ public async Task AddNodeToOrchestratorTopology()
}
}

private static string FindFitNodeForDatabase(string databaseName, DatabaseTopology topology, bool isEncrypted, ClusterTopology clusterTopology, Dictionary<string, int> nodeToInstanceCount = null)
private static string FindFitNodeForDatabase(string databaseName, DatabaseTopology topology, bool isEncrypted, ClusterTopology clusterTopology, string node = null)
{
var candidateNodes = clusterTopology.Members.Keys
.Concat(clusterTopology.Promotables.Keys)
.Concat(clusterTopology.Watchers.Keys)
.ToList();
.ToHashSet(StringComparer.InvariantCultureIgnoreCase);

candidateNodes.RemoveAll(n => topology.AllNodes.Contains(n));
candidateNodes.RemoveWhere(n => topology.AllNodes.Contains(n));

if (candidateNodes.Count == 0)
throw new InvalidOperationException($"Looking for a fit node for database {databaseName} but all nodes in cluster are already taken");

candidateNodes.RemoveAll(n => isEncrypted && AdminDatabasesHandler.NotUsingHttps(clusterTopology.GetUrlFromTag(n)));
candidateNodes.RemoveWhere(n => isEncrypted && AdminDatabasesHandler.NotUsingHttps(clusterTopology.GetUrlFromTag(n)));

if (isEncrypted && candidateNodes.Count == 0)
throw new InvalidOperationException($"Database {databaseName} is encrypted and requires a node which supports SSL. There is no such node available in the cluster.");

if (candidateNodes.Count == 0)
throw new InvalidOperationException($"Database {databaseName} already exists on all the nodes of the cluster");

//find node with the least shard replicas on it
string minNode = null;
if (string.IsNullOrEmpty(node) == false && candidateNodes.Contains(node) == false)
throw new InvalidOperationException($"Cannot put the requested node '{node}' to topology");

if (nodeToInstanceCount != null)
{
var minCount = int.MaxValue;
foreach (var (node, count) in nodeToInstanceCount)
{
if (candidateNodes.Contains(node) == false)
continue;

if (count < minCount)
{
minCount = count;
minNode = node;
}
}
}

return minNode ?? clusterTopology.AllNodes.Keys.ElementAt((new Random(DateTime.Now.Microsecond)).Next(clusterTopology.Count - 1));
return node ?? candidateNodes.ElementAt(Random.Shared.Next(candidateNodes.Count - 1));
}

[RavenAction("/admin/databases/orchestrator", "DELETE", AuthorizationStatus.Operator)]
Expand Down Expand Up @@ -242,7 +226,7 @@ public async Task CreateNewShard()

await ServerStore.EnsureNotPassiveAsync();

using (ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (ServerStore.Engine.ContextPool.AllocateOperationContext(out ClusterOperationContext context))
using (context.OpenReadTransaction())
{
var databaseRecord = ServerStore.Cluster.ReadRawDatabaseRecord(context, database, out var index);
Expand Down Expand Up @@ -285,7 +269,6 @@ public async Task CreateNewShard()
throw new InvalidOperationException($"Can't add a new shard to database '{database}' because it is currently being deleted.");
}

var newShardTopology = new DatabaseTopology();

int newChosenShardNumber;

Expand All @@ -307,30 +290,43 @@ public async Task CreateNewShard()
if (replicationFactor.HasValue && replicationFactor.Value > clusterTopology.AllNodes.Count)
throw new InvalidOperationException($"Replication factor {replicationFactor.Value} cannot exceed the number of nodes in the cluster {clusterTopology.AllNodes.Count}.");

newShardTopology.ReplicationFactor = replicationFactor ?? (nodesList.Count > 0 ? nodesList.Count : databaseRecord.Sharding.Shards.ElementAt(0).Value.ReplicationFactor);
newShardTopology.DynamicNodesDistribution = dynamicNodeDistribution ?? databaseRecord.Sharding.Shards.ElementAt(0).Value.DynamicNodesDistribution;
replicationFactor ??= (nodesList.Count > 0 ? nodesList.Count : databaseRecord.Sharding.Shards.ElementAt(0).Value.ReplicationFactor);
dynamicNodeDistribution ??= databaseRecord.Sharding.Shards.ElementAt(0).Value.DynamicNodesDistribution;

var nodeToInstanceCount = new Dictionary<string, int>();
foreach (var node in clusterTopology.AllNodes.Keys)
if (nodesList.Count == 0)
{
nodeToInstanceCount[node] = 0;
}
var nodeToInstanceCount = new Dictionary<string, int>();
foreach (var node in clusterTopology.AllNodes.Keys)
{
nodeToInstanceCount[node] = 0;
}

foreach (var (_, topology) in databaseRecord.Sharding.Shards)
{
foreach (var node in topology.AllNodes)
foreach (var (_, topology) in databaseRecord.Sharding.Shards)
{
nodeToInstanceCount[node] += 1;
foreach (var node in topology.AllNodes)
{
nodeToInstanceCount[node] += 1;
}
}

nodesList = nodeToInstanceCount.OrderBy(n => n.Value).Select(kvp => kvp.Key).ToList();
}

var newShardTopology = new DatabaseTopology
{
DynamicNodesDistribution = dynamicNodeDistribution.Value
};

for (int i = 0; i < newShardTopology.ReplicationFactor; i++)
for (int i = 0; i < replicationFactor; i++)
{
var node = nodesList.ElementAtOrDefault(i);
node ??= FindFitNodeForDatabase(database, newShardTopology, databaseRecord.IsEncrypted, clusterTopology, nodeToInstanceCount);
node = FindFitNodeForDatabase(database, newShardTopology, databaseRecord.IsEncrypted, clusterTopology, node);
newShardTopology.Members.Add(node);
}


DatabaseHelper.InitializeDatabaseTopology(ServerStore, newShardTopology, clusterTopology, replicationFactor.Value,
databaseRecord.Sharding.Orchestrator.Topology.ClusterTransactionIdBase64);

var update = new CreateNewShardCommand(database, newChosenShardNumber, newShardTopology, SystemTime.UtcNow, raftRequestId);

var (newIndex, _) = await ServerStore.SendToLeaderAsync(update);
Expand Down

0 comments on commit 42f932e

Please sign in to comment.