Skip to content

Commit

Permalink
Merge pull request #291 from rabbitmq/md/optimize-transaction-functions
Browse files Browse the repository at this point in the history
khepri_tx_adv: Prefer list prepending to build side effect lists
  • Loading branch information
dumbbell authored Aug 29, 2024
2 parents 0db32fd + b750381 commit b802393
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 41 deletions.
65 changes: 37 additions & 28 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
split_query_options/1,
split_command_options/1,
split_put_options/1,
insert_or_update_node/5,
delete_matching_nodes/3,
insert_or_update_node/6,
delete_matching_nodes/4,
handle_tx_exception/1,
process_query/3,
process_command/3]).
Expand Down Expand Up @@ -1295,13 +1295,13 @@ apply(
State) ->
{TreeOptions, PutOptions} = split_put_options(TreeAndPutOptions),
Ret = insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions),
State, PathPattern, Payload, PutOptions, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
#delete{path = PathPattern, options = TreeOptions},
State) ->
Ret = delete_matching_nodes(State, PathPattern, TreeOptions),
Ret = delete_matching_nodes(State, PathPattern, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
Expand Down Expand Up @@ -1709,54 +1709,61 @@ failed_to_locate_sproc(Reason) ->
khepri_tx:abort(Reason).

-spec insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions) -> Ret when
State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) ->
Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
Payload :: khepri_payload:payload(),
PutOptions :: khepri:put_options(),
TreeOptions :: khepri:tree_options(),
Ret :: {State, Result} | {State, Result, ra_machine:effects()},
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private

insert_or_update_node(State, PathPattern, Payload, PutOptions, TreeOptions) ->
insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret1 = khepri_tree:insert_or_update_node(
Tree, PathPattern, Payload, PutOptions, TreeOptions),
case Ret1 of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects} = create_tree_change_side_effects(
State, State1, Ret2, AppliedChanges),
{State2, {ok, Ret2}, SideEffects};
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges,
SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error}
{State, Error, SideEffects}
end.

-spec delete_matching_nodes(State, PathPattern, TreeOptions) -> Ret when
-spec delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
TreeOptions :: khepri:tree_options(),
Ret :: {State, Result} | {State, Result, ra_machine:effects()},
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private

delete_matching_nodes(State, PathPattern, TreeOptions) ->
delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret = khepri_tree:delete_matching_nodes(
Tree, PathPattern, #{}, TreeOptions),
case Ret of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects} = create_tree_change_side_effects(
State, State1, Ret2, AppliedChanges),
{State2, {ok, Ret2}, SideEffects};
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges,
SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error}
{State, Error, SideEffects}
end.

create_tree_change_side_effects(
InitialState, NewState, Ret, KeepWhileAftermath) ->
add_tree_change_side_effects(
InitialState, NewState, Ret, KeepWhileAftermath, SideEffects) ->
%% We make a map where for each affected tree node, we indicate the type
%% of change.
Changes0 = maps:merge(Ret, KeepWhileAftermath),
Expand All @@ -1766,11 +1773,13 @@ create_tree_change_side_effects(
(_, #{} = _NodeProps) -> update;
(_, delete) -> delete
end, Changes0),
ProjectionEffects = create_projection_side_effects(
InitialState, NewState, Changes),
{NewState1, TriggerEffects} = create_trigger_side_effects(
InitialState, NewState, Changes),
{NewState1, ProjectionEffects ++ TriggerEffects}.
NewSideEffects = create_projection_side_effects(
InitialState, NewState, Changes),
{NewState1, NewSideEffects1} = add_trigger_side_effects(
InitialState, NewState, Changes,
NewSideEffects),
SideEffects1 = lists:reverse(NewSideEffects1, SideEffects),
{NewState1, SideEffects1}.

create_projection_side_effects(InitialState, NewState, Changes) ->
InitialTree = get_tree(InitialState),
Expand Down Expand Up @@ -1869,15 +1878,15 @@ evaluate_projection(
Effect = {aux, Trigger},
[Effect | Effects].

create_trigger_side_effects(InitialState, NewState, Changes) ->
add_trigger_side_effects(InitialState, NewState, Changes, SideEffects) ->
%% We want to consider the new state (with the updated tree), but we want
%% to use triggers from the initial state, in case they were updated too.
%% In other words, we want to evaluate triggers in the state they were at
%% the time the change to the tree was requested.
Triggers = get_triggers(InitialState),
case Triggers =:= #{} of
true ->
{NewState, []};
{NewState, SideEffects};
false ->
EmittedTriggers = get_emitted_triggers(InitialState),
#config{store_id = StoreId} = get_config(NewState),
Expand All @@ -1903,7 +1912,7 @@ create_trigger_side_effects(InitialState, NewState, Changes) ->
khepri_event_handler,
handle_triggered_sprocs,
[StoreId, TriggeredStoredProcs]},
{NewState1, [SideEffect]}
{NewState1, [SideEffect | SideEffects]}
end.

list_triggered_sprocs(Tree, Changes, Triggers) ->
Expand Down
25 changes: 12 additions & 13 deletions src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ put_many(PathPattern, Data, Options) ->
{TreeOptions, PutOptions} =
khepri_machine:split_put_options(TreeAndPutOptions),
%% TODO: Ensure `CommandOptions' is unset.
Fun = fun(State) ->
Fun = fun(State, SideEffects) ->
khepri_machine:insert_or_update_node(
State, PathPattern1, Payload1, PutOptions, TreeOptions)
State, PathPattern1, Payload1, PutOptions, TreeOptions,
SideEffects)
end,
handle_state_for_call(Fun).

Expand Down Expand Up @@ -421,9 +422,9 @@ delete_many(PathPattern, Options) ->
khepri_machine:split_command_options(Options),
%% TODO: Ensure `CommandOptions' is empty and `TreeOptions' doesn't
%% contains put options.
Fun = fun(State) ->
Fun = fun(State, SideEffects) ->
khepri_machine:delete_matching_nodes(
State, PathPattern1, TreeOptions)
State, PathPattern1, TreeOptions, SideEffects)
end,
handle_state_for_call(Fun).

Expand Down Expand Up @@ -985,7 +986,10 @@ run(State, StandaloneFun, Args, AllowUpdates)
NewTxProps = erlang:erase(?TX_PROPS),
khepri_machine:ensure_is_state(NewState),
?assertEqual(TxProps, NewTxProps),
{NewState, Ret, NewSideEffects}
%% The side effect list is built using prepends so the list needs to
%% be reversed to process the effects in order.
NewSideEffects1 = lists:reverse(NewSideEffects),
{NewState, Ret, NewSideEffects1}
catch
Class:Reason:Stacktrace ->
_ = erlang:erase(?TX_STATE_KEY),
Expand All @@ -996,14 +1000,9 @@ run(State, StandaloneFun, Args, AllowUpdates)

handle_state_for_call(Fun) ->
{State, SideEffects} = get_tx_state(),
case Fun(State) of
{NewState, Ret, NewSideEffects} ->
set_tx_state(NewState, SideEffects ++ NewSideEffects),
?raise_exception_if_any(Ret);
{NewState, Ret} ->
set_tx_state(NewState, SideEffects),
?raise_exception_if_any(Ret)
end.
{NewState, Ret, SideEffects1} = Fun(State, SideEffects),
set_tx_state(NewState, SideEffects1),
?raise_exception_if_any(Ret).

-spec get_tx_state() -> {State, SideEffects} when
State :: khepri_machine:state(),
Expand Down
52 changes: 52 additions & 0 deletions test/projections.erl
Original file line number Diff line number Diff line change
Expand Up @@ -801,3 +801,55 @@ old_unregister_projection_command_accepted_test() ->
?assertMatch(
{_S1, {ok, #{}}, _SE},
khepri_machine:apply(?META, Command, S0)).

trigger_projection_via_a_transaction_test_() ->
ProjectFun = fun(Path, Payload) -> {Path, Payload} end,
PathPattern = [stock, wood, <<"oak">>],
Data1 = 100,
Data2 = 200,
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[{inorder,
[{"Register the projection",
?_test(
begin
Projection = khepri_projection:new(?MODULE, ProjectFun),
?assertEqual(
ok,
khepri:register_projection(
?FUNCTION_NAME, PathPattern, Projection))
end)},

{"Trigger the projection with a non-transaction command",
?_assertEqual(
ok,
khepri:put(
?FUNCTION_NAME, PathPattern, Data1))},

{"The projection contains the triggered change",
?_assertEqual(Data1, ets:lookup_element(?MODULE, PathPattern, 2))},

{"Trigger the projection with a transaction",
?_assertEqual(
{ok, ok},
khepri:transaction(
?FUNCTION_NAME,
fun() ->
%% Issue a delete and then a put with the new data. This
%% case checks that the side effects which trigger
%% projections are accrued in the correct order within a
%% transaction.
%%
%% If the effects were accrued in the wrong order, the
%% next assertion would fail: if the effect(s) for delete
%% were processed after the put, the record would not
%% exist in the table. Instead it should exist and be
%% updated to `Data2'.
ok = khepri_tx:delete(PathPattern),
ok = khepri_tx:put(PathPattern, Data2)
end))},

{"The projection contains the triggered change",
?_assertEqual(Data2, ets:lookup_element(?MODULE, PathPattern, 2))}]
}]}.

0 comments on commit b802393

Please sign in to comment.