From 8c3fb704d4d08ee919120375d58e23ac87f018f1 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Tue, 4 Feb 2020 11:14:56 +0100 Subject: [PATCH 1/3] Randomize child.id when recovering from a "dead node". This is to prevent Horde.DynamicSupervisor from doing any de-duping. We cannot allow _both_ Horde.Registry and Horde.DynamicSupervisor to de-dupe, so we will make this the exclusive domain of Horde.Registry. --- lib/horde/dynamic_supervisor_impl.ex | 50 +++++++++++++++------------- test/cluster_test.exs | 2 +- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/lib/horde/dynamic_supervisor_impl.ex b/lib/horde/dynamic_supervisor_impl.ex index 0dac6d91..f0ac79d9 100644 --- a/lib/horde/dynamic_supervisor_impl.ex +++ b/lib/horde/dynamic_supervisor_impl.ex @@ -409,19 +409,15 @@ defmodule Horde.DynamicSupervisorImpl do {:ok, %{name: chosen_node}} -> current_member = Map.get(state.members_info, current_node) - cond do - this_node != current_node and this_node == chosen_node -> - # handle_dead_nodes - case current_member do - %{status: :dead} -> - {_, state} = add_child(child_spec, state) - state + case {current_node, chosen_node} do + {_same_node, _same_node} -> + # process is running on the node on which it belongs - _ -> - state - end + state + + {^this_node, _other_node} -> + # process is running here but belongs somewhere else - this_node == current_node and chosen_node != this_node -> case state.supervisor_options[:process_redistribution] do :active -> handoff_child(child_spec, state) @@ -430,7 +426,21 @@ defmodule Horde.DynamicSupervisorImpl do state end - true -> + {_other_node, ^this_node} -> + # process is running on another node but belongs here + + case current_member do + %{status: :dead} -> + {_, state} = add_child(randomize_child_id(child_spec), state) + state + + _ -> + state + end + + {_other_node1, _other_node2} -> + # process is neither running here nor belongs here + state end @@ -468,17 +478,6 @@ defmodule Horde.DynamicSupervisorImpl do defp 
update_process(state, {:add, {:process, child_id}, {node, child_spec, child_pid}}) do this_node = fully_qualified_name(state.name) - case {Map.get(state.processes_by_id, child_id), node} do - {{^this_node, _child_spec, _child_pid}, ^this_node} -> - nil - - {{^this_node, _child_spec, _child_pid}, _other_node} -> - Horde.ProcessesSupervisor.terminate_child_by_id(supervisor_name(state.name), child_id) - - _ -> - nil - end - new_process_pid_to_id = case Map.get(state.processes_by_id, child_id) do {_, _, old_pid} -> Map.delete(state.process_pid_to_id, old_pid) @@ -689,6 +688,11 @@ defmodule Horde.DynamicSupervisorImpl do defp handoff_child(child, state) do {_, _, child_pid} = Map.get(state.processes_by_id, child.id) + # we send a special exit signal to the process here. + # when the process has exited, Horde.ProcessesSupervisor + # will cast `{:relinquish_child_process, child_id}` + # to this process for cleanup. + Horde.ProcessesSupervisor.send_exit_signal( supervisor_name(state.name), child_pid, diff --git a/test/cluster_test.exs b/test/cluster_test.exs index 576fb261..a4caab80 100644 --- a/test/cluster_test.exs +++ b/test/cluster_test.exs @@ -1,5 +1,5 @@ defmodule ClusterTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false describe "members option" do test "can join registry by specifying members in init" do From bb8a09dafd64d44f1c34cb793bee2eb71576f810 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Thu, 27 Feb 2020 08:14:17 +0100 Subject: [PATCH 2/3] Fix(?) 
network_partition_test and various cosmetic changes --- lib/horde/dynamic_supervisor_impl.ex | 35 ++-- lib/horde/registry.ex | 4 +- lib/horde/registry_impl.ex | 4 +- mix.lock | 34 ++-- test/netsplit_test.exs | 235 ++++++++++++++++++--------- 5 files changed, 192 insertions(+), 120 deletions(-) diff --git a/lib/horde/dynamic_supervisor_impl.ex b/lib/horde/dynamic_supervisor_impl.ex index f0ac79d9..3cc170cf 100644 --- a/lib/horde/dynamic_supervisor_impl.ex +++ b/lib/horde/dynamic_supervisor_impl.ex @@ -93,14 +93,9 @@ defmodule Horde.DynamicSupervisorImpl do @doc false def handle_call(:horde_shutting_down, _f, state) do - state = %{state | shutting_down: true} - - DeltaCrdt.mutate( - crdt_name(state.name), - :add, - [{:member_node_info, fully_qualified_name(state.name)}, node_info(state)], - :infinity - ) + state = + %{state | shutting_down: true} + |> set_own_node_status() {:reply, :ok, state} end @@ -373,7 +368,7 @@ defmodule Horde.DynamicSupervisorImpl do |> update_processes(diffs) new_state = - if has_membership_change?(diffs) do + if has_membership_changed?(diffs) do monitor_supervisors(new_state) |> set_own_node_status() |> handle_quorum_change() @@ -386,19 +381,16 @@ defmodule Horde.DynamicSupervisorImpl do {:noreply, new_state} end - def has_membership_change?([{:add, {:member_node_info, _}, _} | _diffs]), do: true - - def has_membership_change?([{:remove, {:member_node_info, _}} | _diffs]), do: true + def has_membership_changed?([{:add, {:member_node_info, _}, _} = diff | _diffs]), do: true + def has_membership_changed?([{:remove, {:member_node_info, _}} = diff | _diffs]), do: true + def has_membership_changed?([{:add, {:member, _}, _} = diff | _diffs]), do: true + def has_membership_changed?([{:remove, {:member, _}} = diff | _diffs]), do: true - def has_membership_change?([{:add, {:member, _}, _} | _diffs]), do: true - - def has_membership_change?([{:remove, {:member, _}} | _diffs]), do: true - - def has_membership_change?([_diff | diffs]) do - 
has_membership_change?(diffs) + def has_membership_changed?([_diff | diffs]) do + has_membership_changed?(diffs) end - def has_membership_change?([]), do: false + def has_membership_changed?([]), do: false defp handoff_processes(state) do this_node = fully_qualified_name(state.name) @@ -426,12 +418,13 @@ defmodule Horde.DynamicSupervisorImpl do state end - {_other_node, ^this_node} -> + {current_node, ^this_node} -> # process is running on another node but belongs here case current_member do %{status: :dead} -> - {_, state} = add_child(randomize_child_id(child_spec), state) + {response, state} = add_child(randomize_child_id(child_spec), state) + state _ -> diff --git a/lib/horde/registry.ex b/lib/horde/registry.ex index cd5198c0..bf1a2c32 100644 --- a/lib/horde/registry.ex +++ b/lib/horde/registry.ex @@ -415,7 +415,9 @@ defmodule Horde.Registry do defp process_alive?(pid) do n = node(pid) - Node.list() |> Enum.member?(n) && :rpc.call(n, Process, :alive?, [pid]) + + Node.list() |> Enum.member?(n) && + :rpc.call(n, Process, :alive?, [pid]) end defp member_in_cluster?(registry, member) do diff --git a/lib/horde/registry_impl.ex b/lib/horde/registry_impl.ex index f400ecf7..3dc5a678 100644 --- a/lib/horde/registry_impl.ex +++ b/lib/horde/registry_impl.ex @@ -112,7 +112,9 @@ defmodule Horde.RegistryImpl do {:noreply, new_state} end - def handle_info({:EXIT, pid, _reason}, state) do + def handle_info({:EXIT, pid, reason}, state) do + Logger.debug("#{inspect(node)} #{inspect(pid)} is dead, removing: #{inspect(reason)}") + case :ets.take(state.pids_ets_table, pid) do [{_pid, keys}] -> Enum.each(keys, fn key -> diff --git a/mix.lock b/mix.lock index 5bc5b2e6..a030ddb9 100644 --- a/mix.lock +++ b/mix.lock @@ -1,21 +1,21 @@ %{ - "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"}, - "deep_merge": {:hex, :deep_merge, "1.0.0", 
"b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"}, + "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, - "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"}, - "erlex": {:hex, :erlex, "0.2.4", "23791959df45fe8f01f388c6f7eb733cc361668cbeedd801bf491c55a029917b", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, - "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm"}, - "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, - "local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], 
[{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, - "merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"}, + "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "49496d63267bc1a4614ffd5f67c45d9fc3ea62701a6797975bc98bc156d2763f"}, + "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, + "erlex": {:hex, :erlex, "0.2.4", "23791959df45fe8f01f388c6f7eb733cc361668cbeedd801bf491c55a029917b", [:mix], [], "hexpm", "4a12ebc7cd8f24f2d0fce93d279fa34eb5068e0e885bb841d558c4d83c52c439"}, + "ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "8e24fc8ff9a50b9f557ff020d6c91a03cded7e59ac3e0eec8a27e771430c7d27"}, + "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", 
"1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, + "local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "fef6476083cf6f4c0526bb682de7ff75cd8b4bd4b7e20b3be60c1dab05f28ca7"}, + "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"}, + "merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm", "fb1cc3a80e0b0d439a83bdb42bde4d03e8970a436bc949b9fa8d951c18fdafde"}, "murmur": {:hex, :murmur, "1.0.1", "a6e6bced2dd0d666090a9cf3e73699f3b9176bbcf32d35b0f022f137667608e3", [:mix], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, - "schism": {:hex, :schism, "1.0.1", "b700883b4023b06faa5ab4add3aba5706877feb0a3dcfe8127b5dfeefe2513a5", [:mix], [], "hexpm"}, - "stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm"}, - "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"}, - "telemetry_poller": {:hex, :telemetry_poller, "0.4.0", "da64dea54b77604023e8d15dc61a5df8968f4c9e013eba561bfb2bc614b15432", [:rebar3], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", 
"90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"}, + "schism": {:hex, :schism, "1.0.1", "b700883b4023b06faa5ab4add3aba5706877feb0a3dcfe8127b5dfeefe2513a5", [:mix], [], "hexpm", "23080d2e0b4490eb2e207c8fee71d34bc0e58cc4f0f6879ca06b8fabe0c531ca"}, + "stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm", "7dafd5a801f0bc897f74fcd414651632b77ca367a7ae4568778191fc3bf3a19a"}, + "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, + "telemetry_poller": {:hex, :telemetry_poller, "0.4.0", "da64dea54b77604023e8d15dc61a5df8968f4c9e013eba561bfb2bc614b15432", [:rebar3], [], "hexpm", "f3374de85219675fceedd13386a39768c6f5e4b1a439a502da8c7dc142a43367"}, } diff --git a/test/netsplit_test.exs b/test/netsplit_test.exs index 5eaebd1d..0c5baa40 100644 --- a/test/netsplit_test.exs +++ b/test/netsplit_test.exs @@ -1,110 +1,185 @@ -defmodule NetsplitTest do +defmodule NetSplitTest do use ExUnit.Case + require Logger - @tag :skip - test "supervisor recovers after netsplit" do - [node1, node2] = nodes = LocalCluster.start_nodes("cluster-", 2) + Enum.each(1..1, fn x -> + test "test #{x}" do + nodes = LocalCluster.start_nodes("loner-cluster", 4, files: [__ENV__.file]) - Enum.each(nodes, fn node -> - assert :pong = Node.ping(node) - end) + [n1, n2, n3, n4] = nodes - [sup | _] = supervisors = Enum.map(nodes, fn node -> {:horde_supervisor, node} end) + Enum.each(nodes, &Node.spawn(&1, __MODULE__, :setup_horde, [nodes])) - Enum.each(supervisors, fn {name, node} -> - Node.spawn(node, LocalClusterHelper, :start, [ - Horde.DynamicSupervisor, - :start_link, - [[strategy: :one_for_one, name: name, members: supervisors]] - ]) - end) + Process.sleep(1000) - 
Process.sleep(1000) + num_procs = 1000 - Horde.DynamicSupervisor.start_child(sup, %{ - id: :first_child, - start: {EchoServer, :start_link, [self()]} - }) + Enum.each(1..num_procs, fn x -> + {:ok, _pid} = + :rpc.call(n1, Horde.DynamicSupervisor, :start_child, [ + TestNetSplitSup, + {TestNetSplitServer, name: :"test_netsplit_server_#{x}"} + ]) + end) - assert_receive {n, :hello_echo_server} - [other_node] = nodes -- [n] - refute_receive {^other_node, :hello_echo_server}, 1000 + Process.sleep(1000) - Schism.partition([node1]) + Logger.info("CREATING SCHISM") - assert_receive {^node1, :hello_echo_server}, 60_000 - assert_receive {^node2, :hello_echo_server}, 60_000 + g1 = [n1, n2] + Schism.partition(g1) - Schism.heal(nodes) + Process.sleep(2000) - assert_receive {^other_node, :hello_echo_server}, 1100 - refute_receive {^n, :hello_echo_server}, 1100 - end + Logger.debug("CHECKING NODE 1") + + pids = + Enum.map(1..num_procs, fn x -> + pid = + :rpc.call(n1, Horde.Registry, :whereis_name, [ + {TestReg2, :"test_netsplit_server_#{x}"} + ]) + + assert {:"server_#{x}", pid, is_pid(pid)} == {:"server_#{x}", pid, true} + pid + end) + + assert pids |> Enum.uniq() |> length == num_procs + assert Enum.all?(pids, &is_pid/1) + + Logger.debug("CHECKING NODE 3") + + pids = + Enum.map(1..num_procs, fn x -> + pid = + :rpc.call(n3, Horde.Registry, :whereis_name, [ + {TestReg2, :"test_netsplit_server_#{x}"} + ]) - test "name conflict after healing netsplit" do - cluster_name = "cluster" - server_name = "server" - sleep_millis = 2000 + assert {:"server_#{x}", pid, is_pid(pid)} == {:"server_#{x}", pid, true} + pid + end) - [node1, node2, node3] = nodes = LocalCluster.start_nodes(cluster_name, 3) + assert pids |> Enum.uniq() |> length == num_procs + assert Enum.all?(pids, &is_pid/1) - Enum.each(nodes, fn node -> - assert :pong = Node.ping(node) - end) + Logger.info("HEALING SCHISM") - # Start a test supervision tree in all three nodes - Enum.each(nodes, fn node -> - Node.spawn(node, 
LocalClusterHelper, :start, [ - MySupervisionTree, - :start_link, - [[cluster: cluster_name, distribution: Horde.UniformQuorumDistribution, sync_interval: 5]] - ]) - end) + Schism.heal(nodes) - # Wait for supervisor and registry in all nodes - Process.sleep(sleep_millis) + Process.sleep(2000) - Enum.each(nodes, fn node -> - assert MySupervisor.alive?(node) - assert MyRegistry.alive?(node) - end) + pids = + Enum.map(1..num_procs, fn x -> + pid = + :rpc.call(hd(nodes), Horde.Registry, :whereis_name, [ + {TestReg2, :"test_netsplit_server_#{x}"} + ]) - Schism.partition([node1, node2]) - Schism.partition([node3]) + assert {:"server_#{x}", pid, is_pid(pid)} == {:"server_#{x}", pid, true} + pid + end) - Process.sleep(sleep_millis) + :rpc.call(hd(nodes), Horde.DynamicSupervisor, :which_children, [TestNetSplitSup]) - Enum.each(nodes, fn node -> - assert MySupervisor.alive?(node) - assert MyRegistry.alive?(node) - end) + assert pids |> Enum.uniq() |> length == num_procs + assert Enum.all?(pids, &is_pid/1) + end + end) - # Create a server with the same name in both partitions - {:ok, pid1} = MyCluster.start_server(node1, server_name) - {:ok, pid2} = MyCluster.start_server(node3, server_name) + def setup_horde(nodes) do + {:ok, _} = Application.ensure_all_started(:horde) - assert Enum.member?([node1, node2], node(pid1)) - assert node3 == node(pid2) + registries = for n <- nodes, do: {TestReg2, n} + supervisors = for n <- nodes, do: {TestNetSplitSup, n} - Process.sleep(sleep_millis) + {:ok, _} = + Horde.Registry.start_link( + name: TestReg2, + keys: :unique, + delta_crdt_options: [sync_interval: 250], + members: registries + ) - # Heal the cluster - Schism.heal(nodes) - Process.sleep(sleep_millis) + {:ok, _} = + Horde.DynamicSupervisor.start_link( + name: TestNetSplitSup, + strategy: :one_for_one, + delta_crdt_options: [sync_interval: 200], + members: supervisors + ) + + :telemetry.attach( + "delta-crdt-syncs", + [:delta_crdt, :sync, :done], + fn _, %{keys_updated_count: 
count}, _, _ -> + Logger.debug("#{inspect(node())} delta_crdt synced #{count} keys") + end, + nil + ) + + receive do + end + end +end + +defmodule TestNetSplitServer do + require Logger + use GenServer, restart: :transient + + def start_link(args) do + GenServer.start_link(__MODULE__, args, + name: {:via, Horde.Registry, {TestReg2, Keyword.get(args, :name)}} + ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Logger.debug( + "#{inspect(node())} #{inspect(pid)} server_#{Keyword.get(args, :name)} already started" + ) + + :ignore + end + end - Enum.each(nodes, fn node -> - assert MySupervisor.alive?(node) - assert MyRegistry.alive?(node) - end) + def init(args) do + Process.flag(:trap_exit, true) - # Verify all nodes are able to see the same pid for that server - pid1 = MyCluster.whereis_server(node1, server_name) - pid2 = MyCluster.whereis_server(node2, server_name) - pid3 = MyCluster.whereis_server(node3, server_name) + Logger.debug( + "#{inspect(node())} #{inspect(self())} server_#{Keyword.get(args, :name)} started" + ) + + do_ping(args) + + {:ok, args} + end + + def handle_info(:ping, state) do + do_ping(state) + {:noreply, state} + end + + defp do_ping(state) do + Logger.debug( + "#{inspect(node())} #{inspect(self())} server_#{Keyword.get(state, :name)} still running" + ) + + Process.send_after(self(), :ping, 100) + end + + def handle_info({:EXIT, _, {:name_conflict, _, _, _}} = msg, state) do + Logger.debug( + "#{inspect(node())} #{inspect(self())} server_#{Keyword.get(state, :name)} stopped because of name conflict" + ) + + {:stop, :normal, state} + end - assert is_pid(pid1) - assert pid1 == pid2 - assert pid1 == pid3 - assert pid2 == pid3 + def terminate(state) do + Logger.debug( + "#{inspect(node())} #{inspect(self())} server_#{Keyword.get(state, :name)} terminated normally" + ) end end From 62cf425ed0254ff044f06582631e2f103ac4c408 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Thu, 27 Feb 2020 08:23:17 +0100 Subject: 
[PATCH 3/3] Fix compiler warnings --- lib/horde/dynamic_supervisor_impl.ex | 16 +++++++--------- lib/horde/registry_impl.ex | 4 +--- mix.lock | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/horde/dynamic_supervisor_impl.ex b/lib/horde/dynamic_supervisor_impl.ex index 3cc170cf..93e81687 100644 --- a/lib/horde/dynamic_supervisor_impl.ex +++ b/lib/horde/dynamic_supervisor_impl.ex @@ -381,10 +381,10 @@ defmodule Horde.DynamicSupervisorImpl do {:noreply, new_state} end - def has_membership_changed?([{:add, {:member_node_info, _}, _} = diff | _diffs]), do: true - def has_membership_changed?([{:remove, {:member_node_info, _}} = diff | _diffs]), do: true - def has_membership_changed?([{:add, {:member, _}, _} = diff | _diffs]), do: true - def has_membership_changed?([{:remove, {:member, _}} = diff | _diffs]), do: true + def has_membership_changed?([{:add, {:member_node_info, _}, _} = _diff | _diffs]), do: true + def has_membership_changed?([{:remove, {:member_node_info, _}} = _diff | _diffs]), do: true + def has_membership_changed?([{:add, {:member, _}, _} = _diff | _diffs]), do: true + def has_membership_changed?([{:remove, {:member, _}} = _diff | _diffs]), do: true def has_membership_changed?([_diff | diffs]) do has_membership_changed?(diffs) @@ -402,7 +402,7 @@ defmodule Horde.DynamicSupervisorImpl do current_member = Map.get(state.members_info, current_node) case {current_node, chosen_node} do - {_same_node, _same_node} -> + {same_node, same_node} -> # process is running on the node on which it belongs state @@ -418,12 +418,12 @@ defmodule Horde.DynamicSupervisorImpl do state end - {current_node, ^this_node} -> + {_current_node, ^this_node} -> # process is running on another node but belongs here case current_member do %{status: :dead} -> - {response, state} = add_child(randomize_child_id(child_spec), state) + {_response, state} = add_child(randomize_child_id(child_spec), state) state @@ -469,8 +469,6 @@ defmodule Horde.DynamicSupervisorImpl do 
end defp update_process(state, {:add, {:process, child_id}, {node, child_spec, child_pid}}) do - this_node = fully_qualified_name(state.name) - new_process_pid_to_id = case Map.get(state.processes_by_id, child_id) do {_, _, old_pid} -> Map.delete(state.process_pid_to_id, old_pid) diff --git a/lib/horde/registry_impl.ex b/lib/horde/registry_impl.ex index 3dc5a678..f400ecf7 100644 --- a/lib/horde/registry_impl.ex +++ b/lib/horde/registry_impl.ex @@ -112,9 +112,7 @@ defmodule Horde.RegistryImpl do {:noreply, new_state} end - def handle_info({:EXIT, pid, reason}, state) do - Logger.debug("#{inspect(node)} #{inspect(pid)} is dead, removing: #{inspect(reason)}") - + def handle_info({:EXIT, pid, _reason}, state) do case :ets.take(state.pids_ets_table, pid) do [{_pid, keys}] -> Enum.each(keys, fn key -> diff --git a/mix.lock b/mix.lock index a030ddb9..56b88b4d 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, - "delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, + "delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", 
"ed5c685df9528788d7c056762c23f75358f3cadd4779698188a55ccae24d087a"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "49496d63267bc1a4614ffd5f67c45d9fc3ea62701a6797975bc98bc156d2763f"}, "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, "erlex": {:hex, :erlex, "0.2.4", "23791959df45fe8f01f388c6f7eb733cc361668cbeedd801bf491c55a029917b", [:mix], [], "hexpm", "4a12ebc7cd8f24f2d0fce93d279fa34eb5068e0e885bb841d558c4d83c52c439"},