Skip to content

Commit

Permalink
Send session events over socket
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-dixon committed Aug 8, 2017
1 parent 916b7bf commit 770a644
Show file tree
Hide file tree
Showing 8 changed files with 608 additions and 121 deletions.
5 changes: 4 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
[org.clojure/spec.alpha "0.1.109"]
[org.clojure/clojurescript "1.9.562"]
[org.clojure/core.async "0.3.442"]
[com.cerner/clara-rules "0.15.1"]
[com.cerner/clara-rules "0.16.0-SNAPSHOT"]
[com.cognitect/transit-clj "0.8.300"]
[com.cognitect/transit-cljs "0.8.239"]
[com.taoensso/sente "1.11.0"]
[reagent "0.6.0"]]

:plugins [[lein-cljsbuild "1.1.4"]
Expand Down
242 changes: 208 additions & 34 deletions src/cljc/precept/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
[precept.listeners :as l]
[precept.state :refer [fact-id rules store state] :as s]
[clara.rules :refer [fire-rules]]
[precept.serialize.facts :as serialize]
[precept.spec.core :refer [validate]]
[precept.spec.sub :as sub]
[precept.spec.lang :as lang]
#?(:clj [clojure.core.async :refer [<! >! put! take! chan go go-loop]])
#?(:cljs [cljs.core.async :refer [put! take! chan <! >!]])
[taoensso.sente :as sente]
#?(:clj [clojure.core.async :refer [<! >! put! take! chan go go-loop] :as async])
#?(:cljs [cljs.core.async :refer [put! take! chan <! >!] :as async])
#?(:cljs [reagent.core :as r]))
#?(:cljs (:require-macros [cljs.core.async.macros :refer [go go-loop]])))

Expand Down Expand Up @@ -82,6 +84,7 @@
(def action-ch (chan 1))
(def session->store (chan 1))
(def done-ch (chan 1))
(def init-session-ch (chan 1))

(defn update-session-history
"First history entry is most recent"
Expand All @@ -93,24 +96,37 @@

(defn swap-session!
[next]
(trace "Swapping session!")
(swap! s/state assoc :session next))

(defn swap-session-sync! [next f]
(swap! s/state assoc :session next)
(put! init-session-ch true)
(f)
(:session @s/state))

(defn dispatch! [f] (put! action-ch f))

(defn transactor []
(defn transactor
"Takes from action channel and puts to `session->store` channel. Parks until `done-ch`
returns a message indicating:
1. A transaction of facts (via `then`) was inserted into the (global) session
(`(:session precept.state/state)`)
2. `fire-rules` was called on `(:session precept.state/state)`
3. changes were read from the session and applied to the store"
[]
(go-loop []
(let [action (<! action-ch)]
(do (trace " ---> Kicking off!" (hash action))
(>! session->store action)
(do (>! session->store action)
(<! done-ch)
(recur)))))

(defn init-transactor
"Parks until session is initialized in (:session precept.state/state) then spawns
a transactor that takes from `action-ch`."
[]
(go (<! init-session-ch)
(transactor)))

(defn apply-changes-to-session [in]
(let [out (chan)]
(go-loop []
Expand All @@ -123,14 +139,28 @@

(defn read-changes-from-session
"Reads changes from session channel, fires rules, and puts resultant changes
on changes channel. Updates session state atom with new session."
on changes channel. Updates session state atom with new session. Changes are returned
keyed by :added, :removed as Tuple records."
[in]
(let [out (chan)]
(go-loop []
(let [session (<! in)
ops (l/vec-ops session)
next-session (l/replace-listener session)
_ (trace "Ops!" ops)]
ops (-> (l/ops session) (l/diff-ops))
next-session (if (:connected? @s/*devtools)
(do
(>! (:event-sink @s/*devtools)
{:fire-rules-complete true
:total-events (dec (:event-number @s/*event-coords))})
(swap! s/*event-coords update :state-number inc)
(swap! s/*event-coords assoc :state-id (util/guid)
:event-number 0)
(l/replace-listener
session
(l/create-devtools-listeners
(:event-sink @s/*devtools)
s/*event-coords
[])))
(l/replace-listener session))]
(swap-session! next-session)
(>! out ops)
(recur)))
Expand Down Expand Up @@ -181,9 +211,8 @@
[in]
(let [out (chan)]
(go-loop []
(let [ops (<! in)
_ (trace "Removals" (:removed ops))]
(apply-removals-to-view-model! (remove util/impl-fact? (:removed ops)))
(let [ops (<! in)]
(apply-removals-to-view-model! (mapv util/record->vec (:removed ops)))
(>! out ops)
(recur)))
out))
Expand All @@ -192,17 +221,89 @@
"Reads ops from channel and applies additions to store"
[in]
(go-loop []
(let [ops (<! in)
_ (trace "Additions" (:added ops))]
(apply-additions-to-view-model! (remove util/impl-fact? (:added ops)))
(let [ops (<! in)]
(apply-additions-to-view-model! (mapv util/record->vec (:added ops)))
(>! done-ch :done)
(recur)))
nil)

(transactor)
;; Create session/store update pipeline
(init-transactor)

(def realized-session (apply-changes-to-session session->store))
(def changes-out (read-changes-from-session realized-session))
(def removals-out (apply-removals-to-store changes-out))

(def session-mult (async/mult realized-session))

(defn create-fired-session-ch
"Returns core.async channel that receives the session after the rules have been fired and
before its listener has been replaced."
[]
(async/tap session-mult (chan)))

(defn batch-complete?
"Returns true if the event is fire rules complete or it exists in the batch and
the maximum event number received is equal to `:total-events` from fire rules complete"
[event batch]
(let [max-event-number-recd (max (:event-number event)
(apply max (map :event-number batch)))
fire-rules-complete-event
(or (first (filter :fire-rules-complete batch))
(when (some-> :fire-rules-complete event)
event))
total-events (:total-events fire-rules-complete-event)]
(and (integer? total-events)
(integer? max-event-number-recd)
(= total-events max-event-number-recd))))

(defn create-dto>socket-router
"Returns a go-loop that takes from a channel with events emitted by
`precept.listeners/PersistentSessionEventMessager and calls the provided send function,
intended for use with a precept-devtools socket. Batches events per call to `fire-rules`."
[in send!]
(go-loop [batch []]
(let [event (<! in)]
(if (batch-complete? event batch)
(do
(send! [:devtools/update (serialize/serialize batch)])
(recur []))
(recur (conj batch event))))))


(def changes-out (read-changes-from-session (create-fired-session-ch)))

(def changes-mult (async/mult changes-out))

(defn changes-xf [f]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(let [ret {:added (f (:added input))
:removed (f (:removed input))}]
(rf result ret))))))

(defn create-change-report-ch
"Returns core.async channel with operational changes from session.
Removes Precept implementation facts when called with no arguments.
May be called with a function that will be applied to all :added and :removed facts.
Usage:
```clj
(def nil-values-ch
(create-changes-report-ch
(filter (fn [{:keys [e a v t]} record] (nil? v))
%)))
(go-loop []
(let [changes (<! nil-values-ch)]
(println \"Facts with nil values added / removed:\" changes)))
=> Facts with nil values added / removed: {:added () :removed ()}
```"
([] (create-change-report-ch util/remove-impl-attrs))
([f] (async/tap changes-mult (chan 1 (changes-xf f)))))

(def removals-out (apply-removals-to-store (create-change-report-ch)))

(apply-additions-to-store removals-out)

;; TODO. Find equivalent in CLJ
Expand Down Expand Up @@ -232,40 +333,113 @@
"Returns lens that points to a path in the store. Sub is handled by a rule."
([req]
(let [name (first req)
existing (util/find-sub-by-name name)
_ (trace "New sub name / existing if any" name existing)]
existing (util/find-sub-by-name name)]
(or (:lens existing) (register req)))))

(defn then
"Inserts facts into current session"
[facts]
(dispatch! (fn [session] (util/insert session facts))))

(def default-devtools-host "0.0.0.0:3232")

(defn connect-devtools-socket!
[options init-cb]
(let [options (if (map? options) options {})
host (or (:host options) default-devtools-host)
path (or (:path options) "/chsk")
{:keys [chsk ch-recv send-fn state]} (sente/make-channel-socket!
path
{:host host
:type :auto})]
(def chsk chsk)
(def ch-chsk ch-recv)
(def chsk-send! send-fn)
(def chsk-state state)
(def devtools-socket-ch (chan 5000))
(def devtools-socket-router (create-dto>socket-router devtools-socket-ch send-fn))

(swap! s/*devtools assoc
:event-sink devtools-socket-ch
:host host
:path path)

(defmulti handle-message first)
(defmethod handle-message :chsk/ws-ping [_])

(defmethod handle-message :devtools/init [[msg-name msg]]
(println "[precept-devtools] Server sent: " msg))

(defmulti handle-event :id)
(defmethod handle-event :chsk/handshake [_])

(defmethod handle-event :chsk/state [{:keys [?data]}]
(let [[last-state this-state] ?data]
(when (:first-open? this-state)
(do (println (str "[precept-devtools] Connected to "
(:host @s/*devtools) (:path @s/*devtools) "."))
(swap! s/*devtools assoc :connected? true)
(init-cb (:event-sink @s/*devtools))))))

(defmethod handle-event :chsk/recv [{:keys [?data]}]
(handle-message ?data))

(sente/start-chsk-router! ch-recv handle-event)))

(defn start-with-devtools!
[{:keys [session facts devtools] :as options}]
(connect-devtools-socket! devtools
(fn [ch]
(do
(swap-session-sync!
(l/replace-listener
(:session options)
(l/create-devtools-listeners ch s/*event-coords []))
#(dispatch! (fn [session] (util/insert session (:facts options)))))
(swap! s/state assoc :started? true)))))

(defn start!
"Initializes session with facts.
- :session - the `session` from which changes will be tracked
- :facts - initial facts
- :devtools (optional) - Either a boolean or a map of options for connecting to a
running instance of a Precept devtools server. Default: `nil`.
Supported devtools options:
- `:host` - String with host and port separated by `:`.
Defaults to default Devtools server address and port `0.0.0.0:3232`.
- `:path` - String of path for server socket.
Defaults to default Devtools server path `/chsk`.
Once initialized, facts are synced to a reagent ratom (`state/store`) and accessed via
subscriptions.
"
[{:keys [session facts] :as options}]
(let [opts (or options (hash-map))]
[{:keys [session facts devtools] :as options}]
(if devtools
(start-with-devtools! options)
(do
(swap-session-sync!
(l/replace-listener (:session opts))
#(dispatch! (fn [session] (util/insert session (:facts opts)))))
(l/replace-listener (:session options))
#(dispatch! (fn [session] (util/insert session (:facts options)))))
(swap! s/state assoc :started? true))))

(defn resume!
"Used to reload session."
[{:keys [session facts] :as options}]
(let [opts (or options (hash-map))]
(if (:started? @s/state)
(swap-session-sync!
(l/replace-listener (:session opts))
#(dispatch! (fn [session] (util/insert session (:facts opts)))))
session)))
(defn resume-with-devtools!
[options]
(swap-session-sync!
(l/replace-listener (:session options)
(l/create-devtools-listeners (:event-sink @s/*devtools) s/*event-coords []))
#(dispatch! (fn [session] (util/insert session (:facts options))))))


(defn resume!
"Resets session with provided facts if `start!` has been called, otherwise returns the session
received as an argument unmodified. Avoids duplicate session creation on page refresh in
development when there is stale session metadata in the compiler."
[{:keys [session facts devtools] :as options}]
(if (not (:started? @s/state))
session
(if (:connected? @s/*devtools)
(resume-with-devtools! options)
(swap-session-sync!
(l/replace-listener (:session options))
#(dispatch! (fn [session] (util/insert session (:facts options))))))))
Loading

0 comments on commit 770a644

Please sign in to comment.