diff --git a/.changeset/sweet-papayas-shake.md b/.changeset/sweet-papayas-shake.md new file mode 100644 index 0000000000..5d691dcada --- /dev/null +++ b/.changeset/sweet-papayas-shake.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Implement support for array columns and operations over those in where clauses diff --git a/packages/sync-service/lib/electric/replication/eval/env.ex b/packages/sync-service/lib/electric/replication/eval/env.ex index 84eb3b2353..e2adeba415 100644 --- a/packages/sync-service/lib/electric/replication/eval/env.ex +++ b/packages/sync-service/lib/electric/replication/eval/env.ex @@ -33,7 +33,7 @@ defmodule Electric.Replication.Eval.Env do # General info: https://www.postgresql.org/docs/current/extend-type-system.html#EXTEND-TYPES-POLYMORPHIC # Example of why only hardcoded types are respected: https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L1702 @simple_polymorphic_types ~w|anyelement anyarray anynonarray anyenum anyrange anymultirange|a - @common_polymorphic_types ~w|anycompatible anycompatiblearray anycompatiblenonarray anycompatiblerange anycompatiblemultirange| + @common_polymorphic_types ~w|anycompatible anycompatiblearray anycompatiblenonarray anycompatiblerange anycompatiblemultirange|a ### Types @type flat_pg_type :: basic_type() | {:composite, map()} | :record | {:enum, term()} @@ -120,6 +120,8 @@ defmodule Electric.Replication.Eval.Env do end) end + @text_types ~w|text varchar name bpchar|a + @doc """ Parse an unknown value constant as a known type in the current environment. """ @@ -127,10 +129,29 @@ defmodule Electric.Replication.Eval.Env do # Any type can be nullable in general def parse_const(%__MODULE__{}, nil, _), do: {:ok, nil} # Text is special-cased as never needing parsing - def parse_const(%__MODULE__{}, value, :text), do: {:ok, value} - def parse_const(%__MODULE__{}, value, :varchar), do: {:ok, value} - def parse_const(%__MODULE__{}, value, :name), do: {:ok, value} - def parse_const(%__MODULE__{}, value, :bpchar), do: {:ok, value} + def parse_const(%__MODULE__{}, value, x) when x in @text_types, do: {:ok, value} + + def parse_const(%__MODULE__{}, value, {:array, subtype}) when subtype in @text_types do + {:ok, PgInterop.Array.parse(value)} + rescue + _ -> :error + end + + def parse_const(%__MODULE__{funcs: funcs}, value, {:array, subtype}) do + with {:ok, overloads} <- Map.fetch(funcs, {to_string(subtype), 1}), + %{implementation: impl} <- Enum.find(overloads, &(&1.args == [:text])) do + try do + case impl do + {module, fun} -> {:ok, PgInterop.Array.parse(value, &apply(module, fun, [&1]))} + fun -> {:ok, PgInterop.Array.parse(value, &apply(fun, [&1]))} + end + rescue + _ -> :error + end + else + _ -> :error + end + end def parse_const(%__MODULE__{funcs: funcs}, value, type) do with {:ok, overloads} <- Map.fetch(funcs, {to_string(type), 1}), @@ -183,14 +204,8 @@ defmodule Electric.Replication.Eval.Env do def is_preferred?(%__MODULE__{known_basic_types: types}, type), do: get_in(types, [type, :preferred?]) || false - @doc """ - Check if a list of inputs can be implicitly coerced to a list of targets. - - Note that other functions may not exactly support all of types - """ - @spec can_implicitly_coerce_types?(t(), list(pg_type()), list(pg_type())) :: boolean() # This function implements logic from https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L556 - def can_implicitly_coerce_types?(%__MODULE__{} = env, inputs, targets) do + def get_unified_coercion_targets(%__MODULE__{} = env, inputs, targets, return_type \\ nil) do # PG has two "groups" of polymorphic types, that need to agree within themselves, but not across polymorphic_agg = %{simple: [], common: []} @@ -200,6 +215,9 @@ defmodule Electric.Replication.Eval.Env do {input, input}, agg -> {:cont, agg} + {:unknown, _}, agg -> + {:cont, agg} + {input, target}, agg when target in @simple_polymorphic_types -> {:cont, Map.update!(agg, :simple, &[{input, target} | &1])} @@ -209,9 +227,6 @@ defmodule Electric.Replication.Eval.Env do {_, :any}, agg -> {:cont, agg} - {:unknown, _}, agg -> - {:cont, agg} - {:record, {:composite, _}}, agg -> {:cont, agg} @@ -229,90 +244,143 @@ defmodule Electric.Replication.Eval.Env do end) |> case do :error -> - false + :error %{simple: [], common: []} -> - true + {:ok, + {replace_all_polymorphics(targets, :text, :text), + replace_polymorphics(return_type, :text, :text)}} %{simple: simple, common: common} -> - simple_polymorphics_agree?(simple) and common_polymorphics_agree?(common, env) + with {:ok, simple_consensus} <- simple_polymorphics_consensus(simple), + {:ok, common_consensus} <- common_polymorphics_consensus(common, env) do + {:ok, + {replace_all_polymorphics(targets, simple_consensus, common_consensus), + replace_polymorphics(return_type, simple_consensus, common_consensus)}} + end end end - defp simple_polymorphics_agree?(args, consensus \\ nil) - defp simple_polymorphics_agree?([], _), do: true + defp replace_all_polymorphics(types, simple, common), + do: Enum.map(types, &replace_polymorphics(&1, simple, common)) + + defp replace_polymorphics(type, simple_consensus, _) when type in [:anyelement, :anynonarray], + do: simple_consensus + + defp replace_polymorphics(:anyarray, simple_consensus, _), do: {:array, simple_consensus} + defp replace_polymorphics(:anyenum, simple_consensus, _), do: {:enum, simple_consensus} + defp replace_polymorphics(:anyrange, simple_consensus, _), do: {:range, simple_consensus} + + defp replace_polymorphics(:anymultirange, simple_consensus, _), + do: {:multirange, simple_consensus} + + defp replace_polymorphics(type, _, common_consensus) + when type in [:anycompatible, :anycompatiblenonarray], + do: common_consensus + + defp replace_polymorphics(:anycompatiblearray, _, common_consensus), + do: {:array, common_consensus} + + defp replace_polymorphics(:anycompatiblerange, _, common_consensus), + do: {:range, common_consensus} + + defp replace_polymorphics(:anycompatiblemultirange, _, common_consensus), + do: {:multirange, common_consensus} + + defp replace_polymorphics(target, _, _), do: target + + @doc """ + Check if a list of inputs can be implicitly coerced to a list of targets. + + Note that other functions may not exactly support all of types + """ + @spec can_implicitly_coerce_types?(t(), list(pg_type()), list(pg_type())) :: boolean() + # This function implements logic from https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L556 + def can_implicitly_coerce_types?(%__MODULE__{} = env, inputs, targets) do + get_unified_coercion_targets(env, inputs, targets) != :error + end + + defp simple_polymorphics_consensus(args, consensus \\ nil) + defp simple_polymorphics_consensus([], consensus), do: {:ok, consensus} # Check both that the element can satisfy the container limitation, and that the contained type matches - defp simple_polymorphics_agree?([{{:array, elem}, :anyarray} | tail], x) + defp simple_polymorphics_consensus([{{:array, elem}, :anyarray} | tail], x) when is_nil(x) or x == elem, - do: simple_polymorphics_agree?(tail, elem) + do: simple_polymorphics_consensus(tail, elem) - defp simple_polymorphics_agree?([{{:range, elem}, :anyrange} | tail], x) + defp simple_polymorphics_consensus([{{:range, elem}, :anyrange} | tail], x) when is_nil(x) or x == elem, - do: simple_polymorphics_agree?(tail, elem) + do: simple_polymorphics_consensus(tail, elem) - defp simple_polymorphics_agree?([{{:multirange, elem}, :anymultirange} | tail], x) + defp simple_polymorphics_consensus([{{:multirange, elem}, :anymultirange} | tail], x) when is_nil(x) or x == elem, - do: simple_polymorphics_agree?(tail, elem) + do: simple_polymorphics_consensus(tail, elem) # `:anyarray`, `:anyrange`, and `:anymultirange` basically "unwrap" values, but anything else doesn't - defp simple_polymorphics_agree?([{{:array, _}, :anynonarray} | _], _), do: false - defp simple_polymorphics_agree?([{_, :anynonarray} | _], {:array, _}), do: false + defp simple_polymorphics_consensus([{{:array, _}, :anynonarray} | _], _), do: :error + defp simple_polymorphics_consensus([{_, :anynonarray} | _], {:array, _}), do: :error - defp simple_polymorphics_agree?([{elem, :anynonarray} | tail], x) + defp simple_polymorphics_consensus([{elem, :anynonarray} | tail], x) when is_nil(x) or elem == x, - do: simple_polymorphics_agree?(tail, elem) + do: simple_polymorphics_consensus(tail, elem) - defp simple_polymorphics_agree?([{{:enum, _} = elem, :anyenum} | tail], x) + defp simple_polymorphics_consensus([{{:enum, _} = elem, :anyenum} | tail], x) when is_nil(x) or elem == x, - do: simple_polymorphics_agree?(tail, elem) + do: simple_polymorphics_consensus(tail, elem) - defp simple_polymorphics_agree?([{elem, :anyelement} | tail], x) when is_nil(x) or elem == x, - do: simple_polymorphics_agree?(tail, elem) + defp simple_polymorphics_consensus([{elem, :anyelement} | tail], x) when is_nil(x) or elem == x, + do: simple_polymorphics_consensus(tail, elem) # If all guards failed, then we bail - defp simple_polymorphics_agree?(_, _), do: false + defp simple_polymorphics_consensus(_, _), do: :error - defp common_polymorphics_agree?([], _), do: true + defp common_polymorphics_consensus([], _), do: {:ok, nil} - defp common_polymorphics_agree?(args, env) do + defp common_polymorphics_consensus(args, env) do # Logic in this loop tries to find common supertype for provided inputs, following same logic as # https://github.com/postgres/postgres/blob/e8c334c47abb6c8f648cb50241c2cd65c9e4e6dc/src/backend/parser/parse_coerce.c#L1443 {{consensus, _, _}, seen_nonarray?} = - for {input, target} <- args, - input = unwrap(input, target), - category = get_type_category(env, input), - preferred? = is_preferred?(env, input), - nonarray? = target == :anycompatiblenonarray, - reduce: {{nil, nil, false}, false} do - {{nil, _, _}, _} -> - {{input, category, preferred?}, nonarray?} - - # Same category, keep preferred - {{_, ^category, true} = old, seen_nonarray?} -> - {old, seen_nonarray? or nonarray?} - - {{cur_type, ^category, _} = old, seen_nonarray?} -> - # Take new type if can coerce to it but not the other way - if implicitly_castable?(env, cur_type, input) and - not implicitly_castable?(env, input, cur_type) do - {{input, category, preferred?}, seen_nonarray? or nonarray?} - else + Enum.reduce(args, {{nil, nil, false}, false}, fn {input, target}, acc -> + input = unwrap(input, target) + category = get_type_category(env, input) + preferred? = is_preferred?(env, input) + nonarray? = target == :anycompatiblenonarray + + case acc do + {{nil, _, _}, _} -> + {{input, category, preferred?}, nonarray?} + + {{_, ^category, true} = old, seen_nonarray?} -> {old, seen_nonarray? or nonarray?} - end - # Differing category, irreconcilable - {_, _} -> - throw(:unsatisfied_polymorphic_constraint) - end + {{cur_type, ^category, _} = old, seen_nonarray?} -> + # Take new type if can coerce to it but not the other way + if implicitly_castable?(env, cur_type, input) and + not implicitly_castable?(env, input, cur_type) do + {{input, category, preferred?}, seen_nonarray? or nonarray?} + else + {old, seen_nonarray? or nonarray?} + end + + # Differing category, irreconcilable + {_, _} -> + throw(:unsatisfied_polymorphic_constraint) + end + end) # If any of polymorphic variables are `nonarray`, then consensus cannot be array # and all inputs have to be actually castable to the consensus - not (seen_nonarray? and match?({:array, _}, consensus)) and - Enum.all?(args, fn {input, _} -> implicitly_castable?(env, input, consensus) end) + if not (seen_nonarray? and match?({:array, _}, consensus)) and + Enum.all?(args, fn {input, target} -> + implicitly_castable?(env, unwrap(input, target), consensus) + end) do + {:ok, consensus} + else + :error + end catch - :unsatisfied_polymorphic_constraint -> false + :unsatisfied_polymorphic_constraint -> :error end defp unwrap({:array, value}, :anycompatiblearray), do: value diff --git a/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex b/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex index 41dffe223f..1994bbd906 100644 --- a/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex +++ b/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex @@ -15,6 +15,7 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do defpostgres("numeric(text) -> numeric", delegate: &Casting.parse_float8/1) defpostgres("bool(text) -> bool", delegate: &Casting.parse_bool/1) defpostgres("uuid(text) -> uuid", delegate: &Casting.parse_uuid/1) + defpostgres("text(text) -> text", delegate: &BasicTypes.noop/1) defpostgres("date(text) -> date", delegate: &Casting.parse_date/1) defpostgres("time(text) -> time", delegate: &Casting.parse_time/1) defpostgres("timestamp(text) -> timestamp", delegate: &Casting.parse_timestamp/1) @@ -140,4 +141,55 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do def naive_from_timestamptz(tz, datetime), do: datetime |> DateTime.shift_zone!(tz) |> DateTime.to_naive() end + + ## Array functions + defpostgres("anyarray = anyarray -> bool", delegate: &Kernel.==/2) + defpostgres("anyarray <> anyarray -> bool", delegate: &Kernel.!=/2) + + defpostgres("anycompatiblearray || anycompatiblearray -> anycompatiblearray", + delegate: &PgInterop.Array.concat_arrays/2 + ) + + defpostgres("array_cat(anycompatiblearray, anycompatiblearray) -> anycompatiblearray", + delegate: &PgInterop.Array.concat_arrays/2 + ) + + defpostgres("anycompatible || anycompatiblearray -> anycompatiblearray", + delegate: &PgInterop.Array.array_prepend/2 + ) + + defpostgres("array_prepend(anycompatible, anycompatiblearray) -> anycompatiblearray", + delegate: &PgInterop.Array.array_prepend/2 + ) + + defpostgres("anycompatiblearray || anycompatible -> anycompatiblearray", + delegate: &PgInterop.Array.array_append/2 + ) + + defpostgres("array_append(anycompatiblearray, anycompatible) -> anycompatiblearray", + delegate: &PgInterop.Array.array_append/2 + ) + + defpostgres("array_ndims(anyarray) -> int4", delegate: &PgInterop.Array.get_array_dim/1) + + defpostgres "anyarray @> anyarray -> bool" do + def left_array_contains_right?(left, right) do + MapSet.subset?(MapSet.new(right), MapSet.new(left)) + end + end + + defpostgres "anyarray <@ anyarray -> bool" do + def right_array_contains_left?(left, right) do + MapSet.subset?(MapSet.new(left), MapSet.new(right)) + end + end + + defpostgres "anyarray && anyarray -> bool" do + def arrays_overlap?(left, right) when left == [] or right == [], do: false + + def arrays_overlap?(left, right) do + left_mapset = MapSet.new(left) + Enum.any?(right, &MapSet.member?(left_mapset, &1)) + end + end end diff --git a/packages/sync-service/lib/electric/replication/eval/lookups.ex b/packages/sync-service/lib/electric/replication/eval/lookups.ex index 70c01533ac..06b51cd33b 100644 --- a/packages/sync-service/lib/electric/replication/eval/lookups.ex +++ b/packages/sync-service/lib/electric/replication/eval/lookups.ex @@ -2,6 +2,67 @@ defmodule Electric.Replication.Eval.Lookups do alias Electric.Utils alias Electric.Replication.Eval.Env + @doc """ + Given a list of types, get a candidate type that best represents the union of all types. + + Algorithm implements Postgres type resolution for `UNION`, `CASE`, and related constructs, like + `ARRAY[]` constructor syntax. They are outlined in + [documentation](https://www.postgresql.org/docs/current/typeconv-union-case.html). + + 1. If all inputs are of the same type, and it is not unknown, resolve as that type. + 2. If any input is of a domain type, treat it as being of the domain's base type for all subsequent steps. + 3. If all inputs are of type unknown, resolve as type text (the preferred type of the string category). + Otherwise, unknown inputs are ignored for the purposes of the remaining rules. + 4. If the non-unknown inputs are not all of the same type category, fail. + 5. Select the first non-unknown input type as the candidate type, then consider each other non-unknown input type, left to right. + If the candidate type can be implicitly converted to the other type but not vice-versa, select the other type as the new + candidate type. Then continue considering the remaining inputs. + If, at any stage of this process, a preferred type is selected, stop considering additional inputs. + 6. Convert all inputs to the final candidate type. Fail if there is not an implicit conversion from a given input type to the candidate type. + """ + def pick_union_type(args, env) do + Enum.reduce_while(args, {:ok, :unknown}, fn + # Skip unknowns + x, state when (is_map_key(x, :type) and x.type == :unknown) or not is_map_key(x, :type) -> + {:cont, state} + + # Take the first non-unknown type + %{type: type}, {:ok, :unknown} -> + {:cont, {:ok, type}} + + # Skip duplicates + %{type: type}, {:ok, candidate} when type == candidate -> + {:cont, {:ok, candidate}} + + %{type: type}, {:ok, candidate} -> + cond do + # Fail if categories differ + Env.get_type_category(env, type) != Env.get_type_category(env, candidate) -> + {:halt, {:error, type, candidate}} + + # Preferred type wins, but keep going to check for errors + Env.is_preferred?(env, type) -> + {:cont, {:ok, type}} + + # Preferred type wins, but keep going to check for errors + Env.is_preferred?(env, candidate) -> + {:cont, {:ok, candidate}} + + # If implicit coercion is possible in one direction, pick that type + Env.can_implicitly_coerce_types?(env, [candidate], [type]) and + not Env.can_implicitly_coerce_types?(env, [type], [candidate]) -> + {:cont, {:ok, type}} + + true -> + {:cont, {:ok, type}} + end + end) + |> case do + {:ok, :unknown} -> {:ok, :text} + result -> result + end + end + @doc """ Given multiple possible operator overloads (same name and arity), try to find a concrete implementation that matches the argument types. @@ -108,7 +169,12 @@ defmodule Electric.Replication.Eval.Lookups do end defp filter_overloads_on_implicit_conversion(choices, args, _, env) do - Enum.filter(choices, &Env.can_implicitly_coerce_types?(env, args, &1.args)) + Enum.reduce(choices, [], fn choice, acc -> + case Env.get_unified_coercion_targets(env, args, choice.args, choice.returns) do + {:ok, {targets, return}} -> [%{choice | args: targets, returns: return} | acc] + :error -> acc + end + end) end # "Most exact" here is defined as "how many arguments of the function do match exactly with provided" diff --git a/packages/sync-service/lib/electric/replication/eval/parser.ex b/packages/sync-service/lib/electric/replication/eval/parser.ex index 1523dd67c0..ad1a90cb49 100644 --- a/packages/sync-service/lib/electric/replication/eval/parser.ex +++ b/packages/sync-service/lib/electric/replication/eval/parser.ex @@ -19,7 +19,20 @@ defmodule Electric.Replication.Eval.Parser do end defmodule Func do - defstruct [:args, :type, :implementation, :name, strict?: true, immutable?: true, location: 0] + defstruct [ + :args, + :type, + :implementation, + :name, + strict?: true, + immutable?: true, + # So this parameter is (1) internal for now, i.e. `defpostgres` in known functions cannot set it, + # and (2) is a bit of a hack. This allows us to specify that this function should be applied to each element of an array, + # without supporting essentially anonymous functions in our AST for an equivalent of `Enum.map/2`. + map_over_array_in_pos: nil, + variadic_arg: nil, + location: 0 + ] end @valid_types (Electric.Postgres.supported_types() ++ @@ -84,6 +97,11 @@ defmodule Electric.Replication.Eval.Parser do end end + defp maybe_parse_and_validate_tree(nil, _, _), do: nil + + defp maybe_parse_and_validate_tree(node, refs, env), + do: do_parse_and_validate_tree(node, refs, env) + @spec do_parse_and_validate_tree(struct(), map(), map()) :: {:ok, %UnknownConst{} | tree_part()} | {:error, {non_neg_integer(), String.t()}} @@ -99,6 +117,108 @@ defmodule Electric.Replication.Eval.Parser do defp do_parse_and_validate_tree(%PgQuery.A_Const{val: {kind, struct}, location: loc}, _, _), do: make_const(kind, Map.fetch!(struct, kind), loc) + defp do_parse_and_validate_tree( + %PgQuery.A_ArrayExpr{elements: elements, location: loc}, + refs, + env + ) do + element_len = length(elements) + + with {:ok, elements} <- + Utils.map_while_ok(elements, &do_parse_and_validate_tree(&1, refs, env)) do + case Lookups.pick_union_type(elements, env) do + {:ok, type} -> + with {:ok, elements} <- + cast_unknowns(elements, List.duplicate(type, element_len), env), + {:ok, elements} <- + try_cast_implicit(elements, List.duplicate(type, element_len), env) do + maybe_reduce(%Func{ + args: [elements], + type: {:array, maybe_array_type(type)}, + location: loc, + name: "build_array", + implementation: & &1, + strict?: false, + variadic_arg: 0 + }) + end + + {:error, type, candidate} -> + {:error, + {loc, "ARRAY types #{readable(type)} and #{readable(candidate)} cannot be matched"}} + end + end + end + + defp do_parse_and_validate_tree( + %PgQuery.A_Indirection{arg: arg, indirection: indirection}, + refs, + env + ) do + with {:ok, %{type: {:array, inner_type} = array_type} = arg} <- + do_parse_and_validate_tree(arg, refs, env), + {:ok, indirections} <- + Utils.map_while_ok(indirection, &do_parse_and_validate_tree(&1, refs, env)) do + # If any of the indirections are slices, every access is treated as a slice access + # (e.g. `a[1:2][3]` is treated by PG as `a[1:2][1:3]` implicitly). + if Enum.any?(indirections, &(&1.type == {:internal, :slice})) do + maybe_reduce(%Func{ + location: arg.location, + args: [arg, indirections], + type: array_type, + name: "slice_access", + implementation: &PgInterop.Array.slice_access/2, + variadic_arg: 1 + }) + else + maybe_reduce(%Func{ + location: arg.location, + args: [arg, indirections], + type: inner_type, + name: "index_access", + implementation: &PgInterop.Array.index_access/2, + variadic_arg: 1 + }) + end + end + end + + defp do_parse_and_validate_tree( + %PgQuery.A_Indices{is_slice: is_slice, lidx: lower_idx, uidx: upper_idx}, + refs, + env + ) do + with {:ok, lower_idx} <- + maybe_parse_and_validate_tree(lower_idx, refs, env) || + {:ok, %Const{value: :unspecified, type: {:internal, :slice_boundary}, location: 0}}, + {:ok, upper_idx} <- + maybe_parse_and_validate_tree(upper_idx, refs, env) || + {:ok, %Const{value: :unspecified, type: {:internal, :slice_boundary}, location: 0}}, + {:ok, [lower_idx, upper_idx]} <- + cast_unknowns([lower_idx, upper_idx], List.duplicate(:int8, 2), env), + {:ok, [lower_idx, upper_idx]} <- round_numerics([lower_idx, upper_idx]), + {:ok, [lower_idx, upper_idx]} <- + try_cast_implicit([lower_idx, upper_idx], List.duplicate(:int8, 2), env) do + if is_slice do + maybe_reduce(%Func{ + location: upper_idx.location, + args: [lower_idx, upper_idx], + type: {:internal, :slice}, + name: "internal_slice", + implementation: &build_slice_structure/2 + }) + else + maybe_reduce(%Func{ + location: upper_idx.location, + args: [upper_idx], + type: {:internal, :index}, + name: "internal_index", + implementation: &build_index_structure/1 + }) + end + end + end + defp do_parse_and_validate_tree(%PgQuery.ColumnRef{fields: fields, location: loc}, refs, _) do ref = Enum.map(fields, &unwrap_node_string/1) @@ -217,19 +337,50 @@ defmodule Electric.Replication.Eval.Parser do # They all treat lexpr and rexpr differently, so we're just deferring to a concrete function implementation here for clarity. defp do_parse_and_validate_tree(%PgQuery.A_Expr{kind: kind, location: loc} = expr, refs, env) do case {kind, expr.lexpr} do - {:AEXPR_OP, nil} -> handle_unary_operator(expr, refs, env) - {:AEXPR_OP, _} -> handle_binary_operator(expr, refs, env) + {:AEXPR_OP, nil} -> + handle_unary_operator(expr, refs, env) + + {:AEXPR_OP, _} -> + handle_binary_operator(expr, refs, env) + # LIKE and ILIKE are expressed plainly as operators by the parser - {:AEXPR_LIKE, _} -> handle_binary_operator(expr, refs, env) - {:AEXPR_ILIKE, _} -> handle_binary_operator(expr, refs, env) - {:AEXPR_DISTINCT, _} -> handle_distinct(expr, refs, env) - {:AEXPR_NOT_DISTINCT, _} -> handle_distinct(expr, refs, env) - {:AEXPR_IN, _} -> handle_in(expr, refs, env) - {:AEXPR_BETWEEN, _} -> handle_between(expr, refs, env) - {:AEXPR_BETWEEN_SYM, _} -> handle_between(expr, refs, env) - {:AEXPR_NOT_BETWEEN, _} -> handle_between(expr, refs, env) - {:AEXPR_NOT_BETWEEN_SYM, _} -> handle_between(expr, refs, env) - _ -> {:error, {loc, "expression #{identifier(expr.name)} is not currently supported"}} + {:AEXPR_LIKE, _} -> + handle_binary_operator(expr, refs, env) + + {:AEXPR_ILIKE, _} -> + handle_binary_operator(expr, refs, env) + + {:AEXPR_DISTINCT, _} -> + handle_distinct(expr, refs, env) + + {:AEXPR_NOT_DISTINCT, _} -> + handle_distinct(expr, refs, env) + + {:AEXPR_IN, _} -> + handle_in(expr, refs, env) + + {:AEXPR_BETWEEN, _} -> + handle_between(expr, refs, env) + + {:AEXPR_BETWEEN_SYM, _} -> + handle_between(expr, refs, env) + + {:AEXPR_NOT_BETWEEN, _} -> + handle_between(expr, refs, env) + + {:AEXPR_NOT_BETWEEN_SYM, _} -> + handle_between(expr, refs, env) + + {:AEXPR_OP_ANY, _} -> + handle_any_or_all(expr, refs, env) + + {:AEXPR_OP_ALL, _} -> + handle_any_or_all(expr, refs, env) + + _ -> + {:error, + {loc, + "expression #{identifier(expr.name)} of #{inspect(kind)} is not currently supported"}} end end @@ -303,8 +454,11 @@ defmodule Electric.Replication.Eval.Parser do {Map.get(node, :location, 0), "#{type_module |> Module.split() |> List.last()} is not supported in this context"}} - defp get_type_from_pg_name(%PgQuery.TypeName{names: _, array_bounds: [_ | _]} = cast), - do: {:error, {cast.location, "Electric currently doesn't support array types"}} + defp get_type_from_pg_name(%PgQuery.TypeName{array_bounds: [_ | _]} = cast) do + with {:ok, type} <- get_type_from_pg_name(%{cast | array_bounds: []}) do + {:ok, {:array, type}} + end + end defp get_type_from_pg_name(%PgQuery.TypeName{names: names, location: loc}) do case Enum.map(names, &unwrap_node_string/1) do @@ -381,6 +535,48 @@ defmodule Electric.Replication.Eval.Parser do end end + defp handle_any_or_all(%PgQuery.A_Expr{} = expr, refs, env) do + with {:ok, lexpr} <- do_parse_and_validate_tree(expr.lexpr, refs, env), + {:ok, rexpr} <- do_parse_and_validate_tree(expr.rexpr, refs, env), + {:ok, fake_rexpr} <- get_fake_array_elem(rexpr), + {:ok, choices} <- find_available_operators(expr.name, 2, expr.location, env), + # Get a fake element type for the array, if possible, to pick correct operator overload + {:ok, %{args: [lexpr_type, rexpr_type], returns: :bool} = concrete} <- + Lookups.pick_concrete_operator_overload(choices, [lexpr, fake_rexpr], env), + {:ok, args} <- cast_unknowns([lexpr, rexpr], [lexpr_type, {:array, rexpr_type}], env), + {:ok, [lexpr, rexpr]} <- cast_implicit(args, [lexpr_type, {:array, rexpr_type}], env), + {:ok, bool_array} <- + concrete + |> from_concrete([lexpr, rexpr]) + |> Map.put(:map_over_array_in_pos, 1) + |> maybe_reduce() do + {name, impl} = + case expr.kind do + :AEXPR_OP_ANY -> {"any", &Enum.any?/1} + :AEXPR_OP_ALL -> {"all", &Enum.all?/1} + end + + maybe_reduce(%Func{ + implementation: impl, + name: name, + type: :bool, + args: [bool_array] + }) + else + {:error, {_loc, _msg}} = error -> error + :error -> {:error, {expr.location, "Could not select an operator overload"}} + {:ok, _} -> {:error, {expr.location, "ANY/ALL requires operator that returns bool"}} + end + end + + defp get_fake_array_elem(%UnknownConst{} = unknown), do: {:ok, unknown} + + defp get_fake_array_elem(%{type: {:array, inner_type}} = expr), + do: {:ok, %{expr | type: inner_type}} + + defp get_fake_array_elem(other), + do: {:error, {other.location, "argument of ANY must be an array"}} + defp handle_between(%PgQuery.A_Expr{} = expr, refs, env) do # It can only be a list here because that's how PG parses SQL. It it's a subquery, then it # wouldn't be `A_Expr`. @@ -475,7 +671,7 @@ defmodule Electric.Replication.Eval.Parser do {:error, _} -> {:error, {const.location, - "could not cast value #{inspect(value)} from #{type} to #{target_type}"}} + "could not cast value #{inspect(value)} from #{readable(type)} to #{readable(target_type)}"}} end end end @@ -514,17 +710,40 @@ defmodule Electric.Replication.Eval.Parser do :error -> case {from_type, to_type} do - {:unknown, _} -> {:ok, {__MODULE__, :cast_null}} - {_, :unknown} -> {:ok, {__MODULE__, :cast_null}} - {:text, to_type} -> find_cast_in_function(env, to_type) - {from_type, :text} -> find_cast_out_function(env, from_type) - {from_type, to_type} -> find_explicit_cast(env, from_type, to_type) + {:unknown, _} -> + {:ok, {__MODULE__, :cast_null}} + + {_, :unknown} -> + {:ok, {__MODULE__, :cast_null}} + + {{:array, t1}, {:array, t2}} -> + case find_cast_function(env, t1, t2) do + {:ok, :as_is} -> {:ok, :as_is} + {:ok, impl} -> {:ok, :array_cast, impl} + :error -> :error + end + + {:text, to_type} -> + find_cast_in_function(env, to_type) + + {from_type, :text} -> + find_cast_out_function(env, from_type) + + {from_type, to_type} -> + find_explicit_cast(env, from_type, to_type) end end end def cast_null(nil), do: nil + defp find_cast_in_function(env, {:array, to_type}) do + with {:ok, [%{args: [:text], implementation: impl}]} <- + Map.fetch(env.funcs, {"#{to_type}", 1}) do + {:ok, &PgInterop.Array.parse(&1, impl)} + end + end + defp find_cast_in_function(env, to_type) do case Map.fetch(env.funcs, {"#{to_type}", 1}) do {:ok, [%{args: [:text], implementation: impl}]} -> {:ok, impl} @@ -549,6 +768,17 @@ defmodule Electric.Replication.Eval.Parser do {:ok, :as_is} -> {:ok, %{arg | type: target_type}} + {:ok, :array_cast, impl} -> + {:ok, + %Func{ + location: loc, + type: maybe_array_type(target_type), + args: [arg], + implementation: impl, + map_over_array_in_pos: 0, + name: "#{readable(type)}_to_#{readable(target_type)}" + }} + {:ok, impl} -> {:ok, %Func{ @@ -556,20 +786,86 @@ defmodule Electric.Replication.Eval.Parser do type: target_type, args: [arg], implementation: impl, - name: "#{type}_to_#{target_type}" + name: "#{readable(type)}_to_#{readable(target_type)}" }} :error -> - {:error, {loc, "unknown cast from type #{type} to type #{target_type}"}} + {:error, + {loc, "unknown cast from type #{readable(type)} to type #{readable(target_type)}"}} end end + defp readable(:unknown), do: "unknown" + defp readable({:array, type}), do: "#{readable(type)}[]" + defp readable({:internal, type}), do: "internal type #{readable(type)}" + defp readable(type), do: to_string(type) + + defp try_cast_implicit(processed_args, arg_list, env) do + {:ok, + Enum.zip_with(processed_args, arg_list, fn + %{type: type} = arg, type -> + arg + + %{type: {:internal, _}} = arg, _ -> + arg + + %{type: from_type} = arg, to_type -> + case Map.fetch(env.implicit_casts, {from_type, to_type}) do + {:ok, :as_is} -> + arg + + {:ok, impl} -> + %Func{ + location: arg.location, + type: to_type, + args: [arg], + implementation: impl, + name: "#{from_type}_to_#{to_type}" + } + |> maybe_reduce() + |> case do + {:ok, val} -> val + error -> throw(error) + end + + :error -> + throw( + {:error, + {arg.location, "#{readable(from_type)} cannot be matched to #{readable(to_type)}"}} + ) + end + end)} + catch + {:error, {_loc, _message}} = error -> error + end + defp cast_implicit(processed_args, arg_list, env) do {:ok, Enum.zip_with(processed_args, arg_list, fn %{type: type} = arg, type -> arg + %{type: {:array, from_type}} = arg, {:array, to_type} -> + case Map.fetch!(env.implicit_casts, {from_type, to_type}) do + :as_is -> + arg + + impl -> + %Func{ + location: arg.location, + type: to_type, + args: [arg], + implementation: impl, + name: "#{from_type}_to_#{to_type}", + map_over_array_in_pos: 0 + } + |> maybe_reduce() + |> case do + {:ok, val} -> val + error -> throw(error) + end + end + %{type: from_type} = arg, to_type -> case Map.fetch!(env.implicit_casts, {from_type, to_type}) do :as_is -> @@ -603,7 +899,7 @@ defmodule Electric.Replication.Eval.Parser do %UnknownConst{value: value, location: loc}, type -> case Env.parse_const(env, value, type) do {:ok, value} -> %Const{type: type, location: loc, value: value} - :error -> throw({:error, {loc, "invalid syntax for type #{type}: #{value}"}}) + :error -> throw({:error, {loc, "invalid syntax for type #{readable(type)}: #{value}"}}) end arg, _ -> @@ -628,7 +924,7 @@ defmodule Electric.Replication.Eval.Parser do {:ok, %Const{type: :int8, value: value, location: loc}} {:fval, value} -> - {:ok, %Const{type: :numeric, value: value, location: loc}} + {:ok, %Const{type: :numeric, value: String.to_float(value), location: loc}} {:boolval, value} -> {:ok, %Const{type: :bool, value: value, location: loc}} @@ -664,12 +960,26 @@ defmodule Electric.Replication.Eval.Parser do {:ok, %Func{} | %Const{}} | {:error, {non_neg_integer(), String.t()}} defp maybe_reduce(%Func{immutable?: false} = func), do: {:ok, func} - defp maybe_reduce(%Func{args: args} = func) do + defp maybe_reduce(%Func{args: args, variadic_arg: position} = func) do {args, {any_nils?, all_const?}} = - Enum.map_reduce(args, {false, true}, fn - %Const{value: nil}, {_any_nils?, all_const?} -> {nil, {true, all_const?}} - %Const{value: value}, {any_nils?, all_const?} -> {value, {any_nils?, all_const?}} - _, {any_nils?, _all_const?} -> {:not_used, {any_nils?, false}} + args + |> Enum.with_index() + |> Enum.map_reduce({false, true}, fn + {arg, ^position}, {any_nils?, all_const?} -> + Enum.map_reduce(arg, {any_nils?, all_const?}, fn + %Const{value: nil}, {_any_nils?, all_const?} -> {nil, {true, all_const?}} + %Const{value: value}, {any_nils?, all_const?} -> {value, {any_nils?, all_const?}} + _, {any_nils?, _all_const?} -> {:not_used, {any_nils?, false}} + end) + + {%Const{value: nil}, _}, {_any_nils?, all_const?} -> + {nil, {true, all_const?}} + + {%Const{value: value}, _}, {any_nils?, all_const?} -> + {value, {any_nils?, all_const?}} + + _, {any_nils?, _all_const?} -> + {:not_used, {any_nils?, false}} end) cond do @@ -687,14 +997,40 @@ defmodule Electric.Replication.Eval.Parser do end end - defp try_applying(%Func{args: args, implementation: impl} = func) do + defp try_applying( + %Func{args: args, implementation: impl, map_over_array_in_pos: map_over_array_in_pos} = + func + ) do value = - case impl do - {module, function} -> apply(module, function, args) - function -> apply(function, args) + case {impl, map_over_array_in_pos} do + {{module, function}, nil} -> + apply(module, function, args) + + {function, nil} -> + apply(function, args) + + {{module, function}, 0} -> + Utils.deep_map(hd(args), &apply(module, function, [&1 | tl(args)])) + + {function, 0} -> + Utils.deep_map(hd(args), &apply(function, [&1 | tl(args)])) + + {{module, function}, pos} -> + Utils.deep_map( + Enum.at(args, pos), + &apply(module, function, List.replace_at(args, pos, &1)) + ) + + {function, pos} -> + Utils.deep_map(Enum.at(args, pos), &apply(function, List.replace_at(args, pos, &1))) end - {:ok, %Const{value: value, type: func.type, location: func.location}} + {:ok, + %Const{ + value: value, + type: if(not is_nil(map_over_array_in_pos), do: {:array, func.type}, else: func.type), + location: func.location + }} rescue e -> IO.puts(Exception.format(:error, e, __STACKTRACE__)) @@ -730,7 +1066,16 @@ defmodule Electric.Replication.Eval.Parser do defp find_refs(tree, acc \\ %{}) defp find_refs(%Const{}, acc), do: acc defp find_refs(%Ref{path: path, type: type}, acc), do: Map.put_new(acc, path, type) - defp find_refs(%Func{args: args}, acc), do: Enum.reduce(args, acc, &find_refs/2) + + defp find_refs(%Func{args: args, variadic_arg: nil}, acc), + do: Enum.reduce(args, acc, &find_refs/2) + + defp find_refs(%Func{args: args, variadic_arg: position}, acc), + do: + Enum.reduce(Enum.with_index(args), acc, fn + {arg, ^position}, acc -> Enum.reduce(arg, acc, &find_refs/2) + {arg, _}, acc -> find_refs(arg, acc) + end) defp unsnake(string) when is_binary(string), do: :binary.replace(string, "_", " ", [:global]) @@ -748,4 +1093,31 @@ defmodule Electric.Replication.Eval.Parser do location: tree_part.location }) end + + defp maybe_array_type({:array, type}), do: type + defp maybe_array_type(type), do: type + + defp build_index_structure(index) do + {:index, index} + end + + defp build_slice_structure(lower_idx, upper_idx) do + {:slice, lower_idx, upper_idx} + end + + defp round_numerics(args) do + Utils.map_while_ok(args, fn + %{type: x} = arg when x in [:numeric, :float4, :float8] -> + maybe_reduce(%Func{ + location: arg.location, + type: :int8, + args: [arg], + implementation: &Kernel.round/1, + name: "round" + }) + + arg -> + {:ok, arg} + end) + end end diff --git a/packages/sync-service/lib/electric/replication/eval/runner.ex b/packages/sync-service/lib/electric/replication/eval/runner.ex index e0efa9aca3..9e1d61c5f5 100644 --- a/packages/sync-service/lib/electric/replication/eval/runner.ex +++ b/packages/sync-service/lib/electric/replication/eval/runner.ex @@ -1,5 +1,6 @@ defmodule Electric.Replication.Eval.Runner do require Logger + alias Electric.Utils alias Electric.Replication.Eval.Expr alias Electric.Replication.Eval.Env alias Electric.Replication.Eval.Parser.{Const, Func, Ref} @@ -51,13 +52,22 @@ defmodule Electric.Replication.Eval.Runner do defp do_execute(%Const{value: value}, _), do: value defp do_execute(%Ref{path: path}, refs), do: Map.fetch!(refs, path) - defp do_execute(%Func{} = func, refs) do + defp do_execute(%Func{variadic_arg: vararg_position} = func, refs) do {args, has_nils?} = - Enum.map_reduce(func.args, false, fn val, has_nils? -> - case do_execute(val, refs) do - nil -> {nil, true} - val -> {val, has_nils?} - end + Enum.map_reduce(Enum.with_index(func.args), false, fn + {val, ^vararg_position}, has_nils? -> + Enum.map_reduce(val, has_nils?, fn val, has_nils? -> + case do_execute(val, refs) do + nil -> {nil, true} + val -> {val, has_nils?} + end + end) + + {val, _}, has_nils? -> + case do_execute(val, refs) do + nil -> {nil, true} + val -> {val, has_nils?} + end end) # Strict functions don't get applied to nils, so if it's strict and any of the arguments is nil @@ -68,10 +78,31 @@ defmodule Electric.Replication.Eval.Runner do end end - defp try_apply(%Func{implementation: impl} = func, args) do - case impl do - {module, fun} -> apply(module, fun, args) - fun -> apply(fun, args) + defp try_apply( + %Func{implementation: impl, map_over_array_in_pos: map_over_array_in_pos} = func, + args + ) do + case {impl, map_over_array_in_pos} do + {{module, fun}, nil} -> + apply(module, fun, args) + + {fun, nil} -> + apply(fun, args) + + {{module, function}, 0} -> + Utils.deep_map(hd(args), &apply(module, function, [&1 | tl(args)])) + + {function, 0} -> + Utils.deep_map(hd(args), &apply(function, [&1 | tl(args)])) + + {{module, function}, pos} -> + Utils.deep_map( + Enum.at(args, pos), + &apply(module, function, List.replace_at(args, pos, &1)) + ) + + {function, pos} -> + Utils.deep_map(Enum.at(args, pos), &apply(function, List.replace_at(args, pos, &1))) end rescue _ -> diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 9667b50fde..f26756ecb4 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -92,6 +92,20 @@ defmodule Electric.Utils do end end + @doc """ + Apply a function to each element of an enumerable, recursively if the element is an enumerable itself. + + ## Examples + + iex> deep_map([1, [2, [3]], 4], &(&1 * 2)) + [2, [4, [6]], 8] + """ + @spec deep_map(Enumerable.t(elem), (elem -> result)) :: list(result) + when elem: var, result: var + def deep_map(enum, fun) when is_function(fun, 1) do + Enum.map(enum, &if(Enumerable.impl_for(&1), do: deep_map(&1, fun), else: fun.(&1))) + end + @doc """ Return a list of values from `enum` that are the maximal elements as calculated by the given `fun`. diff --git a/packages/sync-service/lib/pg_interop/array.ex b/packages/sync-service/lib/pg_interop/array.ex new file mode 100644 index 0000000000..e57651b910 --- /dev/null +++ b/packages/sync-service/lib/pg_interop/array.ex @@ -0,0 +1,365 @@ +defmodule PgInterop.Array do + defguardp is_space(c) when c in [?\s, ?\t, ?\n, ?\r, ?\v, ?\f] + + @doc ~S""" + Parse a Postgres string-serialized array into a list of strings, unwrapping the escapes. Parses nested arrays. + If a casting function is provided, it will be applied to each element. + + Parsing follows SOME of the same rules as the postgres parser, in particular: + 1. at most 6 nesting levels are allowed, + 2. arrays must be of uniform dimension, i.e. all sub-arrays must have the same number of elements if at the same depth. + + This implementation also breaks away from the postgres parser in that some bugs are NOT reimplemented: + - `select '{{1},{{2}}}'::text[];` yields `{{{1}},{{2}}}` in PG, we raise an error + - `select '{{{1}},{2}}'::text[];` yields `{}` in PG, we raise an error + - `select '{{{1}},{2},{{3}}}::text[];` yields `{{{1}},{{NULL}},{{3}}}` in PG, we raise an error + + ## Examples + + iex> ~S|{"(\"2023-06-15 11:18:05.372698+00\",)"}| |> parse() + [~s|("2023-06-15 11:18:05.372698+00",)|] + + iex> ~S|{"(\"2023-06-15 11:18:05.372698+00\",)","(\"2023-06-15 11:18:05.372698+00\",)"}| |> parse() + [~s|("2023-06-15 11:18:05.372698+00",)|, ~s|("2023-06-15 11:18:05.372698+00",)|] + + iex> ~S|{hello, world, null, "null"}| |> parse() + ["hello", "world", nil, "null"] + + iex> ~S|{"2023-06-15 11:18:05.372698+00",2023-06-15 11:18:05.372698+00}| |> parse(fn x -> {:ok, n, _} = DateTime.from_iso8601(x); n end) + [~U[2023-06-15 11:18:05.372698Z], ~U[2023-06-15 11:18:05.372698Z]] + + iex> ~s|{ "1" , 3 , "2" , 3 3 }| |> parse() + ["1", "3", "2", "3 3"] + + iex> ~s|{ {{1, 1}, { "2" , 2 }} ,{{"3", 3}, {4, 4} }, { {5, 5},{6, 6} }}| |> parse(&String.to_integer/1) + [[[1, 1], [2, 2]], [[3, 3], [4, 4]], [[5, 5], [6, 6]]] + + iex> ~s|{ "1" , "2" , 3 3 , , 4}| |> parse() + ** (RuntimeError) Unexpected ',' character + + iex> ~s|{ "1" , 3, "2" , 3 3 , }| |> parse() + ** (RuntimeError) Unexpected '}' character + + iex> ~s|{ {1} ,{ 2 }, {3 }} }| |> parse() + ** (RuntimeError) Invalid array syntax + + iex> ~s|{{{1} ,{ 2 }, {3 }} | |> parse() + ** (RuntimeError) Unexpected end of input + + iex> ~s|{"}| |> parse() + ** (RuntimeError) Unexpected end of input + + iex> ~s|{{1},2,{3}}| |> parse(&String.to_integer/1) + ** (RuntimeError) Unexpected array element + + iex> ~s|{{{{{{{1}}}}}}}| |> parse() + ** (RuntimeError) number of dimensions (7) exceeds maximum of 6 + + iex> ~s|{ {1} ,{ {2} }, {3 }}| |> parse() + ** (RuntimeError) Inconsistent array dimensions + + iex> ~s|{ {{1}} ,{2}, {3 }}| |> parse() + ** (RuntimeError) Inconsistent array dimensions + """ + def parse(str, casting_fun \\ & &1) + + def parse("{}", _), do: [] + + def parse(str, casting_fun) do + case parse_nested_arrays(str, casting_fun, %{cur_dim: 1}) do + {result, "", _} -> + result + + {result, rest, _} -> + if String.match?(rest, ~r/^\s$/) do + result + else + raise "Invalid array syntax" + end + end + end + + defp parse_nested_arrays(<> <> rest, fun, dim_info) when is_space(c), + do: parse_nested_arrays(rest, fun, dim_info) + + defp parse_nested_arrays(_, _, %{cur_dim: dim}) when dim > 6, + do: raise("number of dimensions (#{dim}) exceeds maximum of 6") + + defp parse_nested_arrays(_, _, %{cur_dim: dim, max_dim: max_dim}) when dim > max_dim, + do: raise("Inconsistent array dimensions") + + defp parse_nested_arrays("{" <> rest, fun, %{cur_dim: dim} = dim_info) do + # we're in an array, need to parse all the elements at this level + case String.trim_leading(rest) do + "" -> + raise "Unexpected end of input" + + "{" <> _ = rest -> + parse_all_nested_arrays(rest, fun, [], 0, dim_info) + + _ -> + # If we know max dimension but see a non-array element, before that, we know it's inconsistent + if is_map_key(dim_info, :max_dim) and dim_info.max_dim > dim do + raise "Inconsistent array dimensions" + end + + {result, rest, dim_size} = parse_all_elements(rest, fun) + + # If we've been at this depth, validate that new array is consistent with the previous ones, + # if not, save it + case Map.fetch(dim_info, dim) do + {:ok, ^dim_size} -> + {result, rest, dim_info} + + :error -> + {result, rest, Map.put(dim_info, dim, dim_size)} + + {:ok, _} -> + raise "Inconsistent array dimensions" + end + end + end + + defp parse_nested_arrays(_, _, _), do: raise("Unexpected array element") + + defp parse_all_nested_arrays(str, fun, acc, dim_size, %{cur_dim: dim} = dim_info) do + {result, rest, dim_info} = parse_nested_arrays(str, fun, %{dim_info | cur_dim: dim + 1}) + dim_info = %{dim_info | cur_dim: dim} + + # First time we reach this branch is when we followed all open braces at the start + # of the string, so we know the maximum dimension of the array + dim_info = Map.put_new(dim_info, :max_dim, dim + 1) + + case scan_until_next_boundary(rest) do + # If next boundary is a comma, we're at the same depth, so keep parsing + {?,, rest} -> + parse_all_nested_arrays(rest, fun, [result | acc], dim_size + 1, dim_info) + + # If next boundary is a closing brace, we're done with this array, so update what we can + {?}, rest} -> + dim_size = dim_size + 1 + + # If we've been at this depth, validate that new array is consistent with the previous ones, + # if not, save it + case Map.fetch(dim_info, dim) do + {:ok, ^dim_size} -> + {Enum.reverse([result | acc]), rest, dim_info} + + :error -> + {Enum.reverse([result | acc]), rest, Map.put(dim_info, dim, dim_size)} + + {:ok, _} -> + raise "Inconsistent array dimensions" + end + end + end + + defp parse_all_elements(str, fun, acc \\ [], dim \\ 0) + defp parse_all_elements("", _, _, _), do: raise("Unexpected end of input") + + defp parse_all_elements(<> <> rest, fun, acc, dim) when is_space(c), + do: parse_all_elements(rest, fun, acc, dim) + + defp parse_all_elements(str, fun, acc, dim) do + {type, {elem, rest}} = scan_next_element(str) + + case scan_until_next_boundary(rest) do + {?,, rest} -> parse_all_elements(rest, fun, [apply_fun(type, elem, fun) | acc], dim + 1) + {?}, rest} -> {Enum.reverse([apply_fun(type, elem, fun) | acc]), rest, dim + 1} + end + end + + defp scan_next_element(<> <> _), do: raise("Unexpected '{' character") + defp scan_next_element(<> <> rest), do: {:quoted, scan_until_quote(rest, "")} + defp scan_next_element(rest), do: {:unquoted, scan_until_comma_or_end(rest, "", "")} + + defp scan_until_quote("", _), do: raise("Unexpected end of input") + defp scan_until_quote(<> <> rest, acc), do: {acc, rest} + defp scan_until_quote(~S'\"' <> str, acc), do: scan_until_quote(str, acc <> ~S'"') + defp scan_until_quote(~S'\\' <> str, acc), do: scan_until_quote(str, acc <> ~S'\\') + defp scan_until_quote(<> <> str, acc), do: scan_until_quote(str, acc <> <>) + + defp scan_until_comma_or_end("", _, _), do: raise("Unexpected end of input") + + defp scan_until_comma_or_end(<> <> _, "", _) when c in [?,, ?}], + do: raise("Unexpected '#{[c]}' character") + + defp scan_until_comma_or_end("}" <> _ = rest, acc, _acc_whitespace), do: {acc, rest} + defp scan_until_comma_or_end(<> <> _ = str, acc, _acc_whitespace), do: {acc, str} + + defp scan_until_comma_or_end(<> <> str, "", "") when is_space(c), + do: scan_until_comma_or_end(str, "", "") + + defp scan_until_comma_or_end(<> <> str, acc, acc_whitespace) when is_space(c), + do: scan_until_comma_or_end(str, acc, acc_whitespace <> <>) + + defp scan_until_comma_or_end(<> <> str, acc, acc_whitespace), + do: scan_until_comma_or_end(str, acc <> acc_whitespace <> <>, "") + + defp scan_until_next_boundary(""), do: raise("Unexpected end of input") + defp scan_until_next_boundary(<> <> rest) when c in [?,, ?}], do: {c, rest} + + defp scan_until_next_boundary(<> <> rest) when is_space(c), + do: scan_until_next_boundary(rest) + + defp scan_until_next_boundary(<> <> _), do: raise("Unexpected '#{[c]}' character") + + defp apply_fun(:quoted, elem, fun), do: fun.(elem) + + defp apply_fun(:unquoted, elem, fun) do + if String.downcase(elem) == "null", do: nil, else: fun.(elem) + end + + @doc ~S""" + Serialize a list of strings into a postgres string-serialized array into a list of strings, wrapping the contents + + ## Examples + + iex> [~s|("2023-06-15 11:18:05.372698+00",)|] |> serialize() + ~S|{"(\"2023-06-15 11:18:05.372698+00\",)"}| + + iex> [~s|("2023-06-15 11:18:05.372698+00",)|, ~s|("2023-06-15 11:18:05.372698+00",)|] |> serialize() + ~S|{"(\"2023-06-15 11:18:05.372698+00\",)","(\"2023-06-15 11:18:05.372698+00\",)"}| + + iex> str = ~S|{"(\"2023-06-15 11:18:05.372698+00\",)","(\"2023-06-15 11:18:05.372698+00\",)"}| + iex> str |> parse() |> serialize() + str + """ + def serialize(array, quote_char \\ ?") when is_list(array) do + array + |> Enum.map_join(",", fn + nil -> "null" + val when is_binary(val) -> val |> String.replace(~S|"|, ~S|\"|) |> enclose(<>) + end) + |> enclose("{", "}") + end + + defp enclose(str, left, right \\ nil) do + left <> str <> (right || left) + end + + @doc """ + Access a slice or index of a postgres array. + + ## Examples + + iex> ~S|{1,2,3,4,5}| |> parse(&String.to_integer/1) |> slice_access([{:slice, nil, 3}]) + [1, 2, 3] + + iex> ~S|{1,2,3,4,5}| |> parse(&String.to_integer/1) |> slice_access([{:slice, 3, nil}]) + [3, 4, 5] + + iex> ~S|{1,2,3,4,5}| |> parse(&String.to_integer/1) |> slice_access([{:slice, 3, 4}]) + [3, 4] + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> slice_access([{:slice, nil, nil}, {:index, 2}]) + [[1, 2], [3, 4]] + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> slice_access([{:slice, nil, nil}, {:slice, 2, 2}]) + [[2], [4]] + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> slice_access([{:slice, nil, nil}, {:slice, -1, 1}]) + [[1], [3]] + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> slice_access([{:slice, nil, nil}, {:slice, 1, -1}]) + [] + """ + @spec slice_access( + list(), + list({:slice, nil | integer(), nil | integer()} | {:index, integer()}) + ) :: list() + def slice_access(array, instructions) do + do_slice_access(array, instructions) + catch + :out_of_bounds -> [] + end + + defp do_slice_access(elem, [_ | _]) when not is_list(elem), do: throw(:out_of_bounds) + defp do_slice_access(array, []), do: array + + defp do_slice_access(array, [{:slice, nil, nil} | rest]), + do: Enum.map(array, &do_slice_access(&1, rest)) + + defp do_slice_access(_array, [{:slice, lower_idx, upper_idx} | _]) + when is_integer(lower_idx) and is_integer(upper_idx) and + (lower_idx > upper_idx or upper_idx < 1), + do: throw(:out_of_bounds) + + defp do_slice_access(array, [{:slice, lower_idx, upper_idx} | rest]), + do: + array + |> Enum.slice((normalize_idx(lower_idx) || 0)..(normalize_idx(upper_idx) || -1)//1) + |> Enum.map(&do_slice_access(&1, rest)) + + defp do_slice_access(array, [{:index, idx} | rest]), + do: do_slice_access(array, [{:slice, 1, idx} | rest]) + + @doc """ + Access an index of a postgres array. If the index is out of bounds or array has more dimensions than the indices provided, returns `nil`. + + ## Examples + + iex> ~S|{1,2,3,4,5}| |> parse(&String.to_integer/1) |> index_access([{:index, 3}]) + 3 + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> index_access([{:index, 2}, {:index, 1}]) + 3 + + iex> ~S|{{1,2},{3,4}}| |> parse(&String.to_integer/1) |> index_access([{:index, 3}]) + nil + """ + @spec index_access(list(), list({:index, integer()})) :: list() + def index_access(array, list_of_indices) do + Enum.reduce_while(list_of_indices, array, fn + _, nil -> {:halt, nil} + {:index, idx}, _acc when idx < 1 -> {:halt, nil} + {:index, idx}, acc -> {:cont, Enum.at(acc, idx - 1)} + end) + |> case do + [] -> nil + result -> result + end + end + + defp normalize_idx(nil), do: nil + defp normalize_idx(pg_index) when pg_index < 1, do: 0 + defp normalize_idx(pg_index), do: pg_index - 1 + + def concat_arrays(arr1, []), do: arr1 + def concat_arrays([], arr2), do: arr2 + + def concat_arrays(arr1, arr2) do + case {get_array_dim(arr1), get_array_dim(arr2)} do + {d1, d1} -> arr1 ++ arr2 + {d1, d2} when d2 - d1 == 1 -> [arr1 | arr2] + {d1, d2} when d1 - d2 == 1 -> arr1 ++ [arr2] + {d1, d2} -> raise "Incompatible array dimensions: #{d1} and #{d2}" + end + end + + @doc """ + Get the dimension of a postgres array. + + ## Examples + + iex> ~S|{}| |> parse() |> get_array_dim() + nil + + iex> ~S|{1,2,3,4,5}| |> parse() |> get_array_dim() + 1 + + iex> ~S|{{1,2},{3,4}}| |> parse() |> get_array_dim() + 2 + """ + @spec get_array_dim(list()) :: non_neg_integer() + def get_array_dim(arr, dim \\ 0) + def get_array_dim([], _), do: nil + def get_array_dim([hd | _], dim), do: get_array_dim(hd, dim + 1) + def get_array_dim(_, dim), do: dim + + def array_prepend(elem, []), do: [elem] + def array_prepend(elem, [hd | tl]) when not is_list(hd), do: [elem, hd | tl] + + def array_append([], elem), do: [elem] + def array_append([hd | _] = list, elem) when not is_list(hd), do: list ++ [elem] +end diff --git a/packages/sync-service/test/electric/replication/eval/parser_test.exs b/packages/sync-service/test/electric/replication/eval/parser_test.exs index eb0ee8c479..93125cc64b 100644 --- a/packages/sync-service/test/electric/replication/eval/parser_test.exs +++ b/packages/sync-service/test/electric/replication/eval/parser_test.exs @@ -459,6 +459,88 @@ defmodule Electric.Replication.Eval.ParserTest do assert {:error, "At location 2: argument of IS TRUE must be bool, not int4"} = Parser.parse_and_validate_expression(~S|1 IS TRUE|, %{}, env) end + + test "should parse array constants" do + # TODO: Does not support arbitrary bounds input syntax yet, + # e.g. '[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}'::int[] + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|'{{1 },{2},{"3"}}'::int[]|) + + assert %Const{value: [[1], [2], [3]], type: {:array, :int4}} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|ARRAY[ARRAY[1, 2], ARRAY['3', 2 + 2]]|) + + assert %Const{value: [[1, 2], [3, 4]], type: {:array, :int4}} = result + + assert {:error, _} = + Parser.parse_and_validate_expression(~S|ARRAY[1, ARRAY['3', 2 + 2]]|) + end + + test "should recast a nested array" do + # as-is recast + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|('{{1},{2},{"3"}}'::int[])::bigint[]|) + + assert %Const{value: [[1], [2], [3]], type: {:array, :int8}} = result + + # with a cast function + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|('{{1},{2},{"3"}}'::text[])::bigint[]|) + + assert %Const{value: [[1], [2], [3]], type: {:array, :int8}} = result + end + + test "should work with array access" do + # Including mixed notation, float constants, and text castable to ints + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|('{1,2,3}'::int[])[1][1:'2'][2.2:2.3][:]|) + + assert %Const{value: [], type: {:array, :int4}} = result + + # Returns NULL if any of indices are NULL + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression( + ~S|('{1,2,3}'::int[])[1][1:'2'][2.2:2.3][:][NULL:NULL]| + ) + + assert %Const{value: nil, type: {:array, :int4}} = result + + # Also works when there are no slices + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression( + ~S|('{{{1}},{{2}},{{3}}}'::int[])[1]['1'][1.4]| + ) + + assert %Const{value: 1, type: :int4} = result + + # And correctly works with expressions as indices + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|('{{1},{2},{3}}'::int[])[2][2 - 1]|) + + assert %Const{value: 2, type: :int4} = result + end + + test "should support array ANY/ALL" do + assert {:error, "At location 9: argument of ANY must be an array"} = + Parser.parse_and_validate_expression(~S|3 > ANY (3)|) + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|3 > ANY ('{1, 2, 3}')|) + + assert %Const{value: true, type: :bool} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|1::bigint = ANY ('{1,2}'::int[])|) + + assert %Const{value: true, type: :bool} = result + + # Including implicit casts and nested arrays + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|4.1 > ALL ('{{1}, {2}, {3}}'::int[])|) + + assert %Const{value: true, type: :bool} = result + end end describe "parse_and_validate_expression/3 default env" do @@ -471,5 +553,48 @@ defmodule Electric.Replication.Eval.ParserTest do assert {:ok, _} = Parser.parse_and_validate_expression(~S|id <= 1|, %{["id"] => :int8}) assert {:ok, _} = Parser.parse_and_validate_expression(~S|id = 1|, %{["id"] => :int8}) end + + test "implements common array operators: @>, <@, &&, ||" do + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|'{1,2,3}'::int[] @> '{2,1,2}'|) + + assert %Const{value: true, type: :bool} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|'{1,2,3}'::int[] <@ '{1,2,2}'::int[]|) + + assert %Const{value: false, type: :bool} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S|'{1,2,1}'::int[] && '{2,3,4}'::int[]|) + + assert %Const{value: true, type: :bool} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S"'{1,2,1}'::int[] || '{2,3,4}'") + + assert %Const{value: [1, 2, 1, 2, 3, 4], type: {:array, :int4}} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S"('1'::bigint || '{2,3,4}'::int[]) || 5") + + assert %Const{value: [1, 2, 3, 4, 5], type: {:array, :int8}} = result + + assert {:ok, %Expr{eval: result}} = + Parser.parse_and_validate_expression(~S"array_ndims('{{1,2,3},{4,5,6}}')") + + assert %Const{value: 2, type: :int4} = result + end + + test "does correct operator inference for array polymorphic types" do + assert {:error, "At location 17: Could not select an operator overload"} = + Parser.parse_and_validate_expression(~S|'{1,2,3}'::int[] @> '{2,1,2}'::bigint[]|) + + assert {:error, "At location 17: Could not select an operator overload"} = + Parser.parse_and_validate_expression(~S|'{1,2,3}'::int[] @> '{2,1,2}'::text|) + + assert {:error, "At location 17: Could not select an operator overload"} = + Parser.parse_and_validate_expression(~S/'{1,2,3}'::int[] || '{2,1,2}'::text[]/) + end end end diff --git a/packages/sync-service/test/electric/replication/eval/runner_test.exs b/packages/sync-service/test/electric/replication/eval/runner_test.exs index 37572a2a1f..4795c749e4 100644 --- a/packages/sync-service/test/electric/replication/eval/runner_test.exs +++ b/packages/sync-service/test/electric/replication/eval/runner_test.exs @@ -61,5 +61,50 @@ defmodule Electric.Replication.Eval.RunnerTest do |> Parser.parse_and_validate_expression!(%{["test"] => :int4}) |> Runner.execute(%{["test"] => "test"}) end + + test "should work with array types" do + assert {:ok, [[1, 2], [3, 4]]} = + ~S|ARRAY[ARRAY[1, x], ARRAY['3', 2 + 2]]| + |> Parser.parse_and_validate_expression!(%{["x"] => :int4}) + |> Runner.execute(%{["x"] => 2}) + + assert {:ok, true} = + ~S|x @> ARRAY[y]| + |> Parser.parse_and_validate_expression!(%{ + ["x"] => {:array, :int4}, + ["y"] => :int4 + }) + |> Runner.execute(%{["y"] => 1, ["x"] => [1, 2]}) + + assert {:ok, true} = + ~S|x::float[] = y::int4[]::float[]| + |> Parser.parse_and_validate_expression!(%{ + ["x"] => {:array, :int4}, + ["y"] => :text + }) + |> Runner.execute(%{["y"] => "{1,2}", ["x"] => [1, 2]}) + + assert {:ok, true} = + ~S|y = ANY (x)| + |> Parser.parse_and_validate_expression!(%{ + ["x"] => {:array, :int4}, + ["y"] => :int8 + }) + |> Runner.execute(%{["y"] => 1, ["x"] => [1, 2]}) + + assert {:ok, [[1, 2], [3, 4]]} = + ~S/(ARRAY[1] || ARRAY[2]) || x/ + |> Parser.parse_and_validate_expression!(%{ + ["x"] => {:array, :float8} + }) + |> Runner.execute(%{["x"] => [[3, 4]]}) + + assert {:error, _} = + ~S/(ARRAY[1] || ARRAY[2]) || x/ + |> Parser.parse_and_validate_expression!(%{ + ["x"] => {:array, :int4} + }) + |> Runner.execute(%{["x"] => [[[3, 4]]]}) + end end end diff --git a/packages/sync-service/test/pg_interop/array_test.exs b/packages/sync-service/test/pg_interop/array_test.exs new file mode 100644 index 0000000000..5b1163ef5d --- /dev/null +++ b/packages/sync-service/test/pg_interop/array_test.exs @@ -0,0 +1,5 @@ +defmodule PgInterop.ArrayTest do + use ExUnit.Case + + doctest PgInterop.Array, import: true, only: [parse: 2] +end