From 0af9239d1a4fd1628a16b8c837154c07bb1138f1 Mon Sep 17 00:00:00 2001 From: Indrek Juhkam Date: Thu, 28 Nov 2019 14:39:39 +0200 Subject: [PATCH] Fix transfer req loop when a node was replaced Scenario how Node2 is replaced by Node3 (this is also basically a rolling update): 1. Node1 and Node2 are up and synced 2. Kill Node2 (node1 will start permdown grace period for Node2) 3. Spawn Node3 4. Node1 sends a heartbeat that includes clocks for Node1 & Node2 5. Node3 receives the heartbeat. It sees node1 clock is dominating because there's Node2 clock. It requests transfer from Node1. 6. Node1 sends transfer ack to Node3 7. Node3 uses `State#extract` to process the transfer payload which discards Node2 values. 8. It all starts again from step 4 on the next heartbeat. This loop between steps 4 and 8 lasts until Node1 permdown period for Node2 triggers and it doesn't put it to the heartbeat clocks any more. The solution here is not to include down replicas in the heartbeat notifications. This fixes #135 --- lib/phoenix/tracker/state.ex | 9 +++- .../tracker/shard_replication_test.exs | 53 +++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 1d2d0291d..5e691040c 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -68,7 +68,14 @@ defmodule Phoenix.Tracker.State do Returns the causal context for the set. """ @spec clocks(t) :: {name, context} - def clocks(%State{replica: rep, context: ctx}), do: {rep, ctx} + def clocks(%State{replica: rep, context: ctx} = state) do + # Exclude down replicas from clocks as they are also not included in + # deltas. Otherwise if this node knows of a down node X in permdown grace + # period, another node Y which came up after X will keep requesting full + # state from this node as the clock of Y will be domainted by the clock of + # this node. + {rep, Map.drop(ctx, down_replicas(state))} + end @doc """ Adds a new element to the set. diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 88473717d..faf977666 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -8,11 +8,16 @@ defmodule Phoenix.Tracker.ShardReplicationTest do setup config do tracker = config.test - {:ok, shard_pid} = start_shard(name: tracker) - {:ok, topic: to_string(config.test), + + tracker_opts = config |> Map.get(:tracker_opts, []) |> Keyword.put_new(:name, tracker) + {:ok, shard_pid} = start_shard(tracker_opts) + + {:ok, + topic: to_string(tracker), shard: shard_name(tracker), shard_pid: shard_pid, - tracker: tracker} + tracker: tracker, + tracker_opts: tracker_opts} end test "heartbeats", %{shard: shard} do @@ -342,6 +347,48 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert replicas(shard) == %{} end + # By default permdown period is 1.5 seconds in the tests. This however is + # not enough to test this case. + @tag tracker_opts: [permdown_period: 2000] + test "rolling node update", %{topic: topic, shard: shard_name, tracker_opts: tracker_opts} do + permdown_period = Keyword.fetch!(tracker_opts, :permdown_period) + + # Ensure 2 online shards - primary and node1 + spy_on_server(@primary, self(), shard_name) + spy_on_server(@node1, self(), shard_name) + {node1_node, {:ok, node1_shared}} = start_shard(@node1, tracker_opts) + + assert_receive {{:replica_up, @primary}, @node1}, @timeout + assert_receive {{:replica_up, @node1}, @primary}, @timeout + + # Add one user to primary to ensure that transfer are requested from new nodes + track_presence(@primary, shard_name, spawn_pid(), topic, "primary", %{}) + + assert_heartbeat(to: @primary, from: @node1) + assert_heartbeat(to: @node1, from: @primary) + + # Remove node 1 (starts permdown grace on primary) + Process.unlink(node1_node) + Process.exit(node1_shared, :kill) + assert_receive {{:replica_down, @node1}, @primary}, @timeout + + # Start node 2 (has no knowledge of node1) + spy_on_server(@node2, self(), shard_name) + {_node2_node, {:ok, _node2_shard}} = start_shard(@node2, tracker_opts) + + assert_receive {{:replica_up, @node2}, @primary}, @timeout + assert_receive {{:replica_up, @primary}, @node2}, @timeout + + # Sends transfer request once + assert_transfer_req(from: @node2, to: @primary) + + # Does not send more transfer requests + refute_transfer_req(from: @node2, to: @primary) + + # Wait until primary is permanently down + assert_receive {{:replica_permdown, @node1}, @primary}, permdown_period * 2 + end + ## Helpers def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end)