Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronized updates #109

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,30 @@ on(obs) do x
end
```

### Synchronous Updates

If you have a function which combines the contents of multiple Observables you may want to update all of their contents before calling the attached function. A common example for this is broadcasting over array Observables.

```julia
input1 = Observable([1, 2])
input2 = Observable([1, 2])
output = map((a, b) -> tuple.(a, b), input1, input2)
input1[] = [1, 2, 3]
ERROR: DimensionMismatch: arrays could not be broadcast to a common size; got a dimension with lengths 3 and 2
input2[] = [1, 2, 3]
```

After the second update `output` contains `[(1, 1), (2, 2), (3, 3)]` as it should, but it would be nice to avoid the intermediate update which errors. To do that the `@combine_updates` macro can be used.

```julia
@combine_updates begin
input1[] = [1, 2, 3, 4]
input2[] = [1, 2, 3, 4]
end
```

This will update the observables in two steps. First, it will update content of each observable and mark every listener as out of date. Then it will go through all listeners and call the ones marked out of date, respecting priority and `Consume`. You can also do this manually with `prepare_update!(observable, value)` and `execute_update!(observable)`.

### How is it different from Reactive.jl?

The main difference is `Signal`s are manipulated mostly by converting one signal to another. For example, with signals, you can construct a changing UI by creating a `Signal` of UI objects and rendering them as the signal changes. On the other hand, you can use an Observable both as an input and an output. You can arbitrarily attach outputs to inputs allowing structuring code in a [signals-and-slots](http://doc.qt.io/qt-4.8/signalsandslots.html) kind of pattern.
Expand Down
153 changes: 139 additions & 14 deletions src/Observables.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Observables

export Observable, on, off, onany, connect!, obsid, async_latest, throttle
export Consume, ObserverFunction, AbstractObservable
export prepare_update!, execute_update!, @combine_updates

import Base.Iterators.filter

Expand All @@ -22,6 +23,49 @@ abstract type AbstractObservable{T} end
const addhandler_callbacks = []
const removehandler_callbacks = []

@enum CallbackState::Int8 begin
UPTODATE
CONSUMED
OUTOFDATE
end

struct Consume
x::Bool
end
Consume() = Consume(true)

if VERSION >= v"1.8.0"
mutable struct Callback
const f::Any
state::CallbackState
end
else
mutable struct Callback
f::Any
state::CallbackState
end
end
Callback(f::Any) = Callback(f, UPTODATE)
(cb::Callback)(val) = Base.invokelatest(cb.f, val)

function update(cb::Callback, val)
if cb.state == UPTODATE # no need to update again
return false
elseif cb.state == CONSUMED # last update was blocking
return true
else
output = Base.invokelatest(cb.f, val)
if output isa Consume && output.x
cb.state = CONSUMED
return true
else
cb.state = UPTODATE
return false
end
end
end


"""
obs = Observable(val; ignore_equal_values=false)
obs = Observable{T}(val; ignore_equal_values=false)
Expand All @@ -30,17 +74,16 @@ Like a `Ref`, but updates can be watched by adding a handler using [`on`](@ref)
Set `ignore_equal_values=true` to not trigger an event for `observable[] = new_value` if `isequal(observable[], new_value)`.
"""
mutable struct Observable{T} <: AbstractObservable{T}

listeners::Vector{Pair{Int, Any}}
listeners::Vector{Pair{Int, Callback}}
inputs::Vector{Any} # for map!ed Observables
ignore_equal_values::Bool
val::T

function Observable{T}(; ignore_equal_values::Bool=false) where {T}
return new{T}(Pair{Int, Any}[], [], ignore_equal_values)
return new{T}(Pair{Int, Callback}[], [], ignore_equal_values)
end
function Observable{T}(@nospecialize(val); ignore_equal_values::Bool=false) where {T}
return new{T}(Pair{Int, Any}[], [], ignore_equal_values, val)
return new{T}(Pair{Int, Callback}[], [], ignore_equal_values, val)
end
end

Expand Down Expand Up @@ -112,7 +155,7 @@ end


# Optimized version of Base.searchsortedlast (optimized for our use case of pairs)
function pair_searchsortedlast(values::Vector{Pair{Int, Any}}, prio::Int)::Int
function pair_searchsortedlast(values::Vector{Pair{Int, Callback}}, prio::Int)::Int
u = 1
lo = 0
hi = length(values) + u
Expand All @@ -128,9 +171,10 @@ function pair_searchsortedlast(values::Vector{Pair{Int, Any}}, prio::Int)::Int
end

function register_callback(@nospecialize(observable), priority::Int, @nospecialize(f))
ls = listeners(observable)::Vector{Pair{Int, Any}}
ls = listeners(observable)::Vector{Pair{Int, Callback}}
idx = pair_searchsortedlast(ls, priority)
p = Pair{Int, Any}(priority, f) # faster than priority => f because of convert
# faster than priority => f because of convert
p = Pair{Int, Callback}(priority, Callback(f))
insert!(ls, idx + 1, p)
return
end
Expand All @@ -152,11 +196,6 @@ Base.convert(::Type{T}, x) where {T<:Observable} = T(x)
Base.convert(::Type{Observable{Any}}, x::AbstractObservable{Any}) = x
Base.convert(::Type{Observables.Observable{Any}}, x::Observables.Observable{Any}) = x

struct Consume
x::Bool
end
Consume() = Consume(true)

"""
notify(observable::AbstractObservable)

Expand All @@ -165,8 +204,8 @@ Returns true if an event got consumed before notifying every listener.
"""
function Base.notify(@nospecialize(observable::AbstractObservable))
val = observable[]
for (_, f) in listeners(observable)::Vector{Pair{Int, Any}}
result = Base.invokelatest(f, val)
for (_, f) in listeners(observable)::Vector{Pair{Int, Callback}}
result = f(val)
if result isa Consume && result.x
# stop calling callbacks if event got consumed
return true
Expand All @@ -175,6 +214,92 @@ function Base.notify(@nospecialize(observable::AbstractObservable))
return false
end

# Handling "synchronized" updates
"""
prepare_update!(observable, value)

Sets `observable.val = value` and marks its listeners as `OUTOFDATE`. To run
the listeners, call `execute_update!(observable)`
"""
function prepare_update!(observable::Observable, val)
observable.val = val
for (_, cb) in listeners(observable)::Vector{Pair{Int, Callback}}
cb.state = OUTOFDATE
end
return
end

"""
execute_update!(observable::observable)

Iterates through each listener of the observable. If the listener is marked as
`OUTOFDATE` it executes and updates its state to `CONSUMED` or `UPTODATE`
depending on the return type of the listener. Listeners marked as `UPTODATE`
are skipped and those marked as `CONSUMED` result in termination of the
iteration.
"""
function execute_update!(observable::Observable)
val = observable[]
for (_, cb) in listeners(observable)::Vector{Pair{Int, Callback}}
if update(cb, val)
# stop calling callbacks if event got consumed
return true
end
end
return false
end

"""
@combine_updates begin
obs1[] = val1
obs2[] = val2
end

This macro delays the execution of listeners until the end of the enclosing
block. It also avoids executing the same listener multiple times if multiple
enclosed observables trigger it.

The code generated from the above example is

begin
prepare_update!(obs1, val1)
prepare_update!(obs2, val1)
execute_update!(obs1)
execute_update!(obs2)
end
"""
macro combine_updates(block::Expr)
observables = Symbol[]
if block.head != :block
error("Expression should be a begin ... end block.")
end

_replace_observable_update!.(block.args, (observables,))

for name in unique(observables)
push!(block.args, :(Observables.execute_update!($name)))
end

return esc(block)
end

_replace_observable_update!(::Any, ::Vector{Symbol}) = nothing
function _replace_observable_update!(expr::Expr, observables::Vector{Symbol})
if expr.head == Symbol("=") && expr.args[1] isa Expr && expr.args[1].head == :ref
# keep track of observable
name = expr.args[1].args[1]
push!(observables, name)

# switch to prepare_update! call
expr.head = :call
expr.args[1] = :(Observables.prepare_update!)
insert!(expr.args, 2, name)
else
_replace_observable_update!.(expr.args, (observables,))
end
return
end

function print_value(io::IO, x::Observable{T}; print_listeners=false) where T
print(io, "Observable")
real_eltype = T
Expand Down