Skip to content

Commit

Permalink
Add TenantManager that spawns the necessary processes per tenant.
Browse files Browse the repository at this point in the history
Modify storage to isolate tenant data.

Endpoint to add a DB

Use a dynamic supervisor for dynamically spawning tenant processes.

Fix format of opts in serve shape plug test

Store timeline information per tenant.

Use ETS table per tenant in ETS inspector.

Create ETS table per tenant in shape cache.

Remove dbg statement

Fix timeline tests

Fix shape status tests.

Fix shape cache test

Fix shape log collector test

Fix consumer test.

Fix shape plug tests.

Fix router test
  • Loading branch information
kevin-dp authored and alco committed Oct 17, 2024
1 parent 4bf713d commit d3ccf09
Show file tree
Hide file tree
Showing 35 changed files with 1,533 additions and 369 deletions.
22 changes: 8 additions & 14 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ end

if config_env() == :test do
config(:logger, level: :info)
config(:electric, test_mode: true)
config(:electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001))
end

Expand All @@ -34,6 +35,8 @@ service_name = env!("ELECTRIC_SERVICE_NAME", :string, "electric")
instance_id = env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4())
version = Electric.version()

test_tenant = "test_tenant"

config :telemetry_poller, :default, period: 500

config :opentelemetry,
Expand Down Expand Up @@ -81,28 +84,19 @@ otel_simple_processor =
config :opentelemetry,
processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1)

connection_opts =
if Config.config_env() == :test do
[
if Config.config_env() == :test do
config :electric,
connection_opts: [
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
env!("DATABASE_URL", :string)
|> Electric.ConfigParser.parse_postgresql_uri()

database_ipv6_config =
env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]
end

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
config :electric, test_tenant: test_tenant
end

enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("CACHE_MAX_AGE", :integer, 60)
Expand Down
104 changes: 61 additions & 43 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
defmodule Electric.Application do
use Application
require Config

@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
end

@spec process_name(atom(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, id}}}
@spec process_name(atom(), String.t(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id, id}}}
end

@impl true
Expand All @@ -20,27 +21,27 @@ defmodule Electric.Application do

config = configure()

shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)

connection_manager_opts = [
electric_instance_id: config.electric_instance_id,
connection_opts: config.connection_opts,
replication_opts: [
publication_name: config.replication_opts.publication_name,
try_creating_publication?: true,
slot_name: config.replication_opts.slot_name,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
{Electric.Replication.ShapeLogCollector, :handle_relation_msg, [shape_log_collector]}
],
pool_opts: [
name: Electric.DbPool,
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
persistent_kv: config.persistent_kv
]
# shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)

# connection_manager_opts = [
# electric_instance_id: config.electric_instance_id,
# connection_opts: config.connection_opts,
# replication_opts: [
# publication_name: config.replication_opts.publication_name,
# try_creating_publication?: true,
# slot_name: config.replication_opts.slot_name,
# transaction_received:
# {Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
# relation_received:
# {Electric.Replication.ShapeLogCollector, :handle_relation_msg, [shape_log_collector]}
# ],
# pool_opts: [
# name: Electric.DbPool,
# pool_size: config.pool_opts.size,
# types: PgInterop.Postgrex.Types
# ],
# persistent_kv: config.persistent_kv
# ]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
Expand All @@ -60,30 +61,47 @@ defmodule Electric.Application do
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
# {Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
Electric.TenantSupervisor,
Electric.TenantManager,
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
storage: config.storage, tenant_manager: Electric.TenantManager},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
# {Bandit,
# plug:
# {Electric.Plug.Router,
# storage: config.storage,
# registry: Registry.ShapeChanges,
# shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
# get_service_status: &Electric.ServiceStatus.check/0,
# inspector: config.inspector,
# long_poll_timeout: 20_000,
# max_age: Application.fetch_env!(:electric, :cache_max_age),
# stale_age: Application.fetch_env!(:electric, :cache_stale_age),
# allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
# port: Application.fetch_env!(:electric, :service_port),
# thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port))
# [{Electric.Connection.Supervisor, connection_manager_opts}]
])

Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)
{:ok, sup_pid} =
Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)

if Application.get_env(:electric, :test_mode, false) do
test_tenant = Application.fetch_env!(:electric, :test_tenant)
connection_opts = Application.fetch_env!(:electric, :connection_opts)
Electric.TenantManager.create_tenant(test_tenant, connection_opts)
end

{:ok, sup_pid}
end

# This function is called once in the application's start() callback. It reads configuration
Expand Down
35 changes: 23 additions & 12 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Electric.Connection.Manager do
connection_opts: [...],
replication_opts: [...],
pool_opts: [...],
persistent_kv: ...}
timeline_opts: [...]}
]
Supervisor.start_link(children, strategy: :one_for_one)
Expand All @@ -34,8 +34,8 @@ defmodule Electric.Connection.Manager do
:replication_opts,
# Database connection pool options
:pool_opts,
# Application's persistent key-value storage reference
:persistent_kv,
# Options specific to `Electric.Timeline`
:timeline_opts,
# PID of the replication client
:replication_client_pid,
# PID of the Postgres connection lock
Expand All @@ -55,7 +55,8 @@ defmodule Electric.Connection.Manager do
# PostgreSQL system identifier
:pg_system_identifier,
# PostgreSQL timeline ID
:pg_timeline_id
:pg_timeline_id,
:tenant_id
]
end

Expand All @@ -70,17 +71,25 @@ defmodule Electric.Connection.Manager do
| {:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
| {:pool_opts, Keyword.t()}
| {:persistent_kv, map()}
| {:timeline_opts, Keyword.t()}

@type options :: [option]

@name __MODULE__

@lock_status_logging_interval 10_000

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
GenServer.start_link(__MODULE__, opts, name: name(opts))
end

def name(electric_instance_id, tenant_id) do
Electric.Application.process_name(electric_instance_id, tenant_id, __MODULE__)
end

def name(opts) do
electric_instance_id = Keyword.fetch!(opts, :electric_instance_id)
tenant_id = Keyword.fetch!(opts, :tenant_id)
name(electric_instance_id, tenant_id)
end

@doc """
Expand Down Expand Up @@ -128,17 +137,18 @@ defmodule Electric.Connection.Manager do

pool_opts = Keyword.fetch!(opts, :pool_opts)

persistent_kv = Keyword.fetch!(opts, :persistent_kv)
timeline_opts = Keyword.fetch!(opts, :timeline_opts)

state =
%State{
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: pool_opts,
persistent_kv: persistent_kv,
timeline_opts: timeline_opts,
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
electric_instance_id: Keyword.fetch!(opts, :electric_instance_id)
electric_instance_id: Keyword.fetch!(opts, :electric_instance_id),
tenant_id: Keyword.fetch!(opts, :tenant_id)
}

# Try to acquire the connection lock on the replication slot
Expand Down Expand Up @@ -224,11 +234,12 @@ defmodule Electric.Connection.Manager do
check_result =
Electric.Timeline.check(
{state.pg_system_identifier, state.pg_timeline_id},
state.persistent_kv
state.timeline_opts
)

{:ok, shapes_sup_pid} =
Electric.Connection.Supervisor.start_shapes_supervisor(
tenant_id: state.tenant_id,
purge_all_shapes?: check_result == :timeline_changed
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Electric.Connection.Supervisor do
{
Electric.Shapes.Supervisor,
electric_instance_id: app_config.electric_instance_id,
tenant_id: Keyword.fetch!(opts, :tenant_id),
shape_cache: shape_cache_spec,
log_collector: shape_log_collector_spec
},
Expand Down
Loading

0 comments on commit d3ccf09

Please sign in to comment.