diff --git a/lib/kino/control.ex b/lib/kino/control.ex index ffe4d342..4eb5bbb7 100644 --- a/lib/kino/control.ex +++ b/lib/kino/control.ex @@ -36,7 +36,7 @@ defmodule Kino.Control do @opaque interval :: {:interval, milliseconds :: non_neg_integer()} - @type event_source :: t() | Kino.Input.t() | interval() + @type event_source :: t() | Kino.Input.t() | interval() | Kino.JS.Live.t() defp new(attrs) do ref = Kino.Output.random_ref() @@ -283,6 +283,10 @@ defmodule Kino.Control do * `Kino.Input` - emitting value on value change + * `Kino.JS.Live` - emitting value programmatically + + * `interval` - emitting value periodically, see `interval/1` + You can then consume the stream to access its events. The stream is typically consumed via `Kino.listen/2`. @@ -307,7 +311,10 @@ defmodule Kino.Control do def stream(sources) when is_list(sources) do for source <- sources, do: assert_stream_source!(source) - tagged_topics = for %{attrs: %{ref: ref}} <- sources, do: {nil, ref} + tagged_topics = + for(%{attrs: %{ref: ref}} <- sources, do: {nil, ref}) ++ + for(%Kino.JS.Live{ref: ref} <- sources, do: {nil, ref}) + tagged_intervals = for {:interval, ms} <- sources, do: {nil, ms} build_stream(tagged_topics, tagged_intervals, fn nil, event -> event end) @@ -345,7 +352,10 @@ defmodule Kino.Control do end end - tagged_topics = for {tag, %{attrs: %{ref: ref}}} <- entries, do: {tag, ref} + tagged_topics = + for({tag, %{attrs: %{ref: ref}}} <- entries, do: {tag, ref}) ++ + for({tag, %Kino.JS.Live{ref: ref}} <- entries, do: {tag, ref}) + tagged_intervals = for {tag, {:interval, ms}} <- entries, do: {tag, ms} build_stream(tagged_topics, tagged_intervals, fn tag, event -> {tag, event} end) @@ -353,6 +363,7 @@ defmodule Kino.Control do defp assert_stream_source!(%Kino.Control{}), do: :ok defp assert_stream_source!(%Kino.Input{}), do: :ok + defp assert_stream_source!(%Kino.JS.Live{}), do: :ok defp assert_stream_source!({:interval, ms}) when is_number(ms) and ms > 0, do: :ok defp assert_stream_source!(item) do diff --git a/lib/kino/js/live.ex b/lib/kino/js/live.ex index aad6916b..dcf2e21f 100644 --- a/lib/kino/js/live.ex +++ b/lib/kino/js/live.ex @@ -294,7 +294,8 @@ defmodule Kino.JS.Live do quote location: :keep do @behaviour Kino.JS.Live - import Kino.JS.Live.Context, only: [assign: 2, update: 3, broadcast_event: 3, send_event: 4] + import Kino.JS.Live.Context, + only: [assign: 2, update: 3, broadcast_event: 3, send_event: 4, emit_event: 2] @before_compile Kino.JS.Live end @@ -328,6 +329,8 @@ defmodule Kino.JS.Live do case Kino.start_child({Kino.JS.Live.Server, {module, init_arg, ref}}) do {:ok, pid} -> + subscription_manager = Kino.SubscriptionManager.cross_node_name() + Kino.Bridge.monitor_object(pid, subscription_manager, {:clear_topic, ref}) %__MODULE__{module: module, pid: pid, ref: ref} {:error, reason} -> @@ -394,3 +397,10 @@ defmodule Kino.JS.Live do Process.monitor(kino.pid) end end + +defimpl Enumerable, for: Kino.JS.Live do + def reduce(kino, acc, fun), do: Enumerable.reduce(Kino.Control.stream([kino]), acc, fun) + def member?(_kino, _value), do: {:error, __MODULE__} + def count(_kino), do: {:error, __MODULE__} + def slice(_kino), do: {:error, __MODULE__} +end diff --git a/lib/kino/js/live/context.ex b/lib/kino/js/live/context.ex index 8a63aaf3..d6f74e0f 100644 --- a/lib/kino/js/live/context.ex +++ b/lib/kino/js/live/context.ex @@ -81,4 +81,21 @@ defmodule Kino.JS.Live.Context do def send_event(%__MODULE__{} = ctx, client_id, event, payload \\ nil) when is_binary(event) do Kino.JS.Live.Server.send_event(ctx, client_id, event, payload) end + + @doc """ + Emits an event to processes subscribed to this kino. + + Consumers may subscribe to events emitted by the given instance of + `Kino.JS.Live` using functions in the `Kino.Control` module, such + as `Kino.Control.stream/1`. + + ## Examples + + emit_event(ctx, %{event: :click, counter: 1}) + + """ + @spec emit_event(t(), term()) :: :ok + def emit_event(%__MODULE__{} = ctx, event) do + Kino.JS.Live.Server.emit_event(ctx, event) + end end diff --git a/lib/kino/js/live/server.ex b/lib/kino/js/live/server.ex index f05efbdb..ce8ea9a8 100644 --- a/lib/kino/js/live/server.ex +++ b/lib/kino/js/live/server.ex @@ -35,6 +35,11 @@ defmodule Kino.JS.Live.Server do :ok end + def emit_event(ctx, item) do + send(Kino.SubscriptionManager, {:event, ctx.__private__.ref, item}) + :ok + end + @impl true def init({module, init_arg, ref}) do {:ok, ctx, _opts} = call_init(module, init_arg, ref) diff --git a/test/kino/control_test.exs b/test/kino/control_test.exs index 7316334b..3a97ad1f 100644 --- a/test/kino/control_test.exs +++ b/test/kino/control_test.exs @@ -109,6 +109,19 @@ defmodule Kino.ControlTest do events = button |> Kino.Control.stream() |> Enum.to_list() assert events == [%{origin: "client1"}] end + + test "supports Kino.JS.Live" do + kino = Kino.TestModules.LiveCounter.new(0) + + spawn(fn -> + Process.sleep(1) + Kino.TestModules.LiveCounter.bump(kino, 1) + Kino.TestModules.LiveCounter.bump(kino, 2) + end) + + events = kino |> Kino.Control.stream() |> Enum.take(2) + assert events == [%{event: :bump, by: 1}, %{event: :bump, by: 2}] + end end describe "stream/1 with a list of sources" do diff --git a/test/support/test_modules/live_counter.ex b/test/support/test_modules/live_counter.ex index 04b6b002..64fa49d5 100644 --- a/test/support/test_modules/live_counter.ex +++ b/test/support/test_modules/live_counter.ex @@ -68,6 +68,7 @@ defmodule Kino.TestModules.LiveCounter do defp bump_count(ctx, by) do broadcast_event(ctx, "bump", %{by: by}) + emit_event(ctx, %{event: :bump, by: by}) update(ctx, :count, &(&1 + by)) end