Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Supervisor does not ensure uniqueness #196

Merged
merged 3 commits into from
Mar 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 39 additions & 44 deletions lib/horde/dynamic_supervisor_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,9 @@ defmodule Horde.DynamicSupervisorImpl do

@doc false
def handle_call(:horde_shutting_down, _f, state) do
state = %{state | shutting_down: true}

DeltaCrdt.mutate(
crdt_name(state.name),
:add,
[{:member_node_info, fully_qualified_name(state.name)}, node_info(state)],
:infinity
)
state =
%{state | shutting_down: true}
|> set_own_node_status()

{:reply, :ok, state}
end
Expand Down Expand Up @@ -373,7 +368,7 @@ defmodule Horde.DynamicSupervisorImpl do
|> update_processes(diffs)

new_state =
if has_membership_change?(diffs) do
if has_membership_changed?(diffs) do
monitor_supervisors(new_state)
|> set_own_node_status()
|> handle_quorum_change()
Expand All @@ -386,19 +381,16 @@ defmodule Horde.DynamicSupervisorImpl do
{:noreply, new_state}
end

def has_membership_change?([{:add, {:member_node_info, _}, _} | _diffs]), do: true

def has_membership_change?([{:remove, {:member_node_info, _}} | _diffs]), do: true

def has_membership_change?([{:add, {:member, _}, _} | _diffs]), do: true
def has_membership_changed?([{:add, {:member_node_info, _}, _} = _diff | _diffs]), do: true
def has_membership_changed?([{:remove, {:member_node_info, _}} = _diff | _diffs]), do: true
def has_membership_changed?([{:add, {:member, _}, _} = _diff | _diffs]), do: true
def has_membership_changed?([{:remove, {:member, _}} = _diff | _diffs]), do: true

def has_membership_change?([{:remove, {:member, _}} | _diffs]), do: true

def has_membership_change?([_diff | diffs]) do
has_membership_change?(diffs)
def has_membership_changed?([_diff | diffs]) do
has_membership_changed?(diffs)
end

def has_membership_change?([]), do: false
def has_membership_changed?([]), do: false

defp handoff_processes(state) do
this_node = fully_qualified_name(state.name)
Expand All @@ -409,19 +401,15 @@ defmodule Horde.DynamicSupervisorImpl do
{:ok, %{name: chosen_node}} ->
current_member = Map.get(state.members_info, current_node)

cond do
this_node != current_node and this_node == chosen_node ->
# handle_dead_nodes
case current_member do
%{status: :dead} ->
{_, state} = add_child(child_spec, state)
state
case {current_node, chosen_node} do
{same_node, same_node} ->
# process is running on the node on which it belongs

_ ->
state
end
state

{^this_node, _other_node} ->
# process is running here but belongs somewhere else

this_node == current_node and chosen_node != this_node ->
case state.supervisor_options[:process_redistribution] do
:active ->
handoff_child(child_spec, state)
Expand All @@ -430,7 +418,22 @@ defmodule Horde.DynamicSupervisorImpl do
state
end

true ->
{_current_node, ^this_node} ->
# process is running on another node but belongs here

case current_member do
%{status: :dead} ->
{_response, state} = add_child(randomize_child_id(child_spec), state)

state

_ ->
state
end

{_other_node1, _other_node2} ->
# process is neither running here nor belongs here

state
end

Expand Down Expand Up @@ -466,19 +469,6 @@ defmodule Horde.DynamicSupervisorImpl do
end

defp update_process(state, {:add, {:process, child_id}, {node, child_spec, child_pid}}) do
this_node = fully_qualified_name(state.name)

case {Map.get(state.processes_by_id, child_id), node} do
{{^this_node, _child_spec, _child_pid}, ^this_node} ->
nil

{{^this_node, _child_spec, _child_pid}, _other_node} ->
Horde.ProcessesSupervisor.terminate_child_by_id(supervisor_name(state.name), child_id)

_ ->
nil
end

new_process_pid_to_id =
case Map.get(state.processes_by_id, child_id) do
{_, _, old_pid} -> Map.delete(state.process_pid_to_id, old_pid)
Expand Down Expand Up @@ -689,6 +679,11 @@ defmodule Horde.DynamicSupervisorImpl do
defp handoff_child(child, state) do
{_, _, child_pid} = Map.get(state.processes_by_id, child.id)

# we send a special exit signal to the process here.
# when the process has exited, Horde.ProcessSupervisor
# will cast `{:relinquish_child_process, child_id}`
# to this process for cleanup.

Horde.ProcessesSupervisor.send_exit_signal(
supervisor_name(state.name),
child_pid,
Expand Down
4 changes: 3 additions & 1 deletion lib/horde/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ defmodule Horde.Registry do

defp process_alive?(pid) do
n = node(pid)
Node.list() |> Enum.member?(n) && :rpc.call(n, Process, :alive?, [pid])

Node.list() |> Enum.member?(n) &&
:rpc.call(n, Process, :alive?, [pid])
end

defp member_in_cluster?(registry, member) do
Expand Down
36 changes: 18 additions & 18 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
"erlex": {:hex, :erlex, "0.2.4", "23791959df45fe8f01f388c6f7eb733cc361668cbeedd801bf491c55a029917b", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"},
"local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"},
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ed5c685df9528788d7c056762c23f75358f3cadd4779698188a55ccae24d087a"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "49496d63267bc1a4614ffd5f67c45d9fc3ea62701a6797975bc98bc156d2763f"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"},
"erlex": {:hex, :erlex, "0.2.4", "23791959df45fe8f01f388c6f7eb733cc361668cbeedd801bf491c55a029917b", [:mix], [], "hexpm", "4a12ebc7cd8f24f2d0fce93d279fa34eb5068e0e885bb841d558c4d83c52c439"},
"ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "8e24fc8ff9a50b9f557ff020d6c91a03cded7e59ac3e0eec8a27e771430c7d27"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
"local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "fef6476083cf6f4c0526bb682de7ff75cd8b4bd4b7e20b3be60c1dab05f28ca7"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"},
"merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm", "fb1cc3a80e0b0d439a83bdb42bde4d03e8970a436bc949b9fa8d951c18fdafde"},
"murmur": {:hex, :murmur, "1.0.1", "a6e6bced2dd0d666090a9cf3e73699f3b9176bbcf32d35b0f022f137667608e3", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"schism": {:hex, :schism, "1.0.1", "b700883b4023b06faa5ab4add3aba5706877feb0a3dcfe8127b5dfeefe2513a5", [:mix], [], "hexpm"},
"stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
"telemetry_poller": {:hex, :telemetry_poller, "0.4.0", "da64dea54b77604023e8d15dc61a5df8968f4c9e013eba561bfb2bc614b15432", [:rebar3], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"},
"schism": {:hex, :schism, "1.0.1", "b700883b4023b06faa5ab4add3aba5706877feb0a3dcfe8127b5dfeefe2513a5", [:mix], [], "hexpm", "23080d2e0b4490eb2e207c8fee71d34bc0e58cc4f0f6879ca06b8fabe0c531ca"},
"stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm", "7dafd5a801f0bc897f74fcd414651632b77ca367a7ae4568778191fc3bf3a19a"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
"telemetry_poller": {:hex, :telemetry_poller, "0.4.0", "da64dea54b77604023e8d15dc61a5df8968f4c9e013eba561bfb2bc614b15432", [:rebar3], [], "hexpm", "f3374de85219675fceedd13386a39768c6f5e4b1a439a502da8c7dc142a43367"},
}
2 changes: 1 addition & 1 deletion test/cluster_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule ClusterTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false

describe "members option" do
test "can join registry by specifying members in init" do
Expand Down
Loading