Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce broadcast communication #71

Merged
merged 5 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion lib/kino/bridge.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Kino.Bridge do
@moduledoc false

import Kernel, except: [send: 2]

# This module encapsulates the communication with Livebook
# achieved via the group leader. For the implementation of
# that group leader see Livebook.Evaluator.IOProxy
Expand Down Expand Up @@ -86,11 +88,33 @@ defmodule Kino.Bridge do
end
end

@doc """
Broadcasts the given message in Livebook to interested parties.
"""
@spec broadcast(String.t(), String.t(), term()) :: :ok | {:error, request_error()}
def broadcast(topic, subtopic, message) do
with {:ok, reply} <- io_request(:livebook_get_broadcast_target),
{:ok, pid} <- reply do
send(pid, {:runtime_broadcast, topic, subtopic, message})
:ok
end
end

@doc """
Sends message to the given Livebook process.
"""
@spec send(pid(), term()) :: :ok
def send(pid, message) do
# For now we send directly
Kernel.send(pid, message)
:ok
end

defp io_request(request) do
gl = Process.group_leader()
ref = Process.monitor(gl)

send(gl, {:io_request, self(), ref, request})
Kernel.send(gl, {:io_request, self(), ref, request})

result =
receive do
Expand Down
2 changes: 1 addition & 1 deletion lib/kino/js/live/context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Kino.JS.Live.Context do

@doc false
def new() do
%__MODULE__{assigns: %{}, origin: nil, __private__: %{client_pids: []}}
%__MODULE__{assigns: %{}, origin: nil, __private__: %{}}
end

@doc """
Expand Down
25 changes: 4 additions & 21 deletions lib/kino/js/live_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ defmodule Kino.JS.LiveServer do
defdelegate call(pid, term, timeout), to: GenServer

def broadcast_event(ctx, event, payload) do
for pid <- ctx.__private__.client_pids do
send(pid, {:event, event, payload, %{ref: ctx.__private__.ref}})
end

ref = ctx.__private__.ref
Kino.Bridge.broadcast("js_live", ref, {:event, event, payload, %{ref: ref}})
:ok
end

Expand All @@ -35,7 +33,7 @@ defmodule Kino.JS.LiveServer do
{:ok, ctx}
end

{:ok, %{module: module, client_monitor_refs: [], ctx: ctx}}
{:ok, %{module: module, ctx: ctx}}
end

@impl true
Expand All @@ -52,16 +50,11 @@ defmodule Kino.JS.LiveServer do

@impl true
def handle_info({:connect, pid, %{origin: origin}}, state) do
ref = Process.monitor(pid)

state = update_in(state.ctx.__private__.client_pids, &[pid | &1])
state = update_in(state.client_monitor_refs, &[ref | &1])

ctx = %{state.ctx | origin: origin}
{:ok, data, ctx} = state.module.handle_connect(ctx)
ctx = %{ctx | origin: nil}

send(pid, {:connect_reply, data, %{ref: state.ctx.__private__.ref}})
Kino.Bridge.send(pid, {:connect_reply, data, %{ref: state.ctx.__private__.ref}})

{:noreply, %{state | ctx: ctx}}
end
Expand All @@ -74,16 +67,6 @@ defmodule Kino.JS.LiveServer do
{:noreply, %{state | ctx: ctx}}
end

def handle_info({:DOWN, ref, :process, pid, _reason} = msg, state) do
if ref in state.client_monitor_refs do
state = update_in(state.ctx.__private__.client_pids, &List.delete(&1, pid))
state = update_in(state.client_monitor_refs, &List.delete(&1, ref))
{:noreply, state}
else
apply_handle_info(msg, state)
end
end

def handle_info(msg, state) do
apply_handle_info(msg, state)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/kino/js_data_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Kino.JSDataStore do
@impl true
def handle_info({:connect, pid, %{origin: _origin, ref: ref}}, state) do
data = state.ref_with_data[ref]
send(pid, {:connect_reply, data, %{ref: ref}})
Kino.Bridge.send(pid, {:connect_reply, data, %{ref: ref}})

{:noreply, state}
end
Expand Down
87 changes: 40 additions & 47 deletions test/kino/data_table_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Kino.DataTableTest do
use ExUnit.Case, async: true
use Kino.LivebookCase, async: true

import KinoTest.JS.Live

describe "new/1" do
test "raises an error when records have invalid data type" do
Expand Down Expand Up @@ -32,28 +34,28 @@ defmodule Kino.DataTableTest do

test "sorting is enabled by default when a list is given" do
widget = Kino.DataTable.new([])
data = connect_self(widget)
data = connect(widget)

assert %{features: [:pagination, :sorting]} = data
end

test "sorting is disabled by default when non-list is given" do
widget = Kino.DataTable.new(MapSet.new())
data = connect_self(widget)
data = connect(widget)

assert %{features: [:pagination]} = data
end

test "sorting is enabled when set explicitly with :enable_sorting" do
widget = Kino.DataTable.new(MapSet.new(), sorting_enabled: true)
data = connect_self(widget)
data = connect(widget)

assert %{features: [:pagination, :sorting]} = data
end

test "initial data respects current query parameters" do
widget = Kino.DataTable.new(@people_entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -76,7 +78,7 @@ defmodule Kino.DataTableTest do
{:event, "order_by", %{"key" => "0", "order" => "desc"}, %{origin: self()}}
)

data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -102,7 +104,7 @@ defmodule Kino.DataTableTest do
]

widget = Kino.DataTable.new(entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -118,7 +120,7 @@ defmodule Kino.DataTableTest do
]

widget = Kino.DataTable.new(entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -142,7 +144,7 @@ defmodule Kino.DataTableTest do
]

widget = Kino.DataTable.new(entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -161,7 +163,7 @@ defmodule Kino.DataTableTest do
]

widget = Kino.DataTable.new(entries, show_underscored: true)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -183,7 +185,7 @@ defmodule Kino.DataTableTest do
]

widget = Kino.DataTable.new(entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -199,7 +201,7 @@ defmodule Kino.DataTableTest do

test "sends only relevant fields if user-specified keys are given" do
widget = Kino.DataTable.new(@people_entries, keys: [:id])
data = connect_self(widget)
data = connect(widget)

assert data.content.rows == [
%{fields: %{"0" => "3"}},
Expand All @@ -210,7 +212,7 @@ defmodule Kino.DataTableTest do

test "preserves data order by default" do
widget = Kino.DataTable.new(@people_entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -233,34 +235,32 @@ defmodule Kino.DataTableTest do

test "supports sorting by other columns" do
widget = Kino.DataTable.new(@people_entries)
connect_self(widget)

send(
widget.pid,
{:event, "order_by", %{"key" => "1", "order" => "desc"}, %{origin: self()}}
)

assert {:event, "update_content",
%{
columns: [
%{key: "0", label: ":id"},
%{key: "1", label: ":name"}
],
rows: [
%{fields: %{"0" => "2", "1" => ~s/"Terry Jeffords"/}},
%{fields: %{"0" => "1", "1" => ~s/"Jake Peralta"/}},
%{fields: %{"0" => "3", "1" => ~s/"Amy Santiago"/}}
],
order: :desc,
order_by: "0"
}}
# Get initial data to populate the key-string mapping
connect(widget)

push_event(widget, "order_by", %{"key" => "1", "order" => "desc"})

assert_broadcast_event(widget, "update_content", %{
columns: [
%{key: "0", label: ":id"},
%{key: "1", label: ":name"}
],
rows: [
%{fields: %{"0" => "2", "1" => ~s/"Terry Jeffords"/}},
%{fields: %{"0" => "1", "1" => ~s/"Jake Peralta"/}},
%{fields: %{"0" => "3", "1" => ~s/"Amy Santiago"/}}
],
order: :desc,
order_by: "1"
})
end

test "supports pagination" do
entries = for n <- 1..25, do: %{n: n}

widget = Kino.DataTable.new(entries)
data = connect_self(widget)
data = connect(widget)

assert %{
content: %{
Expand All @@ -270,19 +270,12 @@ defmodule Kino.DataTableTest do
}
} = data

send(widget.pid, {:event, "show_page", %{"page" => 2}, %{origin: self()}})

assert_receive {:event, "update_content",
%{
page: 2,
max_page: 3,
rows: [%{fields: %{"0" => "11"}} | _]
}, %{}}
end
push_event(widget, "show_page", %{"page" => 2})

defp connect_self(widget) do
send(widget.pid, {:connect, self(), %{origin: self()}})
assert_receive {:connect_reply, %{} = data, %{}}
data
assert_broadcast_event(widget, "update_content", %{
page: 2,
max_page: 3,
rows: [%{fields: %{"0" => "11"}} | _]
})
end
end
Loading