Skip to content

Commit

Permalink
feat: support array functions in where clauses (#1840)
Browse files Browse the repository at this point in the history
Adds support for:
1. Usage of array columns in where clauses
2. Base operators for arrays: `@>`, `<@`, `&&`, `||`
3. Validation for polymorphic operators (`anyarray` pseudo-type and
others), paving way for other functions that accept arrays

Closes #1767, #1766
  • Loading branch information
icehaunter authored Oct 15, 2024
1 parent 4e2ffb5 commit 3ab27a6
Show file tree
Hide file tree
Showing 11 changed files with 1,259 additions and 111 deletions.
5 changes: 5 additions & 0 deletions .changeset/sweet-papayas-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Implement support for array columns and operations over those in where clauses
196 changes: 132 additions & 64 deletions packages/sync-service/lib/electric/replication/eval/env.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Electric.Replication.Eval.Env do
# General info: https://www.postgresql.org/docs/current/extend-type-system.html#EXTEND-TYPES-POLYMORPHIC
# Example of why only hardcoded types are respected: https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L1702
@simple_polymorphic_types ~w|anyelement anyarray anynonarray anyenum anyrange anymultirange|a
@common_polymorphic_types ~w|anycompatible anycompatiblearray anycompatiblenonarray anycompatiblerange anycompatiblemultirange|
@common_polymorphic_types ~w|anycompatible anycompatiblearray anycompatiblenonarray anycompatiblerange anycompatiblemultirange|a

### Types
@type flat_pg_type :: basic_type() | {:composite, map()} | :record | {:enum, term()}
Expand Down Expand Up @@ -120,17 +120,38 @@ defmodule Electric.Replication.Eval.Env do
end)
end

@text_types ~w|text varchar name bpchar|a

@doc """
Parse an unknown value constant as a known type in the current environment.
"""
@spec parse_const(t(), String.t() | nil, pg_type()) :: {:ok, term()} | :error
# Any type can be nullable in general
def parse_const(%__MODULE__{}, nil, _), do: {:ok, nil}
# Text is special-cased as never needing parsing
def parse_const(%__MODULE__{}, value, :text), do: {:ok, value}
def parse_const(%__MODULE__{}, value, :varchar), do: {:ok, value}
def parse_const(%__MODULE__{}, value, :name), do: {:ok, value}
def parse_const(%__MODULE__{}, value, :bpchar), do: {:ok, value}
def parse_const(%__MODULE__{}, value, x) when x in @text_types, do: {:ok, value}

def parse_const(%__MODULE__{}, value, {:array, subtype}) when subtype in @text_types do
{:ok, PgInterop.Array.parse(value)}
rescue
_ -> :error
end

def parse_const(%__MODULE__{funcs: funcs}, value, {:array, subtype}) do
with {:ok, overloads} <- Map.fetch(funcs, {to_string(subtype), 1}),
%{implementation: impl} <- Enum.find(overloads, &(&1.args == [:text])) do
try do
case impl do
{module, fun} -> {:ok, PgInterop.Array.parse(value, &apply(module, fun, [&1]))}
fun -> {:ok, PgInterop.Array.parse(value, &apply(fun, [&1]))}
end
rescue
_ -> :error
end
else
_ -> :error
end
end

def parse_const(%__MODULE__{funcs: funcs}, value, type) do
with {:ok, overloads} <- Map.fetch(funcs, {to_string(type), 1}),
Expand Down Expand Up @@ -183,14 +204,8 @@ defmodule Electric.Replication.Eval.Env do
def is_preferred?(%__MODULE__{known_basic_types: types}, type),
do: get_in(types, [type, :preferred?]) || false

@doc """
Check if a list of inputs can be implicitly coerced to a list of targets.
Note that other functions may not exactly support all of types
"""
@spec can_implicitly_coerce_types?(t(), list(pg_type()), list(pg_type())) :: boolean()
# This function implements logic from https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L556
def can_implicitly_coerce_types?(%__MODULE__{} = env, inputs, targets) do
def get_unified_coercion_targets(%__MODULE__{} = env, inputs, targets, return_type \\ nil) do
# PG has two "groups" of polymorphic types, that need to agree within themselves, but not across
polymorphic_agg = %{simple: [], common: []}

Expand All @@ -200,6 +215,9 @@ defmodule Electric.Replication.Eval.Env do
{input, input}, agg ->
{:cont, agg}

{:unknown, _}, agg ->
{:cont, agg}

{input, target}, agg when target in @simple_polymorphic_types ->
{:cont, Map.update!(agg, :simple, &[{input, target} | &1])}

Expand All @@ -209,9 +227,6 @@ defmodule Electric.Replication.Eval.Env do
{_, :any}, agg ->
{:cont, agg}

{:unknown, _}, agg ->
{:cont, agg}

{:record, {:composite, _}}, agg ->
{:cont, agg}

Expand All @@ -229,90 +244,143 @@ defmodule Electric.Replication.Eval.Env do
end)
|> case do
:error ->
false
:error

%{simple: [], common: []} ->
true
{:ok,
{replace_all_polymorphics(targets, :text, :text),
replace_polymorphics(return_type, :text, :text)}}

%{simple: simple, common: common} ->
simple_polymorphics_agree?(simple) and common_polymorphics_agree?(common, env)
with {:ok, simple_consensus} <- simple_polymorphics_consensus(simple),
{:ok, common_consensus} <- common_polymorphics_consensus(common, env) do
{:ok,
{replace_all_polymorphics(targets, simple_consensus, common_consensus),
replace_polymorphics(return_type, simple_consensus, common_consensus)}}
end
end
end

defp simple_polymorphics_agree?(args, consensus \\ nil)
defp simple_polymorphics_agree?([], _), do: true
defp replace_all_polymorphics(types, simple, common),
do: Enum.map(types, &replace_polymorphics(&1, simple, common))

defp replace_polymorphics(type, simple_consensus, _) when type in [:anyelement, :anynonarray],
do: simple_consensus

defp replace_polymorphics(:anyarray, simple_consensus, _), do: {:array, simple_consensus}
defp replace_polymorphics(:anyenum, simple_consensus, _), do: {:enum, simple_consensus}
defp replace_polymorphics(:anyrange, simple_consensus, _), do: {:range, simple_consensus}

defp replace_polymorphics(:anymultirange, simple_consensus, _),
do: {:multirange, simple_consensus}

defp replace_polymorphics(type, _, common_consensus)
when type in [:anycompatible, :anycompatiblenonarray],
do: common_consensus

defp replace_polymorphics(:anycompatiblearray, _, common_consensus),
do: {:array, common_consensus}

defp replace_polymorphics(:anycompatiblerange, _, common_consensus),
do: {:range, common_consensus}

defp replace_polymorphics(:anycompatiblemultirange, _, common_consensus),
do: {:multirange, common_consensus}

defp replace_polymorphics(target, _, _), do: target

@doc """
Check if a list of inputs can be implicitly coerced to a list of targets.
Note that other functions may not exactly support all of types
"""
@spec can_implicitly_coerce_types?(t(), list(pg_type()), list(pg_type())) :: boolean()
# This function implements logic from https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L556
def can_implicitly_coerce_types?(%__MODULE__{} = env, inputs, targets) do
get_unified_coercion_targets(env, inputs, targets) != :error
end

defp simple_polymorphics_consensus(args, consensus \\ nil)
defp simple_polymorphics_consensus([], consensus), do: {:ok, consensus}

# Check both that the element can satisfy the container limitation, and that the contained type matches
defp simple_polymorphics_agree?([{{:array, elem}, :anyarray} | tail], x)
defp simple_polymorphics_consensus([{{:array, elem}, :anyarray} | tail], x)
when is_nil(x) or x == elem,
do: simple_polymorphics_agree?(tail, elem)
do: simple_polymorphics_consensus(tail, elem)

defp simple_polymorphics_agree?([{{:range, elem}, :anyrange} | tail], x)
defp simple_polymorphics_consensus([{{:range, elem}, :anyrange} | tail], x)
when is_nil(x) or x == elem,
do: simple_polymorphics_agree?(tail, elem)
do: simple_polymorphics_consensus(tail, elem)

defp simple_polymorphics_agree?([{{:multirange, elem}, :anymultirange} | tail], x)
defp simple_polymorphics_consensus([{{:multirange, elem}, :anymultirange} | tail], x)
when is_nil(x) or x == elem,
do: simple_polymorphics_agree?(tail, elem)
do: simple_polymorphics_consensus(tail, elem)

# `:anyarray`, `:anyrange`, and `:anymultirange` basically "unwrap" values, but anything else doesn't
defp simple_polymorphics_agree?([{{:array, _}, :anynonarray} | _], _), do: false
defp simple_polymorphics_agree?([{_, :anynonarray} | _], {:array, _}), do: false
defp simple_polymorphics_consensus([{{:array, _}, :anynonarray} | _], _), do: :error
defp simple_polymorphics_consensus([{_, :anynonarray} | _], {:array, _}), do: :error

defp simple_polymorphics_agree?([{elem, :anynonarray} | tail], x)
defp simple_polymorphics_consensus([{elem, :anynonarray} | tail], x)
when is_nil(x) or elem == x,
do: simple_polymorphics_agree?(tail, elem)
do: simple_polymorphics_consensus(tail, elem)

defp simple_polymorphics_agree?([{{:enum, _} = elem, :anyenum} | tail], x)
defp simple_polymorphics_consensus([{{:enum, _} = elem, :anyenum} | tail], x)
when is_nil(x) or elem == x,
do: simple_polymorphics_agree?(tail, elem)
do: simple_polymorphics_consensus(tail, elem)

defp simple_polymorphics_agree?([{elem, :anyelement} | tail], x) when is_nil(x) or elem == x,
do: simple_polymorphics_agree?(tail, elem)
defp simple_polymorphics_consensus([{elem, :anyelement} | tail], x) when is_nil(x) or elem == x,
do: simple_polymorphics_consensus(tail, elem)

# If all guards failed, then we bail
defp simple_polymorphics_agree?(_, _), do: false
defp simple_polymorphics_consensus(_, _), do: :error

defp common_polymorphics_agree?([], _), do: true
defp common_polymorphics_consensus([], _), do: {:ok, nil}

defp common_polymorphics_agree?(args, env) do
defp common_polymorphics_consensus(args, env) do
# Logic in this loop tries to find common supertype for provided inputs, following same logic as
# https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L1443

{{consensus, _, _}, seen_nonarray?} =
for {input, target} <- args,
input = unwrap(input, target),
category = get_type_category(env, input),
preferred? = is_preferred?(env, input),
nonarray? = target == :anycompatiblenonarray,
reduce: {{nil, nil, false}, false} do
{{nil, _, _}, _} ->
{{input, category, preferred?}, nonarray?}

# Same category, keep preferred
{{_, ^category, true} = old, seen_nonarray?} ->
{old, seen_nonarray? or nonarray?}

{{cur_type, ^category, _} = old, seen_nonarray?} ->
# Take new type if can coerce to it but not the other way
if implicitly_castable?(env, cur_type, input) and
not implicitly_castable?(env, input, cur_type) do
{{input, category, preferred?}, seen_nonarray? or nonarray?}
else
Enum.reduce(args, {{nil, nil, false}, false}, fn {input, target}, acc ->
input = unwrap(input, target)
category = get_type_category(env, input)
preferred? = is_preferred?(env, input)
nonarray? = target == :anycompatiblenonarray

case acc do
{{nil, _, _}, _} ->
{{input, category, preferred?}, nonarray?}

{{_, ^category, true} = old, seen_nonarray?} ->
{old, seen_nonarray? or nonarray?}
end

# Differing category, irreconcilable
{_, _} ->
throw(:unsatisfied_polymorphic_constraint)
end
{{cur_type, ^category, _} = old, seen_nonarray?} ->
# Take new type if can coerce to it but not the other way
if implicitly_castable?(env, cur_type, input) and
not implicitly_castable?(env, input, cur_type) do
{{input, category, preferred?}, seen_nonarray? or nonarray?}
else
{old, seen_nonarray? or nonarray?}
end

# Differing category, irreconcilable
{_, _} ->
throw(:unsatisfied_polymorphic_constraint)
end
end)

# If any of polymorphic variables are `nonarray`, then consensus cannot be array
# and all inputs have to be actually castable to the consensus
not (seen_nonarray? and match?({:array, _}, consensus)) and
Enum.all?(args, fn {input, _} -> implicitly_castable?(env, input, consensus) end)
if not (seen_nonarray? and match?({:array, _}, consensus)) and
Enum.all?(args, fn {input, target} ->
implicitly_castable?(env, unwrap(input, target), consensus)
end) do
{:ok, consensus}
else
:error
end
catch
:unsatisfied_polymorphic_constraint -> false
:unsatisfied_polymorphic_constraint -> :error
end

defp unwrap({:array, value}, :anycompatiblearray), do: value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do
defpostgres("numeric(text) -> numeric", delegate: &Casting.parse_float8/1)
defpostgres("bool(text) -> bool", delegate: &Casting.parse_bool/1)
defpostgres("uuid(text) -> uuid", delegate: &Casting.parse_uuid/1)
defpostgres("text(text) -> text", delegate: &BasicTypes.noop/1)
defpostgres("date(text) -> date", delegate: &Casting.parse_date/1)
defpostgres("time(text) -> time", delegate: &Casting.parse_time/1)
defpostgres("timestamp(text) -> timestamp", delegate: &Casting.parse_timestamp/1)
Expand Down Expand Up @@ -140,4 +141,55 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do
def naive_from_timestamptz(tz, datetime),
do: datetime |> DateTime.shift_zone!(tz) |> DateTime.to_naive()
end

## Array functions
defpostgres("anyarray = anyarray -> bool", delegate: &Kernel.==/2)
defpostgres("anyarray <> anyarray -> bool", delegate: &Kernel.!=/2)

defpostgres("anycompatiblearray || anycompatiblearray -> anycompatiblearray",
delegate: &PgInterop.Array.concat_arrays/2
)

defpostgres("array_cat(anycompatiblearray, anycompatiblearray) -> anycompatiblearray",
delegate: &PgInterop.Array.concat_arrays/2
)

defpostgres("anycompatible || anycompatiblearray -> anycompatiblearray",
delegate: &PgInterop.Array.array_prepend/2
)

defpostgres("array_prepend(anycompatible, anycompatiblearray) -> anycompatiblearray",
delegate: &PgInterop.Array.array_prepend/2
)

defpostgres("anycompatiblearray || anycompatible -> anycompatiblearray",
delegate: &PgInterop.Array.array_append/2
)

defpostgres("array_append(anycompatiblearray, anycompatible) -> anycompatiblearray",
delegate: &PgInterop.Array.array_append/2
)

defpostgres("array_ndims(anyarray) -> int4", delegate: &PgInterop.Array.get_array_dim/1)

defpostgres "anyarray @> anyarray -> bool" do
def left_array_contains_right?(left, right) do
MapSet.subset?(MapSet.new(right), MapSet.new(left))
end
end

defpostgres "anyarray <@ anyarray -> bool" do
def right_array_contains_left?(left, right) do
MapSet.subset?(MapSet.new(left), MapSet.new(right))
end
end

defpostgres "anyarray && anyarray -> bool" do
def arrays_overlap?(left, right) when left == [] or right == [], do: false

def arrays_overlap?(left, right) do
left_mapset = MapSet.new(left)
Enum.any?(right, &MapSet.member?(left_mapset, &1))
end
end
end
Loading

0 comments on commit 3ab27a6

Please sign in to comment.