Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add button for reevaluating apps on change and add retry for errors #2066

Merged
merged 2 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/livebook/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2182,8 +2182,8 @@ defmodule Livebook.Session do
state
end

defp handle_action(state, {:clean_up_input_values, input_values}) do
for {_input_id, value} <- input_values do
defp handle_action(state, {:clean_up_input_values, input_infos}) do
for {_input_id, %{value: value}} <- input_infos do
case value do
value when is_file_input_value(value) ->
schedule_file_deletion(state, value.file_ref)
Expand Down
125 changes: 72 additions & 53 deletions lib/livebook/session/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Livebook.Session.Data do
:persistence_warnings,
:section_infos,
:cell_infos,
:input_values,
:input_infos,
:bin_entries,
:runtime,
:smart_cell_definitions,
Expand All @@ -52,7 +52,7 @@ defmodule Livebook.Session.Data do
dirty: boolean(),
section_infos: %{Section.id() => section_info()},
cell_infos: %{Cell.id() => cell_info()},
input_values: %{input_id() => term()},
input_infos: %{input_id() => input_info()},
bin_entries: list(cell_bin_entry()),
runtime: Runtime.t(),
smart_cell_definitions: list(Runtime.smart_cell_definition()),
Expand Down Expand Up @@ -108,8 +108,8 @@ defmodule Livebook.Session.Data do
evaluation_end: DateTime.t() | nil,
evaluation_number: non_neg_integer(),
outputs_batch_number: non_neg_integer(),
bound_to_input_ids: MapSet.t(input_id()),
new_bound_to_input_ids: MapSet.t(input_id()),
bound_to_inputs: %{input_id() => input_hash()},
new_bound_to_inputs: %{input_id() => input_hash()},
identifiers_used: list(identifier :: term()) | :unknown,
identifiers_defined: %{(identifier :: term()) => version :: term()},
data: t()
Expand All @@ -132,6 +132,8 @@ defmodule Livebook.Session.Data do

@type input_id :: String.t()

@type input_info :: %{value: term(), hash: input_hash()}

@type client :: {User.id(), client_id()}

@type client_id :: Livebook.Utils.id()
Expand All @@ -144,6 +146,8 @@ defmodule Livebook.Session.Data do
# got stale.
@type snapshot :: term()

@type input_hash :: term()

@type input_reading :: {input_id(), input_value :: term()}

@type secrets :: %{(name :: String.t()) => Secret.t()}
Expand Down Expand Up @@ -232,7 +236,7 @@ defmodule Livebook.Session.Data do
| {:set_smart_cell_parents, Cell.t(), Section.t(),
parent :: {Cell.t(), Section.t()} | nil}
| {:report_delta, client_id(), Cell.t(), cell_source_tag(), Delta.t()}
| {:clean_up_input_values, %{input_id() => term()}}
| {:clean_up_input_values, %{input_id() => input_info()}}
| :app_report_status
| :app_recover
| :app_terminate
Expand Down Expand Up @@ -290,7 +294,7 @@ defmodule Livebook.Session.Data do
persistence_warnings: [],
section_infos: initial_section_infos(notebook),
cell_infos: initial_cell_infos(notebook),
input_values: initial_input_values(notebook),
input_infos: initial_input_infos(notebook),
bin_entries: [],
runtime: default_runtime,
smart_cell_definitions: [],
Expand Down Expand Up @@ -322,14 +326,14 @@ defmodule Livebook.Session.Data do
do: {cell.id, new_cell_info(cell, %{})}
end

defp initial_input_values(notebook) do
defp initial_input_infos(notebook) do
for section <- Notebook.all_sections(notebook),
cell <- section.cells,
Cell.evaluable?(cell),
output <- cell.outputs,
attrs <- Cell.find_inputs_in_output(output),
into: %{},
do: {attrs.id, attrs.default}
do: {attrs.id, input_info(attrs.default)}
end

@doc """
Expand Down Expand Up @@ -451,7 +455,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> delete_section(section, delete_cells)
|> garbage_collect_input_values()
|> garbage_collect_input_infos()
|> update_validity_and_evaluation()
|> update_smart_cell_bases(data)
|> set_dirty()
Expand All @@ -467,7 +471,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> delete_cell(cell, section)
|> garbage_collect_input_values()
|> garbage_collect_input_infos()
|> update_validity_and_evaluation()
|> update_smart_cell_bases(data)
|> set_dirty()
Expand Down Expand Up @@ -556,7 +560,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> add_cell_output(cell, output)
|> garbage_collect_input_values()
|> garbage_collect_input_infos()
|> mark_dirty_if_persisting_outputs()
|> wrap_ok()
else
Expand All @@ -571,7 +575,7 @@ defmodule Livebook.Session.Data do
|> with_actions()
|> add_cell_output(cell, output)
|> finish_cell_evaluation(cell, section, metadata)
|> garbage_collect_input_values()
|> garbage_collect_input_infos()
|> update_validity_and_evaluation()
|> update_smart_cell_bases(data)
|> mark_dirty_if_persisting_outputs()
Expand All @@ -595,9 +599,10 @@ defmodule Livebook.Session.Data do
def apply_operation(data, {:bind_input, _client_id, cell_id, input_id}) do
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
Cell.evaluable?(cell),
:evaluating <- data.cell_infos[cell.id].eval.status,
true <- Map.has_key?(data.input_values, input_id),
false <- MapSet.member?(data.cell_infos[cell.id].eval.new_bound_to_input_ids, input_id) do
cell_info <- data.cell_infos[cell.id],
:evaluating <- cell_info.eval.status,
true <- Map.has_key?(cell_info.eval.data.input_infos, input_id),
false <- Map.has_key?(cell_info.eval.new_bound_to_inputs, input_id) do
data
|> with_actions()
|> bind_input(cell, input_id)
Expand Down Expand Up @@ -714,7 +719,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> erase_outputs()
|> garbage_collect_input_values()
|> garbage_collect_input_infos()
|> update_smart_cell_bases(data)
|> wrap_ok()
end
Expand Down Expand Up @@ -821,7 +826,7 @@ defmodule Livebook.Session.Data do
end

def apply_operation(data, {:set_input_value, _client_id, input_id, value}) do
with true <- Map.has_key?(data.input_values, input_id) do
with true <- Map.has_key?(data.input_infos, input_id) do
data
|> with_actions()
|> set_input_value(input_id, value)
Expand Down Expand Up @@ -1208,17 +1213,17 @@ defmodule Livebook.Session.Data do
defp add_cell_output({data, _} = data_actions, cell, output) do
{[indexed_output], _counter} = Notebook.index_outputs([output], 0)

new_input_values =
new_input_infos =
indexed_output
|> Cell.find_inputs_in_output()
|> Map.new(fn attrs -> {attrs.id, attrs.default} end)
|> Map.new(fn attrs -> {attrs.id, input_info(attrs.default)} end)

{data, _} =
data_actions =
data_actions
|> set!(
notebook: Notebook.add_cell_output(data.notebook, cell.id, output),
input_values: Map.merge(new_input_values, data.input_values)
input_infos: Map.merge(new_input_infos, data.input_infos)
)

if data.cell_infos[cell.id].eval.status == :evaluating do
Expand All @@ -1228,7 +1233,7 @@ defmodule Livebook.Session.Data do

data_actions
|> update_cell_eval_info!(cell.id, fn eval_info ->
update_in(eval_info.data.input_values, &Map.merge(new_input_values, &1))
update_in(eval_info.data.input_infos, &Map.merge(new_input_infos, &1))
end)
else
data_actions
Expand All @@ -1247,7 +1252,7 @@ defmodule Livebook.Session.Data do
evaluation_time_ms: metadata.evaluation_time_ms,
identifiers_used: metadata.identifiers_used,
identifiers_defined: metadata.identifiers_defined,
bound_to_input_ids: eval_info.new_bound_to_input_ids,
bound_to_inputs: eval_info.new_bound_to_inputs,
evaluation_end: DateTime.utc_now(),
code_markers: metadata.code_markers
}
Expand Down Expand Up @@ -1440,7 +1445,7 @@ defmodule Livebook.Session.Data do
evaluation_number: eval_info.evaluation_number + 1,
outputs_batch_number: eval_info.outputs_batch_number + 1,
evaluation_digest: info.sources.primary.digest,
new_bound_to_input_ids: MapSet.new(),
new_bound_to_inputs: %{},
# Keep the notebook state before evaluation
data: data,
# This is a rough estimate, the exact time is measured in the
Expand All @@ -1465,9 +1470,11 @@ defmodule Livebook.Session.Data do
defp bind_input(data_actions, cell, input_id) do
data_actions
|> update_cell_eval_info!(cell.id, fn eval_info ->
hash = eval_info.data.input_infos[input_id].hash

%{
eval_info
| new_bound_to_input_ids: MapSet.put(eval_info.new_bound_to_input_ids, input_id)
| new_bound_to_inputs: Map.put(eval_info.new_bound_to_inputs, input_id, hash)
}
end)
end
Expand Down Expand Up @@ -1829,7 +1836,7 @@ defmodule Livebook.Session.Data do

defp set_input_value({data, _} = data_actions, input_id, value) do
data_actions
|> set!(input_values: Map.put(data.input_values, input_id, value))
|> set!(input_infos: Map.put(data.input_infos, input_id, input_info(value)))
end

defp set_runtime(data_actions, prev_data, runtime) do
Expand Down Expand Up @@ -1975,20 +1982,20 @@ defmodule Livebook.Session.Data do
{data, actions ++ [action]}
end

defp garbage_collect_input_values({data, _} = data_actions) do
defp garbage_collect_input_infos({data, _} = data_actions) do
if any_section_evaluating?(data) do
# Wait if evaluation is ongoing as it may render inputs
data_actions
else
used_input_ids = data.notebook |> initial_input_values() |> Map.keys()
{input_values, unused_input_values} = Map.split(data.input_values, used_input_ids)
used_input_ids = data.notebook |> initial_input_infos() |> Map.keys()
{input_infos, unused_input_infos} = Map.split(data.input_infos, used_input_ids)

if unused_input_values == %{} do
if unused_input_infos == %{} do
data_actions
else
data_actions
|> set!(input_values: input_values)
|> add_action({:clean_up_input_values, unused_input_values})
|> set!(input_infos: input_infos)
|> add_action({:clean_up_input_values, unused_input_infos})
end
end
end
Expand Down Expand Up @@ -2079,8 +2086,8 @@ defmodule Livebook.Session.Data do
evaluation_end: nil,
evaluation_number: 0,
outputs_batch_number: 0,
bound_to_input_ids: MapSet.new(),
new_bound_to_input_ids: MapSet.new(),
bound_to_inputs: %{},
new_bound_to_inputs: %{},
identifiers_used: [],
identifiers_defined: %{},
snapshot: nil,
Expand Down Expand Up @@ -2208,7 +2215,7 @@ defmodule Livebook.Session.Data do
|> Notebook.evaluable_cells_with_section()
|> Enum.filter(fn {cell, _} ->
info = data.cell_infos[cell.id]
MapSet.member?(info.eval.bound_to_input_ids, input_id)
Map.has_key?(info.eval.bound_to_inputs, input_id)
end)
end

Expand Down Expand Up @@ -2260,18 +2267,27 @@ defmodule Livebook.Session.Data do

parent_snapshots = Enum.map(parent_ids, &cell_snapshots[&1])

bound_input_values =
bound_input_current_hashes =
for(
input_id <- info.eval.bound_to_input_ids,
do: {input_id, data.input_values[input_id]}
{input_id, _hash} <- info.eval.bound_to_inputs,
current_input_info = data.input_infos[input_id],
do: {input_id, current_input_info.hash}
)
|> Enum.sort()

deps = {is_branch?, parent_snapshots, identifier_versions, bound_input_values}
deps = {is_branch?, parent_snapshots, identifier_versions, bound_input_current_hashes}

:erlang.phash2(deps)
end

defp input_info(value) do
%{value: value, hash: input_hash(value)}
end

defp input_hash(value) do
:erlang.phash2(value)
end

defp identifier_deps(cell_id, graph, data) do
info = data.cell_infos[cell_id]

Expand Down Expand Up @@ -2405,18 +2421,7 @@ defmodule Livebook.Session.Data do
|> Notebook.evaluable_cells_with_section()
|> Enum.filter(fn {cell, _section} ->
info = data.cell_infos[cell.id]

case data.mode do
:default ->
match?(
%{status: :ready, validity: :stale, reevaluates_automatically: true},
info.eval
)

:app ->
match?(%{status: :ready, validity: :stale}, info.eval) and
data.app_data.status.execution in [:executing, :executed]
end
match?(%{status: :ready, validity: :stale, reevaluates_automatically: true}, info.eval)
end)

cell_ids = for {cell, _section} <- cells_to_reevaluate, do: cell.id
Expand All @@ -2438,12 +2443,12 @@ defmodule Livebook.Session.Data do
|> Notebook.evaluable_cells_with_section()
|> Enum.find_value(:executed, fn {cell, _section} ->
case data.cell_infos[cell.id].eval do
%{status: :evaluating} -> :executing
%{status: :queued} -> :executing
%{validity: :aborted} -> :error
%{interrupted: true} -> :interrupted
%{errored: true} -> :error
%{validity: :fresh} -> :executing
%{status: :evaluating} -> :executing
%{status: :queued} -> :executing
_ -> nil
end
end)
Expand Down Expand Up @@ -2603,7 +2608,21 @@ defmodule Livebook.Session.Data do
_ -> data
end

Map.fetch(data.input_values, input_id)
with {:ok, info} <- Map.fetch(data.input_infos, input_id), do: {:ok, info.value}
end

@doc """
Returns the set of inputs which values changed since they have been
read by any of the cells.
"""
@spec changed_input_ids(t()) :: MapSet.t(input_id())
def changed_input_ids(data) do
for {_cell_id, %{eval: eval_info}} <- data.cell_infos,
{input_id, read_hash} <- eval_info.bound_to_inputs,
input_info = data.input_infos[input_id],
read_hash != input_info.hash,
into: MapSet.new(),
do: input_id
end

@doc """
Expand Down
Loading
Loading