forked from phoenixframework/phoenix_pubsub
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix transfer req loop when a node was replaced
Scenario in which Node2 is replaced by Node3 (essentially a rolling update): 1. Node1 and Node2 are up and synced. 2. Node2 is killed (Node1 starts the permdown grace period for Node2). 3. Node3 is spawned. 4. Node1 sends a heartbeat that includes clocks for Node1 and Node2. 5. Node3 receives the heartbeat, sees that Node1's clock dominates because it contains a clock for Node2, and requests a transfer from Node1. 6. Node1 sends a transfer ack to Node3. 7. Node3 uses `State#extract` to process the transfer payload, which discards Node2's values. 8. Everything starts again from step 4 on the next heartbeat. This loop between steps 4 and 8 lasts until Node1's permdown period for Node2 expires and Node1 stops including Node2 in its heartbeat clocks. The solution is to not include down replicas in the heartbeat notifications. This fixes phoenixframework#135
- Loading branch information
Showing
3 changed files
with
67 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,14 +5,26 @@ defmodule Phoenix.Tracker.ShardReplicationTest do | |
@primary :"primary@127.0.0.1" | ||
@node1 :"node1@127.0.0.1" | ||
@node2 :"node2@127.0.0.1" | ||
@node3 :"node3@127.0.0.1" | ||
|
||
setup config do | ||
tracker = config.test | ||
{:ok, shard_pid} = start_shard(name: tracker) | ||
{:ok, topic: to_string(config.test), | ||
shard: shard_name(tracker), | ||
shard_pid: shard_pid, | ||
tracker: tracker} | ||
|
||
conf = [ | ||
topic: to_string(tracker), | ||
shard: shard_name(tracker), | ||
tracker: tracker | ||
] | ||
|
||
conf = | ||
if config[:primary] != false do | ||
{:ok, shard_pid} = start_shard(name: tracker) | ||
Keyword.put(conf, :shard_pid, shard_pid) | ||
else | ||
conf | ||
end | ||
|
||
{:ok, conf} | ||
end | ||
|
||
test "heartbeats", %{shard: shard} do | ||
|
@@ -342,6 +354,51 @@ defmodule Phoenix.Tracker.ShardReplicationTest do | |
assert replicas(shard) == %{} | ||
end | ||
|
||
@tag primary: false | ||
test "rolling node update", %{topic: topic, tracker: tracker, shard: shard_name} do | ||
# By default permdown period is 1.5 seconds in the tests. This however is | ||
# not enough to test this case. | ||
permdown_period = 2_000 | ||
tracker_opts = [name: tracker, permdown_period: permdown_period] | ||
|
||
# Add 2 online shards | ||
spy_on_server(@node1, self(), shard_name) | ||
{node1_node, {:ok, node1_shard}} = start_shard(@node1, tracker_opts) | ||
|
||
spy_on_server(@node2, self(), shard_name) | ||
{node2_node, {:ok, node2_shard}} = start_shard(@node2, tracker_opts) | ||
|
||
assert_receive {{:replica_up, @node1}, @node2}, @timeout | ||
assert_receive {{:replica_up, @node2}, @node1}, @timeout | ||
|
||
# Add one user to node1 | ||
track_presence(@node1, shard_name, spawn_pid(), topic, "node1", %{}) | ||
|
||
assert_heartbeat to: @node1, from: @node2 | ||
assert_heartbeat to: @node2, from: @node1 | ||
|
||
# Remove node 2 (starts permdown grace on node1) | ||
Process.unlink(node2_node) | ||
Process.exit(node2_shard, :kill) | ||
assert_receive {{:replica_down, @node2}, @node1}, @timeout | ||
|
||
# Start node 3 (has no knowledge of node2) | ||
spy_on_server(@node3, self(), shard_name) | ||
{node3_node, {:ok, node3_shard}} = start_shard(@node3, tracker_opts) | ||
|
||
assert_receive {{:replica_up, @node3}, @node1}, @timeout | ||
assert_receive {{:replica_up, @node1}, @node3}, @timeout | ||
|
||
# Sends transfer request once | ||
assert_transfer_req(from: @node3, to: @node1) | ||
|
||
# Does not send more transfer requests | ||
refute_transfer_req(from: @node3, to: @node1) | ||
|
||
# Wait until node1 reports node2 as permanently down | ||
assert_receive {{:replica_permdown, @node2}, @node1}, permdown_period * 2 | ||
end | ||
|
||
## Helpers | ||
|
||
def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ Supervisor.start_link( | |
) | ||
|
||
unless :clustered in exclude do | ||
Phoenix.PubSub.Cluster.spawn([:"node1@127.0.0.1", :"node2@127.0.0.1"])
Phoenix.PubSub.Cluster.spawn([:"node1@127.0.0.1", :"node2@127.0.0.1", :"node3@127.0.0.1"])
end | ||
|
||
ExUnit.start() |