Skip to content

Commit

Permalink
Add support for Kino.JS.Live as event source
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko committed Jul 1, 2023
1 parent 5b71560 commit 8202d38
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 4 deletions.
17 changes: 14 additions & 3 deletions lib/kino/control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Kino.Control do

@opaque interval :: {:interval, milliseconds :: non_neg_integer()}

@type event_source :: t() | Kino.Input.t() | interval()
@type event_source :: t() | Kino.Input.t() | interval() | Kino.JS.Live.t()

defp new(attrs) do
ref = Kino.Output.random_ref()
Expand Down Expand Up @@ -283,6 +283,10 @@ defmodule Kino.Control do
* `Kino.Input` - emitting value on value change
* `Kino.JS.Live` - emitting value programmatically
* `interval` - emitting value periodically, see `interval/1`
You can then consume the stream to access its events.
The stream is typically consumed via `Kino.listen/2`.
Expand All @@ -307,7 +311,10 @@ defmodule Kino.Control do
def stream(sources) when is_list(sources) do
for source <- sources, do: assert_stream_source!(source)

tagged_topics = for %{attrs: %{ref: ref}} <- sources, do: {nil, ref}
tagged_topics =
for(%{attrs: %{ref: ref}} <- sources, do: {nil, ref}) ++
for(%Kino.JS.Live{ref: ref} <- sources, do: {nil, ref})

tagged_intervals = for {:interval, ms} <- sources, do: {nil, ms}

build_stream(tagged_topics, tagged_intervals, fn nil, event -> event end)
Expand Down Expand Up @@ -345,14 +352,18 @@ defmodule Kino.Control do
end
end

tagged_topics = for {tag, %{attrs: %{ref: ref}}} <- entries, do: {tag, ref}
tagged_topics =
for({tag, %{attrs: %{ref: ref}}} <- entries, do: {tag, ref}) ++
for({tag, %Kino.JS.Live{ref: ref}} <- entries, do: {tag, ref})

tagged_intervals = for {tag, {:interval, ms}} <- entries, do: {tag, ms}

build_stream(tagged_topics, tagged_intervals, fn tag, event -> {tag, event} end)
end

defp assert_stream_source!(%Kino.Control{}), do: :ok
defp assert_stream_source!(%Kino.Input{}), do: :ok
defp assert_stream_source!(%Kino.JS.Live{}), do: :ok
defp assert_stream_source!({:interval, ms}) when is_number(ms) and ms > 0, do: :ok

defp assert_stream_source!(item) do
Expand Down
12 changes: 11 additions & 1 deletion lib/kino/js/live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ defmodule Kino.JS.Live do
quote location: :keep do
@behaviour Kino.JS.Live

import Kino.JS.Live.Context, only: [assign: 2, update: 3, broadcast_event: 3, send_event: 4]
import Kino.JS.Live.Context,
only: [assign: 2, update: 3, broadcast_event: 3, send_event: 4, emit_event: 2]

@before_compile Kino.JS.Live
end
Expand Down Expand Up @@ -328,6 +329,8 @@ defmodule Kino.JS.Live do

case Kino.start_child({Kino.JS.Live.Server, {module, init_arg, ref}}) do
{:ok, pid} ->
subscription_manager = Kino.SubscriptionManager.cross_node_name()
Kino.Bridge.monitor_object(pid, subscription_manager, {:clear_topic, ref})
%__MODULE__{module: module, pid: pid, ref: ref}

{:error, reason} ->
Expand Down Expand Up @@ -394,3 +397,10 @@ defmodule Kino.JS.Live do
Process.monitor(kino.pid)
end
end

defimpl Enumerable, for: Kino.JS.Live do
def reduce(kino, acc, fun), do: Enumerable.reduce(Kino.Control.stream([kino]), acc, fun)
def member?(_kino, _value), do: {:error, __MODULE__}
def count(_kino), do: {:error, __MODULE__}
def slice(_kino), do: {:error, __MODULE__}
end
17 changes: 17 additions & 0 deletions lib/kino/js/live/context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,21 @@ defmodule Kino.JS.Live.Context do
def send_event(%__MODULE__{} = ctx, client_id, event, payload \\ nil) when is_binary(event) do
Kino.JS.Live.Server.send_event(ctx, client_id, event, payload)
end

@doc """
Emits an event to processes subscribed to this kino.
Consumers may subscribe to events emitted by the given instance of
`Kino.JS.Live` using functions in the `Kino.Control` module, such
as `Kino.Control.stream/1`.
## Examples
emit_event(ctx, %{event: :click, counter: 1})
"""
@spec emit_event(t(), term()) :: :ok
def emit_event(%__MODULE__{} = ctx, event) do
Kino.JS.Live.Server.emit_event(ctx, event)
end
end
5 changes: 5 additions & 0 deletions lib/kino/js/live/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ defmodule Kino.JS.Live.Server do
:ok
end

def emit_event(ctx, item) do
send(Kino.SubscriptionManager, {:event, ctx.__private__.ref, item})
:ok
end

@impl true
def init({module, init_arg, ref}) do
{:ok, ctx, _opts} = call_init(module, init_arg, ref)
Expand Down
13 changes: 13 additions & 0 deletions test/kino/control_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ defmodule Kino.ControlTest do
events = button |> Kino.Control.stream() |> Enum.to_list()
assert events == [%{origin: "client1"}]
end

test "supports Kino.JS.Live" do
kino = Kino.TestModules.LiveCounter.new(0)

spawn(fn ->
Process.sleep(1)
Kino.TestModules.LiveCounter.bump(kino, 1)
Kino.TestModules.LiveCounter.bump(kino, 2)
end)

events = kino |> Kino.Control.stream() |> Enum.take(2)
assert events == [%{event: :bump, by: 1}, %{event: :bump, by: 2}]
end
end

describe "stream/1 with a list of sources" do
Expand Down
1 change: 1 addition & 0 deletions test/support/test_modules/live_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ defmodule Kino.TestModules.LiveCounter do

defp bump_count(ctx, by) do
broadcast_event(ctx, "bump", %{by: by})
emit_event(ctx, %{event: :bump, by: by})
update(ctx, :count, &(&1 + by))
end

Expand Down

0 comments on commit 8202d38

Please sign in to comment.