Skip to content

Commit

Permalink
fix: stop pool connection if credentials fail (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Mar 12, 2024
1 parent e82dd75 commit 973e813
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 4 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.43
1.1.44
3 changes: 2 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ if config_env() != :test do
prom_poll_rate: System.get_env("PROM_POLL_RATE", "15000") |> String.to_integer(),
global_upstream_ca: upstream_ca,
global_downstream_cert: downstream_cert,
global_downstream_key: downstream_key
global_downstream_key: downstream_key,
reconnect_on_db_close: System.get_env("RECONNECT_ON_DB_CLOSE") == "true"

config :supavisor, Supavisor.Repo,
url: System.get_env("DATABASE_URL", "ecto://postgres:postgres@localhost:6432/postgres"),
Expand Down
8 changes: 7 additions & 1 deletion lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,9 @@ defmodule Supavisor.ClientHandler do
end

# linked DbHandler went down
def handle_event(:info, {:EXIT, db_pid, reason}, _, _) do
def handle_event(:info, {:EXIT, db_pid, reason}, _, data) do
Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}")
HH.sock_send(data.sock, Server.error_message("XX000", "DbHandler exited"))
{:stop, {:shutdown, :db_handler_exit}}
end

Expand All @@ -521,6 +522,11 @@ defmodule Supavisor.ClientHandler do
end
end

def handle_event(:info, {:disconnect, reason}, _, _data) do
Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}")
{:stop, {:shutdown, {:disconnect, reason}}}
end

# emulate handle_cast
def handle_event(:cast, {:client_cast, bin, status}, _, data) do
Logger.debug("ClientHandler: --> --> bin #{inspect(byte_size(bin))} bytes")
Expand Down
24 changes: 23 additions & 1 deletion lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,25 @@ defmodule Supavisor.DbHandler do
{_, :authentication_md5} ->
{:keep_state, data}

{:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} ->
tenant = Supavisor.tenant(data.id)

for node <- [node() | Node.list()] do
:erpc.cast(node, fn ->
Cachex.del(Supavisor.Cache, {:secrets, tenant, data.user})
Cachex.del(Supavisor.Cache, {:secrets_check, tenant, data.user})

Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries ->
for {client_handler, _meta} <- entries,
do: send(client_handler, {:disconnect, reason})
end)
end)
end

Supavisor.stop(data.id)
Logger.error("DbHandler: Auth error #{inspect(reason)}")
{:stop, :invalid_password, data}

{:error_response, error} ->
Logger.error("DbHandler: Error auth response #{inspect(error)}")
{:keep_state, data}
Expand Down Expand Up @@ -394,7 +413,10 @@ defmodule Supavisor.DbHandler do

def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
Logger.error("DbHandler: Connection closed when state was #{state}")
{:next_state, :connect, data, {:state_timeout, 2_500, :connect}}

if Application.get_env(:supavisor, :reconnect_on_db_close),
do: {:next_state, :connect, data, {:state_timeout, @reconnect_timeout, :connect}},
else: {:stop, :db_termination, data}
end

# linked client_handler went down
Expand Down
25 changes: 25 additions & 0 deletions priv/repo/seeds_after_migration.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@ end
_ -> nil
end

if !Tenants.get_tenant_by_external_id("is_manager") do
%{
db_host: db_conf[:hostname],
db_port: db_conf[:port],
db_database: db_conf[:database],
external_id: "is_manager",
default_parameter_status: %{server_version: version},
require_user: false,
auth_query: "SELECT rolname, rolpassword FROM pg_authid WHERE rolname=$1;",
users: [
%{
"db_user" => db_conf[:username],
"db_password" => db_conf[:password],
"pool_size" => 2,
"mode_type" => "transaction",
"is_manager" => true,
"pool_checkout_timeout" => 1000
}
]
}
|> Tenants.create_tenant()
end

["proxy_tenant1", "syn_tenant", "prom_tenant"]
|> Enum.each(fn tenant ->
if !Tenants.get_tenant_by_external_id(tenant) do
Expand Down Expand Up @@ -66,6 +89,8 @@ end)
{:ok, _} =
Repo.transaction(fn ->
[
"drop user if exists dev_postgres;",
"create user dev_postgres with password 'postgres';",
"drop table if exists \"public\".\"test\";",
"create sequence if not exists test_id_seq;",
"create table \"public\".\"test\" (
Expand Down
31 changes: 31 additions & 0 deletions test/integration/proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,37 @@ defmodule Supavisor.Integration.ProxyTest do
}, _}}} = parse_uri(url) |> single_connection()
end

test "change role password", %{origin: origin} do
Process.flag(:trap_exit, true)
db_conf = Application.get_env(:supavisor, Repo)

conn = fn pass ->
"postgresql://dev_postgres.is_manager:#{pass}@#{db_conf[:hostname]}:#{Application.get_env(:supavisor, :proxy_port_transaction)}/postgres?sslmode=disable"
end

first_pass = conn.("postgres")
new_pass = conn.("postgres_new")

{:ok, pid} = parse_uri(first_pass) |> single_connection()

assert [%Postgrex.Result{rows: [["1"]]}] = P.SimpleConnection.call(pid, {:query, "select 1;"})

P.query(origin, "alter user dev_postgres with password 'postgres_new';", [])
Supavisor.stop({{:single, "is_manager"}, "dev_postgres", :transaction, "postgres"})

:timer.sleep(1000)

assert {:error,
{_,
{:stop,
%Postgrex.Error{
message: "error received in SCRAM server final message: \"Wrong password\""
}, _}}} = parse_uri(new_pass) |> single_connection()

{:ok, pid} = parse_uri(new_pass) |> single_connection()
assert [%Postgrex.Result{rows: [["1"]]}] = P.SimpleConnection.call(pid, {:query, "select 1;"})
end

defp single_connection(db_conf, c_port \\ nil) when is_list(db_conf) do
port = c_port || db_conf[:port]

Expand Down

0 comments on commit 973e813

Please sign in to comment.