From 973e813a39896bba6724cfb8fddd0bb8018be9ad Mon Sep 17 00:00:00 2001 From: Stas Date: Tue, 12 Mar 2024 15:18:51 +0100 Subject: [PATCH] fix: stop pool connection if credentials fail (#321) --- VERSION | 2 +- config/runtime.exs | 3 ++- lib/supavisor/client_handler.ex | 8 +++++++- lib/supavisor/db_handler.ex | 24 +++++++++++++++++++++- priv/repo/seeds_after_migration.exs | 25 +++++++++++++++++++++++ test/integration/proxy_test.exs | 31 +++++++++++++++++++++++++++++ 6 files changed, 89 insertions(+), 4 deletions(-) diff --git a/VERSION b/VERSION index 2c69d9f9..633becb7 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.43 +1.1.44 diff --git a/config/runtime.exs b/config/runtime.exs index ab8a2429..51b008a2 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -154,7 +154,8 @@ if config_env() != :test do prom_poll_rate: System.get_env("PROM_POLL_RATE", "15000") |> String.to_integer(), global_upstream_ca: upstream_ca, global_downstream_cert: downstream_cert, - global_downstream_key: downstream_key + global_downstream_key: downstream_key, + reconnect_on_db_close: System.get_env("RECONNECT_ON_DB_CLOSE") == "true" config :supavisor, Supavisor.Repo, url: System.get_env("DATABASE_URL", "ecto://postgres:postgres@localhost:6432/postgres"), diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index c0f78851..76b94a1f 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -498,8 +498,9 @@ defmodule Supavisor.ClientHandler do end # linked DbHandler went down - def handle_event(:info, {:EXIT, db_pid, reason}, _, _) do + def handle_event(:info, {:EXIT, db_pid, reason}, _, data) do Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}") + HH.sock_send(data.sock, Server.error_message("XX000", "DbHandler exited")) {:stop, {:shutdown, :db_handler_exit}} end @@ -521,6 +522,11 @@ defmodule Supavisor.ClientHandler do end end + def handle_event(:info, {:disconnect, reason}, _, _data) do + Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}") + {:stop, {:shutdown, {:disconnect, reason}}} + end + # emulate handle_cast def handle_event(:cast, {:client_cast, bin, status}, _, data) do Logger.debug("ClientHandler: --> --> bin #{inspect(byte_size(bin))} bytes") diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index 7610468a..a5611268 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -245,6 +245,25 @@ defmodule Supavisor.DbHandler do {_, :authentication_md5} -> {:keep_state, data} + {:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} -> + tenant = Supavisor.tenant(data.id) + + for node <- [node() | Node.list()] do + :erpc.cast(node, fn -> + Cachex.del(Supavisor.Cache, {:secrets, tenant, data.user}) + Cachex.del(Supavisor.Cache, {:secrets_check, tenant, data.user}) + + Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries -> + for {client_handler, _meta} <- entries, + do: send(client_handler, {:disconnect, reason}) + end) + end) + end + + Supavisor.stop(data.id) + Logger.error("DbHandler: Auth error #{inspect(reason)}") + {:stop, :invalid_password, data} + {:error_response, error} -> Logger.error("DbHandler: Error auth response #{inspect(error)}") {:keep_state, data} @@ -394,7 +413,10 @@ defmodule Supavisor.DbHandler do def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do Logger.error("DbHandler: Connection closed when state was #{state}") - {:next_state, :connect, data, {:state_timeout, 2_500, :connect}} + + if Application.get_env(:supavisor, :reconnect_on_db_close), + do: {:next_state, :connect, data, {:state_timeout, @reconnect_timeout, :connect}}, + else: {:stop, :db_termination, data} end # linked client_handler went down diff --git a/priv/repo/seeds_after_migration.exs b/priv/repo/seeds_after_migration.exs index 42f781fb..3a99bfe1 100644 --- a/priv/repo/seeds_after_migration.exs +++ b/priv/repo/seeds_after_migration.exs @@ -16,6 +16,29 @@ end _ -> nil end +if !Tenants.get_tenant_by_external_id("is_manager") do + %{ + db_host: db_conf[:hostname], + db_port: db_conf[:port], + db_database: db_conf[:database], + external_id: "is_manager", + default_parameter_status: %{server_version: version}, + require_user: false, + auth_query: "SELECT rolname, rolpassword FROM pg_authid WHERE rolname=$1;", + users: [ + %{ + "db_user" => db_conf[:username], + "db_password" => db_conf[:password], + "pool_size" => 2, + "mode_type" => "transaction", + "is_manager" => true, + "pool_checkout_timeout" => 1000 + } + ] + } + |> Tenants.create_tenant() +end + ["proxy_tenant1", "syn_tenant", "prom_tenant"] |> Enum.each(fn tenant -> if !Tenants.get_tenant_by_external_id(tenant) do @@ -66,6 +89,8 @@ end) {:ok, _} = Repo.transaction(fn -> [ + "drop user if exists dev_postgres;", + "create user dev_postgres with password 'postgres';", "drop table if exists \"public\".\"test\";", "create sequence if not exists test_id_seq;", "create table \"public\".\"test\" ( diff --git a/test/integration/proxy_test.exs b/test/integration/proxy_test.exs index 2c12a9ac..05ed3474 100644 --- a/test/integration/proxy_test.exs +++ b/test/integration/proxy_test.exs @@ -217,6 +217,37 @@ defmodule Supavisor.Integration.ProxyTest do }, _}}} = parse_uri(url) |> single_connection() end + test "change role password", %{origin: origin} do + Process.flag(:trap_exit, true) + db_conf = Application.get_env(:supavisor, Repo) + + conn = fn pass -> + "postgresql://dev_postgres.is_manager:#{pass}@#{db_conf[:hostname]}:#{Application.get_env(:supavisor, :proxy_port_transaction)}/postgres?sslmode=disable" + end + + first_pass = conn.("postgres") + new_pass = conn.("postgres_new") + + {:ok, pid} = parse_uri(first_pass) |> single_connection() + + assert [%Postgrex.Result{rows: [["1"]]}] = P.SimpleConnection.call(pid, {:query, "select 1;"}) + + P.query(origin, "alter user dev_postgres with password 'postgres_new';", []) + Supavisor.stop({{:single, "is_manager"}, "dev_postgres", :transaction, "postgres"}) + + :timer.sleep(1000) + + assert {:error, + {_, + {:stop, + %Postgrex.Error{ + message: "error received in SCRAM server final message: \"Wrong password\"" + }, _}}} = parse_uri(new_pass) |> single_connection() + + {:ok, pid} = parse_uri(new_pass) |> single_connection() + assert [%Postgrex.Result{rows: [["1"]]}] = P.SimpleConnection.call(pid, {:query, "select 1;"}) + end + defp single_connection(db_conf, c_port \\ nil) when is_list(db_conf) do port = c_port || db_conf[:port]