From c40cc8fd108fdc3dcddedf52692fa7f220a0348e Mon Sep 17 00:00:00 2001
From: Indrek Juhkam
Date: Thu, 28 Nov 2019 14:39:39 +0200
Subject: [PATCH] Fix transfer request loop when a node is replaced

Scenario in which Node2 is replaced by Node3 (this is essentially a
rolling update):

1. Node1 and Node2 are up and synced.
2. Node2 is killed (Node1 starts the permdown grace period for Node2).
3. Node3 is spawned.
4. Node1 sends a heartbeat whose clocks include both Node1 and Node2.
5. Node3 receives the heartbeat. Because the clock carries a Node2 entry
   that Node3 has never seen, Node1's clock dominates Node3's, so Node3
   requests a transfer from Node1.
6. Node1 sends a transfer ack to Node3.
7. Node3 processes the transfer payload with `State#extract`, which
   discards the Node2 values.
8. On the next heartbeat it all starts again from step 4.

The loop between steps 4 and 8 repeats until Node1's permdown period for
Node2 fires and Node2 is no longer included in the heartbeat clocks.

The fix is to exclude down replicas from the clocks included in heartbeat
notifications.
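To make step 5 concrete, here is a minimal sketch of the dominance check
using plain maps and a simplified comparison (not the library's exact
API; the clock values and names below are illustrative only):

    # Node3 never learns a :node2 entry, because the transfer payload
    # discards it, so this check stays true on every heartbeat.
    node1_clock = %{node1: 5, node2: 3}  # node2 kept during permdown grace
    node3_clock = %{node1: 5, node3: 1}

    dominated? = fn mine, theirs ->
      Enum.any?(theirs, fn {replica, clock} ->
        Map.get(mine, replica, 0) < clock
      end)
    end

    dominated?.(node3_clock, node1_clock) #=> true, so Node3 asks for a transfer
    # With this patch, Node1 advertises %{node1: 5} and the check is false.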
This fixes #135
---
 lib/phoenix/tracker/state.ex           |  9 +++-
 .../tracker/shard_replication_test.exs | 53 +++++++++++++++++--
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex
index 1d2d0291d..7ab8bfa51 100644
--- a/lib/phoenix/tracker/state.ex
+++ b/lib/phoenix/tracker/state.ex
@@ -68,7 +68,14 @@ defmodule Phoenix.Tracker.State do
   Returns the causal context for the set.
   """
   @spec clocks(t) :: {name, context}
-  def clocks(%State{replica: rep, context: ctx}), do: {rep, ctx}
+  def clocks(%State{replica: rep, context: ctx} = state) do
+    # Exclude down replicas from the clocks, as they are also excluded
+    # from deltas. Otherwise, if this node knows of a down node X in its
+    # permdown grace period, another node Y that came up after X went
+    # down will keep requesting full state from this node, because Y's
+    # clock will always be dominated by this node's clock.
+    {rep, Map.drop(ctx, down_replicas(state))}
+  end
 
   @doc """
   Adds a new element to the set.
diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs
index 88473717d..6761ad7e8 100644
--- a/test/phoenix/tracker/shard_replication_test.exs
+++ b/test/phoenix/tracker/shard_replication_test.exs
@@ -8,11 +8,16 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
 
   setup config do
     tracker = config.test
-    {:ok, shard_pid} = start_shard(name: tracker)
-    {:ok, topic: to_string(config.test),
+
+    tracker_opts = config |> Map.get(:tracker_opts, []) |> Keyword.put_new(:name, tracker)
+    {:ok, shard_pid} = start_shard(tracker_opts)
+
+    {:ok,
+     topic: to_string(tracker),
       shard: shard_name(tracker),
       shard_pid: shard_pid,
-      tracker: tracker}
+      tracker: tracker,
+      tracker_opts: tracker_opts}
   end
 
   test "heartbeats", %{shard: shard} do
@@ -342,6 +347,48 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
     assert replicas(shard) == %{}
   end
 
+  # By default the permdown period is 1.5 seconds in the tests. That is
+  # not enough to test this case.
+  @tag tracker_opts: [permdown_period: 2000]
+  test "rolling node update", %{topic: topic, shard: shard_name, tracker_opts: tracker_opts} do
+    permdown_period = Keyword.fetch!(tracker_opts, :permdown_period)
+
+    # Ensure 2 online shards - primary and node1
+    spy_on_server(@primary, self(), shard_name)
+    spy_on_server(@node1, self(), shard_name)
+    {node1_node, {:ok, node1_shard}} = start_shard(@node1, tracker_opts)
+
+    assert_receive {{:replica_up, @primary}, @node1}, @timeout
+    assert_receive {{:replica_up, @node1}, @primary}, @timeout
+
+    # Add one user to primary to ensure transfers are requested from new nodes
+    track_presence(@primary, shard_name, spawn_pid(), topic, "primary", %{})
+
+    assert_heartbeat(to: @primary, from: @node1)
+    assert_heartbeat(to: @node1, from: @primary)
+
+    # Remove node1 (starts the permdown grace period on the primary)
+    Process.unlink(node1_node)
+    Process.exit(node1_shard, :kill)
+    assert_receive {{:replica_down, @node1}, @primary}, @timeout
+
+    # Start node2 (it has no knowledge of node1)
+    spy_on_server(@node2, self(), shard_name)
+    {_node2_node, {:ok, _node2_shard}} = start_shard(@node2, tracker_opts)
+
+    assert_receive {{:replica_up, @node2}, @primary}, @timeout
+    assert_receive {{:replica_up, @primary}, @node2}, @timeout
+
+    # Sends a transfer request once
+    assert_transfer_req(from: @node2, to: @primary)
+
+    # Does not send more transfer requests
+    refute_transfer_req(from: @node2, to: @primary)
+
+    # Wait until node1 is permanently down on the primary
+    assert_receive {{:replica_permdown, @node1}, @primary}, permdown_period * 2
+  end
+
   ## Helpers
 
   def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end)
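For completeness: the fix calls down_replicas/1, which Phoenix.Tracker.State
already provides. A minimal sketch of the new clocks/1 behavior, assuming
replica status is tracked as a map of name to :up | :down (the concrete
struct layout may differ; the values are illustrative only):

    # Hypothetical replica and context values for illustration.
    replicas = %{node1: :up, node2: :down}
    context  = %{node1: 5, node2: 3}

    # Collect the names currently marked :down ...
    down = for {name, :down} <- replicas, do: name   #=> [:node2]

    # ... which clocks/1 now drops from the advertised context.
    Map.drop(context, down)                          #=> %{node1: 5}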