diff --git a/project.clj b/project.clj index 4417f13..9c23e69 100644 --- a/project.clj +++ b/project.clj @@ -5,10 +5,12 @@ :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.7.0"] [org.clojure/clojurescript "1.7.228"] - #_[datascript "0.15.0"] - #_[com.datomic/datomic-free "0.9.5407"] [org.clojure/core.match "0.3.0-alpha4"]] :plugins [[lein-cljsbuild "1.1.3"]] + :profiles {:dev {:dependencies [[datascript "0.15.0"] + [com.datomic/datomic-free "0.9.5344"] + [org.clojure/core.async "0.2.391"] + [org.clojure/tools.namespace "0.2.11"]]}} :cljsbuild { :builds [ {:id "posh" :source-paths ["src/"] diff --git a/src/posh/clj/datascript.clj b/src/posh/clj/datascript.clj index f20b0ff..14dbfd0 100644 --- a/src/posh/clj/datascript.clj +++ b/src/posh/clj/datascript.clj @@ -1,6 +1,7 @@ (ns posh.clj.datascript (:require [posh.plugin-base :as base] [posh.lib.ratom :as rx] + [posh.lib.datascript :as ldb] [datascript.core :as d])) (def dcfg @@ -14,7 +15,9 @@ :listen! d/listen! :conn? d/conn? :ratom rx/atom - :make-reaction rx/make-reaction}] + :make-reaction rx/make-reaction + :conn->schema ldb/conn->schema + :additional-listeners ldb/add-schema-listener!}] (assoc dcfg :pull (partial base/safe-pull dcfg)))) (base/add-plugin dcfg) diff --git a/src/posh/clj/datomic.clj b/src/posh/clj/datomic.clj index 473e2ad..90c29f6 100644 --- a/src/posh/clj/datomic.clj +++ b/src/posh/clj/datomic.clj @@ -1,32 +1,137 @@ (ns posh.clj.datomic (:require [posh.plugin-base :as base] [posh.lib.ratom :as rx] - [datomic.api :as d])) + [clojure.core.async :as async + :refer [thread offer! seq [db-after ^datomic.Datom d] + [(.e d) (d/ident db-after (.a d)) (.v d) (.tx d) (.added d)]) -(defn- conn? [x] (instance? datomic.Connection x)) +; TODO import stuartsierra.component ? +(defprotocol Lifecycle + (start [this]) + (stop [this])) -; TODO maybe we don't want blocking here?) -(defn- transact!* [& args] @(apply d/transact args)) +(defn normalized-tx-report [{:keys [db-after] :as tx-report}] + (update tx-report :tx-data + (fn [datoms] (mapv #(datom->seq db-after %) datoms)))) -(defn- listen! - ([conn callback] (listen! conn (rand) callback)) +(defn run-listeners! [pconn tx-report'] + (try (doseq [[_ callback] @(:listeners pconn)] (callback tx-report')) + (catch Throwable e (debug "WARNING:" e)))) + +(defrecord PoshableConnection [datomic-conn listeners deduplicate-tx-idents interrupted?] + Lifecycle + (start [this] + (assert (instance? datomic.Connection datomic-conn)) + (assert (instance? clojure.lang.IAtom listeners)) + (assert (instance? clojure.lang.IAtom interrupted?)) + ; See `transact!*` as to why the below schema+entity is required. + @(d/transact datomic-conn + [{:db/id (d/tempid :db.part/db) + :db.install/_attribute :db.part/db + :db/ident :posh.clj.datomic.tx-notifier/value + :db/cardinality :db.cardinality/one + :db/valueType :db.type/uuid} + {:db/id (d/tempid :db.part/db) + :db/ident ::tx-notifier}]) + (thread + (loop [] + (when-not @interrupted? + ; `poll` because if `take`, still won't be nil or stop waiting when conn is released + ; the poll time is how long it will take to shut down when that time comes + (when-let [{:keys [db-after] :as tx-report} + (.poll ^java.util.concurrent.BlockingQueue + (d/tx-report-queue datomic-conn) + 1 + java.util.concurrent.TimeUnit/SECONDS)] + (try (let [{:keys [tx-data] :as tx-report'} (normalized-tx-report tx-report) + last-tx-item (last tx-data) + tx-ident (when (and last-tx-item + (= (d/ident db-after (get last-tx-item 0)) ::tx-notifier) + (= (d/ident db-after (get last-tx-item 1)) :posh.clj.datomic.tx-notifier/value)) + (get last-tx-item 2))] + (try (debug "tx-report received in PoshableConnection") + (when-not (get @deduplicate-tx-idents tx-ident) + (run-listeners! this tx-report')) + (finally + (swap! deduplicate-tx-idents #(disj % tx-ident))))) + (catch Throwable e (debug "WARNING:" e))) + (recur))))) + this) + (stop [this] + (reset! interrupted? true) + (swap! deduplicate-tx-idents empty) + this)) + +(defn ->poshable-conn [datomic-conn] + {:pre [(instance? datomic.Connection datomic-conn)]} + (let [listeners (atom nil)] + (with-meta (start (PoshableConnection. datomic-conn listeners (atom #{}) (atom false))) + {:listeners listeners}))) + +(defn conn? [x] (instance? PoshableConnection x)) + +(defn ->conn [x] + (if (conn? x) + (:datomic-conn x) + x)) + +(defn assert-pconn [x] (assert (instance? PoshableConnection x))) + +(defn listen! + ([conn callback] (listen! conn (gensym) callback)) ([conn key callback] {:pre [(conn? conn)]} - (TODO "Need to figure out how to listen to Datomic connection in the same way as DataScript") + (swap! (:listeners (meta conn)) assoc key callback) key)) +(defn db* [x] + (cond (instance? datomic.Database x) + x + (conn? x) + (-> x :datomic-conn d/db) + (instance? datomic.Connection x) + (d/db x) + :else x #_(throw (ex-info "Object cannot be converted into DB" {:obj x})))) + +(defn q* [q x & args] + (apply d/q q (db* x) args)) + +(defn transact!* + "The main point of the additions onto Datomic's base `transact` fn is to wait for related + listeners to be run before returning." + [conn tx] + {:pre [(conn? conn)]} + (let [; In order to ensure listeners are run only once (i.e. deduplicate them), + ; we have to transmit to the report queue in a race-condition-free way + ; some sort of unique ID we know ahead of time. I'd like to just use the + ; txn ID, but this is not given ahead of time. Thus we must pass a squuid + ; to the transaction. + ; This is cleaner than e.g. using channels because they introduce race + ; conditions in this situation. + tx-ident (d/squuid) + _ (swap! (:deduplicate-tx-idents conn) conj tx-ident) + tx-report @(d/transact (->conn conn) + (conj (vec tx) [:db/add ::tx-notifier :posh.clj.datomic.tx-notifier/value tx-ident])) + tx-report' (normalized-tx-report tx-report) + _ (run-listeners! conn tx-report')] + tx-report')) + (def dcfg - (let [dcfg {:db d/db + (let [dcfg {:db db* :pull* d/pull - :q d/q + :q q* :filter d/filter :with d/with :entid d/entid :transact! transact!* :listen! listen! :conn? conn? + :->poshable-conn ->poshable-conn :ratom rx/atom :make-reaction rx/make-reaction}] (assoc dcfg :pull (partial base/safe-pull dcfg)))) diff --git a/src/posh/core.cljc b/src/posh/core.cljc index 05158e7..1f33443 100644 --- a/src/posh/core.cljc +++ b/src/posh/core.cljc @@ -34,7 +34,6 @@ :filters {}}) ;; {db-id {:filter pred :as-of t :with tx-data :since t}} - (defn add-db ([posh-tree db-id conn schema] (add-db posh-tree db-id conn schema nil)) ([{:keys [dcfg conns schemas dbs cache graph] :as posh-tree} diff --git a/src/posh/lib/datascript.cljc b/src/posh/lib/datascript.cljc new file mode 100644 index 0000000..3ea372e --- /dev/null +++ b/src/posh/lib/datascript.cljc @@ -0,0 +1,9 @@ +(ns posh.lib.datascript) + +(defn conn->schema [conn] (:schema @conn)) + +(defn add-schema-listener! [conn posh-atom db-id] + (add-watch conn :posh-schema-listener + (fn [_ _ old-state new-state] + (when (not= (:schema old-state) (:schema new-state)) + (swap! posh-atom assoc-in [:schema db-id] (:schema new-state)))))) diff --git a/src/posh/lib/db.cljc b/src/posh/lib/db.cljc index 4257f72..2cb6322 100644 --- a/src/posh/lib/db.cljc +++ b/src/posh/lib/db.cljc @@ -53,7 +53,7 @@ db (if with (:db-after ((:with dcfg) db with)) db) ;; with tx-data db (if filter ((:filter dcfg) db (if (symbol? filter) - #?(:clj (resolve filter) :cljs nil) + #?(:clj (resolve filter) :cljs nil) ; TODO why use resolve? filter)) db) ;; filter pred-sym ] diff --git a/src/posh/lib/ratom.cljc b/src/posh/lib/ratom.cljc index efe0933..ca05a67 100644 --- a/src/posh/lib/ratom.cljc +++ b/src/posh/lib/ratom.cljc @@ -1,26 +1,27 @@ (ns posh.lib.ratom "Ported to .cljc from reagent.ratom by alexandergunnarson." (:refer-clojure :exclude [atom run!]) - (:require [clojure.set :as s]) + (:require [#?(:clj clojure.core + :cljs cljs.core) :as core] + [clojure.set :as s] + [posh.lib.util + :refer [#?(:clj if-cljs)]]) #?(:cljs (:require-macros [posh.lib.ratom - :refer [getm setm! getf setf! add! array-list alength* aset* aget*]])) + :refer [getm setm! getum setum! getf setf! add! array-list alength* aset* aget* umut run!]])) #?(:clj (:import [java.util ArrayList] [clojure.lang IDeref IAtom IRef IMeta IHashEq]))) -;;; Misc utils +; According to `posh.lib.ratom-test/ratom-perf`, +; the unsynchronized-mutable CLJ version takes ~177.315377 ms, while +; the clojure.core/atom version takes ~345.78108 ms -(defn upper-first [s] (apply str (.toUpperCase (str (first s))) (rest s))) +; If you want to use mutability here, you need to make sure all transacts (really, listener updates) +; and all derefs are on the same thread. Changing the appropriate macros from atomic to mutable (or +; vice versa) is pretty easy — much more so than changing all the places where the macro affects. -(defn cljs-env? - "Given an &env from a macro, tells whether it is expanding into CLJS." - [env] - (boolean (:ns env))) +;;; Misc utils -#?(:clj -(defmacro if-cljs - "Return @then if the macro is generating CLJS code and @else for CLJ code." - {:from "https://groups.google.com/d/msg/clojurescript/iBY5HaQda4A/w1lAQi9_AwsJ"} - ([env then else] `(if (cljs-env? ~env) ~then ~else)))) +(defn upper-first [s] (apply str (.toUpperCase (str (first s))) (rest s))) ;;; Mutability @@ -38,21 +39,51 @@ IDeref (deref [this] val))) -#?(:clj (defmacro mut [x] `(Mutable. ~x))) +#?(:clj (defmacro mut "Mutable" [x] #_`(Mutable. ~x) + `(clojure.core/atom ~x))) + +#?(:clj (defmacro umut "Unsynchronized mutable" [x] + (if-cljs &env x `(clojure.core/atom ~x)))) #?(:clj (defmacro getm "Get mutable" [x] (if-cljs &env x - `(.get ~(with-meta x {:tag 'posh.lib.ratom.Mutable}))))) + #_`(.get ~(with-meta x {:tag 'posh.lib.ratom.Mutable})) + `(deref ~x)))) #?(:clj (defmacro setm! "Set mutable" [x v] (if-cljs &env `(set! ~x ~v) - `(.set ~(with-meta x {:tag 'posh.lib.ratom.Mutable}) ~v)))) + #_`(.set ~(with-meta x {:tag 'posh.lib.ratom.Mutable}) ~v) + `(reset! ~x ~v)))) + +#?(:clj +(defmacro getum + "Get unsynchronized-mutable" + [x] + (if-cljs &env x + #_x + `(deref ~x)))) + +#?(:clj +(defmacro getum* + "Get unsynchronized-mutable, handling nil" + [x] + (if-cljs &env x + #_x + `(let [x# ~x] (if (nil? x#) x# (deref x#)))))) + +#?(:clj +(defmacro setum! + "Set unsynchronized-mutable" + [x v] + (if-cljs &env `(set! ~x ~v) + #_`(set! ~x ~v) + `(reset! ~x ~v)))) #?(:clj (defmacro getf @@ -78,27 +109,32 @@ #?(:clj (defmacro add! [x v] (if-cljs &env `(.push ~x ~v) - `(.add ~(with-meta x {:tag 'java.util.ArrayList}) ~v)))) + #_`(.add ~(with-meta x {:tag 'java.util.ArrayList}) ~v) + `(swap! ~x conj ~v)))) #?(:clj (defmacro array-list [& args] (if-cljs &env `(array ~@args) - `(doto (ArrayList.) ~@(for [arg args] `(.add ~arg)))))) + #_`(doto (ArrayList.) ~@(for [arg args] `(.add ~arg))) + `(clojure.core/atom [~@args])))) #?(:clj (defmacro alength* [x] (if-cljs &env `(alength ~x) - `(.size ~(with-meta x {:tag 'java.util.ArrayList}))))) + #_`(.size ~(with-meta x {:tag 'java.util.ArrayList})) + `(count (deref ~x))))) #?(:clj (defmacro aget* [x i] (if-cljs &env `(aget ~x) - `(.get ~(with-meta x {:tag 'java.util.ArrayList}) ~i)))) + #_`(.get ~(with-meta x {:tag 'java.util.ArrayList}) ~i) + `(get (deref ~x) ~i)))) #?(:clj (defmacro aset* [x i v] (if-cljs &env `(aset ~x ~i ~v) - `(.set ~(with-meta x {:tag 'java.util.ArrayList}) ~i ~v)))) + #_`(.set ~(with-meta x {:tag 'java.util.ArrayList}) ~i ~v) + `(swap! ~x assoc ~i ~v)))) ;;; Interfaces and (certain) types @@ -109,10 +145,10 @@ #?(:clj (deftype HasCaptured - [^:unsynchronized-mutable captured] + [^:mutable captured] IHasCaptured - (getCaptured [this] captured) - (setCaptured [this v] (set! captured v)))) + (getCaptured [this] (getum captured)) + (setCaptured [this v] (setum! captured v)))) #?(:clj (definterface IHasWatches @@ -207,14 +243,15 @@ (swap! -running + (- (count new) (count old)))) new) -(defn- add-w [#?(:clj ^IHasWatches this :cljs this) key f] +(defn- add-w [#?(:clj ^IHasWatches this :cljs this) k f] (let [w (getf this watches)] - (setf! this watches (check-watches w (assoc w key f))) + (setf! this watches (check-watches w (assoc w k f))) (setf! this watchesArr nil))) -(defn- remove-w [#?(:clj ^IHasWatches this :cljs this) key] +(defn- remove-w [#?(:clj ^IHasWatches this :cljs this) k] (let [w (getf this watches)] - (setf! this watches (check-watches w (dissoc w key))) + ((get w k)) ; Zero-arity is called upon removal for cleanup + (setf! this watches (check-watches w (dissoc w k))) (setf! this watchesArr nil))) (defn- notify-w [#?(:clj ^IHasWatches this :cljs this) old new] @@ -222,7 +259,7 @@ a (if (nil? w) ;; Copy watches to array(-list) for speed (->> (getf this watches) - (reduce-kv #(doto %1 (add! %2) (add! %3)) #?(:clj (ArrayList.) :cljs #js[])) + (reduce-kv #(doto %1 (add! %2) (add! %3)) (array-list)) (setf! this watchesArr)) w)] (let [len (alength* a)] @@ -247,7 +284,7 @@ (when (nil? (getm rea-queue)) (setm! rea-queue (array-list)) #_(:cljs (reagent.impl.batching/schedule))) - (add! (getm rea-queue) r)) + (let [x (getm rea-queue)] (add! x r))) ; For anti-reflection (defn flush! [] (loop [] @@ -267,16 +304,11 @@ (defprotocol IReactiveAtom) (deftype RAtom - #?(:clj [^:unsynchronized-mutable state - meta - validator - ^:unsynchronized-mutable watches - ^:unsynchronized-mutable watchesArr] - :cljs [^:mutable state - meta - validator - ^:mutable watches - ^:mutable watchesArr]) + [^:mutable state + meta + validator + ^:mutable watches + ^:mutable watchesArr] #?(:cljs IAtom) IReactiveAtom @@ -286,23 +318,23 @@ IDeref (#?(:clj deref :cljs -deref) [this] (notify-deref-watcher! this) - state) + (getum state)) #?(:clj IAtom :cljs IReset) (#?(:clj reset :cljs -reset!) [a new-value] (when-not (nil? validator) (assert (validator new-value) "Validator rejected reference state")) (let [old-value state] - (set! state new-value) - (when-not (nil? watches) + (setum! state new-value) + (when-not (nil? (getum watches)) (notify-w a old-value new-value)) new-value)) #?(:cljs ISwap) - (#?(:clj swap :cljs -swap!) [a f] (#?(:clj .reset :cljs -reset!) a (f state))) - (#?(:clj swap :cljs -swap!) [a f x] (#?(:clj .reset :cljs -reset!) a (f state x))) - (#?(:clj swap :cljs -swap!) [a f x y] (#?(:clj .reset :cljs -reset!) a (f state x y))) - (#?(:clj swap :cljs -swap!) [a f x y more] (#?(:clj .reset :cljs -reset!) a (apply f state x y more))) + (#?(:clj swap :cljs -swap!) [a f] (#?(:clj .reset :cljs -reset!) a (f (getum state)))) + (#?(:clj swap :cljs -swap!) [a f x] (#?(:clj .reset :cljs -reset!) a (f (getum state) x))) + (#?(:clj swap :cljs -swap!) [a f x y] (#?(:clj .reset :cljs -reset!) a (f (getum state) x y))) + (#?(:clj swap :cljs -swap!) [a f x y more] (#?(:clj .reset :cljs -reset!) a (apply f (getum state) x y more))) IMeta (#?(:clj meta :cljs -meta) [_] meta) @@ -320,15 +352,15 @@ #?@(:clj [IHasWatches - (getWatches [this] watches) - (setWatches [this v] (set! watches v)) - (getWatchesArr [this] watchesArr) - (setWatchesArr [this v] (set! watchesArr v))])) + (getWatches [this] (getum watches)) + (setWatches [this v] (setum! watches v)) + (getWatchesArr [this] (getum watchesArr)) + (setWatchesArr [this v] (setum! watchesArr v))])) (defn atom "Like clojure.core/atom, except that it keeps track of derefs." - ([x] (RAtom. x nil nil nil nil)) - ([x & {:keys [meta validator]}] (RAtom. x meta validator nil nil))) + ([x] (RAtom. (umut x) nil nil (umut nil) (umut nil))) + ([x & {:keys [meta validator]}] (RAtom. (umut x) meta validator (umut nil) (umut nil)))) ;;; track @@ -365,39 +397,38 @@ (getArgs []))) (deftype Track - #?(:clj [^:unsynchronized-mutable f args - ^:unsynchronized-mutable reaction] - :cljs [f args ^:mutable reaction]) + [^:mutable f args ; Note: `f` is not marked mutable in Reagent but is nonetheless mutable + ^:mutable reaction] IReactiveAtom IDeref (#?(:clj deref :cljs -deref) [this] - (if-some [^IDeref r reaction] + (if-some [^IDeref r (getum reaction)] (#?(:clj .deref :cljs -deref) r) - (cached-reaction #(apply f args) f args this nil))) + (cached-reaction #(apply (getum f) args) (getum f) args this nil))) #?(:clj Object :cljs IEquiv) (#?(:clj equals :cljs -equiv) [_ other] (and (instance? Track other) - (= f (getf ^Track other f)) + (= (getum f) (getf ^Track other f)) (= args (getf ^Track other args)))) #?(:clj IHashEq :cljs IHash) - (#?(:clj hasheq :cljs -hash) [_] (hash [f args])) + (#?(:clj hasheq :cljs -hash) [_] (hash [(getum f) args])) #?(:cljs IPrintWithWriter) #?(:cljs (-pr-writer [a w opts] (pr-atom a w opts "Track:"))) #?@(:clj [IHasF - (getF [this] f) - (setF [this v] (set! f v)) + (getF [this] (getum f)) + (setF [this v] (setum! f v)) ITrack (getArgs [this] args)])) (defn make-track [f args] - (Track. f args nil)) + (Track. (umut f) args (umut nil))) (defn make-track! [f args] (let [^Track t (make-track f args) @@ -424,14 +455,10 @@ (getRatom []))) (deftype RCursor - #?(:clj [ratom path - ^:unsynchronized-mutable reaction - ^:unsynchronized-mutable state - ^:unsynchronized-mutable watches] - :cljs [ratom path - ^:mutable reaction - ^:mutable state - ^:mutable watches]) + [ratom path + ^:mutable reaction + ^:mutable state + ^:mutable watches] #?(:cljs IAtom) IReactiveAtom @@ -448,8 +475,8 @@ (setState [this oldstate newstate] (when-not (identical? oldstate newstate) - (set! state newstate) - (when (some? watches) + (setum! state newstate) + (when (some? (getum watches)) (notify-w this oldstate newstate)))) #?@(:clj @@ -458,8 +485,8 @@ IDeref (#?(:clj deref :cljs -deref) [this] - (let [oldstate state - newstate (if-some [^IDeref r reaction] + (let [oldstate (getum state) + newstate (if-some [^IDeref r (getum reaction)] (#?(:clj .deref :cljs -deref) r) (let [f (if (satisfies? IDeref ratom) #(get-in @ratom path) @@ -470,7 +497,7 @@ #?(:clj IAtom :cljs IReset) (#?(:clj reset :cljs -reset!) [this new-value] - (let [oldstate state] + (let [oldstate (getum state)] (.setState this oldstate new-value) (if (satisfies? IDeref ratom) (if (= path []) @@ -503,8 +530,7 @@ (not (vector? src)))) (str "src must be a reactive atom or a function, not " (pr-str src))) - (RCursor. src path nil nil nil)) - + (RCursor. src path (umut nil) (umut nil) (umut nil))) ;;; with-let support @@ -531,34 +557,23 @@ (declare handle-reaction-change) (deftype Reaction - #?(:clj [^:unsynchronized-mutable f - ^:unsynchronized-mutable state - ^:unsynchronized-mutable dirty? - ^:unsynchronized-mutable no-cache? - ^:unsynchronized-mutable watching - ^:unsynchronized-mutable watches - ^:unsynchronized-mutable autoRun - ^:unsynchronized-mutable caught - ^:unsynchronized-mutable on-set - ^:unsynchronized-mutable on-dispose - ^:unsynchronized-mutable on-dispose-arr - ^:unsynchronized-mutable captured - ^:unsynchronized-mutable ratomGeneration - ^:unsynchronized-mutable watchesArr] - :cljs [f ^:mutable state - ^:mutable - ^boolean dirty? - ^boolean no-cache? - ^:mutable watching - ^:mutable watches - ^:mutable autoRun - ^:mutable caught - ^:mutable on-set - ^:mutable on-dispose - ^:mutable on-dispose-arr - ^:mutable captured - ^:mutable ratomGeneration - ^:mutable watchesArr]) + ; Note: `f`, `dirty?`, and `no-cache?` are not marked mutable in Reagent but are nonetheless mutable + [^:mutable f + ^:mutable state + #?(:clj ^:mutable dirty? + :cljs ^:mutable ^boolean dirty?) + #?(:clj ^:mutable no-cache? + :cljs ^:mutable ^boolean no-cache?) + ^:mutable watching + ^:mutable watches + ^:mutable autoRun + ^:mutable caught + ^:mutable on-set + ^:mutable on-dispose + ^:mutable on-dispose-arr + ^:mutable captured + ^:mutable ratomGeneration + ^:mutable watchesArr] #?(:cljs IAtom) IReactiveAtom @@ -566,19 +581,19 @@ #?(:cljs (-notify-watches [this old new] (notify-w this old new))) ; TODO CLJ (#?(:clj addWatch :cljs -add-watch ) [this key f] (add-w this key f)) (#?(:clj removeWatch :cljs -remove-watch) [this key] - (let [was-empty (empty? watches)] + (let [was-empty (empty? (getum watches))] (remove-w this key) (when (and (not was-empty) - (empty? watches) - (nil? autoRun)) + (empty? (getum watches)) + (nil? (getum autoRun))) (#?(:clj .dispose :cljs dispose) this)))) #?(:clj IAtom :cljs IReset) (#?(:clj reset :cljs -reset!) [a newval] - (assert (fn? (.-on-set a)) "Reaction is read only.") - (let [oldval state] - (set! state newval) - (on-set oldval newval) + (assert (fn? (getum on-set)) "Reaction is read only.") + (let [oldval (getum state)] + (setum! state newval) + ((getum on-set) oldval newval) (notify-w a oldval newval) newval)) @@ -595,47 +610,47 @@ (handleChange [this sender oldval newval] (when-not (or (identical? oldval newval) - dirty?) - (if (nil? autoRun) + (getum dirty?)) + (if (nil? (getum autoRun)) (do - (set! dirty? true) + (setum! dirty? true) (rea-enqueue this)) - (if (true? autoRun) + (if (true? (getum autoRun)) (.run this false) - (autoRun this))))) + ((getum autoRun) this))))) (updateWatching [this derefed] (let [new (set derefed) - old (set watching)] - (set! watching derefed) + old (set (getum watching))] + (setum! watching derefed) (doseq [#?(:clj ^IRef w :cljs w) (s/difference new old)] - (#?(:clj .addWatch :cljs -add-watch) w this handle-reaction-change)) + (#?(:clj .addWatch :cljs -add-watch) w this (fn ([]) ([k a o n] (handle-reaction-change k a o n))))) (doseq [#?(:clj ^IRef w :cljs w) (s/difference old new)] (#?(:clj .removeWatch :cljs -remove-watch) w this)))) (queuedRun [this] - (when (and dirty? (some? watching)) + (when (and (getum dirty?) (some? (getum watching))) (.run this true))) (tryCapture [this f] (try - (set! caught nil) + (setum! caught nil) (deref-capture f this) (catch #?(:clj Throwable :cljs :default) e - (set! state e) - (set! caught e) - (set! dirty? false)))) + (setum! state e) + (setum! caught e) + (setum! dirty? false)))) (run [this check] - (let [oldstate state + (let [oldstate (getum state) res (if check - (.tryCapture this f) - (deref-capture f this))] - (when-not no-cache? - (set! state res) + (.tryCapture this (getum f)) + (deref-capture (getum f) this))] + (when-not (getum no-cache?) + (setum! state res) ;; Use = to determine equality from reactions, since ;; they are likely to produce new data structures. - (when-not (or (nil? watches) + (when-not (or (nil? (getum watches)) (= oldstate res)) (notify-w this oldstate res))) res)) @@ -646,37 +661,37 @@ on-dispose* (:on-dispose opts) no-cache* (:no-cache opts)] (when (some? auto-run*) - (set! autoRun auto-run*)) + (setum! autoRun auto-run*)) (when (some? on-set*) - (set! on-set on-set*)) + (setum! on-set on-set*)) (when (some? on-dispose*) - (set! on-dispose on-dispose*)) + (setum! on-dispose on-dispose*)) (when (some? no-cache*) - (set! no-cache? no-cache*)))) + (setum! no-cache? no-cache*)))) #?@(:clj - [(getRatomGeneration [this] ratomGeneration) - (setRatomGeneration [this v] (set! ratomGeneration v)) - (getIsDirty [this] dirty?) - (setIsDirty [this v] (set! dirty? v)) - (getWatching [this] watching) - (setWatching [this v] (set! watching v)) - (getAutoRun [this] autoRun) - (setAutoRun [this v] (set! autoRun v)) + [(getRatomGeneration [this] (getum ratomGeneration)) + (setRatomGeneration [this v] (setum! ratomGeneration v)) + (getIsDirty [this] (getum dirty?)) + (setIsDirty [this v] (setum! dirty? v)) + (getWatching [this] (getum watching)) + (setWatching [this v] (setum! watching v)) + (getAutoRun [this] (getum autoRun)) + (setAutoRun [this v] (setum! autoRun v)) IHasF - (getF [this] f) - (setF [this v] (set! f v)) + (getF [this] (getum f)) + (setF [this v] (setum! f v)) IHasCaptured - (getCaptured [this] captured) - (setCaptured [this v] (set! captured v)) + (getCaptured [this] (getum captured)) + (setCaptured [this v] (setum! captured v)) IHasWatches - (getWatches [this] watches) - (setWatches [this v] (set! watches v)) - (getWatchesArr [this] watchesArr) - (setWatchesArr [this v] (set! watchesArr v))]) + (getWatches [this] (getum watches)) + (setWatches [this v] (setum! watches v)) + (getWatchesArr [this] (getum watchesArr)) + (setWatchesArr [this v] (setum! watchesArr v))]) IRunnable (run [this] @@ -685,43 +700,43 @@ IDeref (#?(:clj deref :cljs -deref) [this] - (when-some [e caught] + (when-some [e (getum caught)] (throw e)) (let [non-reactive (nil? *ratom-context*)] (when non-reactive (flush!)) - (if (and non-reactive (nil? autoRun)) - (when dirty? - (let [oldstate state] - (set! state (f)) - (when-not (or (nil? watches) (= oldstate state)) - (notify-w this oldstate state)))) + (if (and non-reactive (nil? (getum autoRun))) + (when (getum dirty?) + (let [oldstate (getum state)] + (setum! state ((getum f))) + (when-not (or (nil? (getum watches)) (= oldstate (getum state))) + (notify-w this oldstate (getum state))))) (do (notify-deref-watcher! this) - (when dirty? + (when (getum dirty?) (.run this false))))) - state) + (getum state)) IDisposable (dispose [this] - (let [s state - wg watching] - (set! watching nil) - (set! state nil) - (set! autoRun nil) - (set! dirty? true) + (let [s (getum state) + wg (getum watching)] + (setum! watching nil) + (setum! state nil) + (setum! autoRun nil) + (setum! dirty? true) (doseq [#?(:clj ^IRef w :cljs w) (set wg)] (#?(:clj .removeWatch :cljs -remove-watch) w this)) - (when (some? on-dispose) - (on-dispose s)) - (when-some [a on-dispose-arr] + (when (some? (getum on-dispose)) + ((getum on-dispose) s)) + (when-some [a (getum on-dispose-arr)] (dotimes [i (alength* a)] ((aget* a i) this))))) (addOnDispose [this f] ;; f is called with the reaction as argument when it is no longer active - (if-some [a on-dispose-arr] + (if-some [a (getum on-dispose-arr)] (add! a f) - (set! on-dispose-arr (array-list f)))) + (setum! on-dispose-arr (array-list f)))) #?(:clj Object :cljs IEquiv) (#?(:clj equals :cljs -equiv) [o other] (identical? o other)) @@ -740,7 +755,7 @@ (when (dev?) (setf! r ratomGeneration (setm! generation (inc generation)))) (let [res (in-context r f) - c (getf r captured)] + c (getum* (getf r captured))] ; `getum*` because it'll be an `array-list` (setf! r #?(:clj isDirty :cljs dirty?) false) ;; Optimize common case where derefs occur in same order (when-not (#?(:clj = :cljs arr-eq) c (getf r watching)) @@ -748,7 +763,9 @@ res)) (defn make-reaction [f & {:keys [auto-run on-set on-dispose]}] - (let [reaction (Reaction. f nil true false nil nil nil nil nil nil nil nil nil nil)] + (let [reaction (Reaction. (umut f ) (umut nil) (umut true) (umut false) (umut nil) (umut nil) + (umut nil) (umut nil) (umut nil ) (umut nil ) (umut nil) (umut nil) + (umut nil) (umut nil))] (.setOpts reaction {:auto-run auto-run :on-set on-set :on-dispose on-dispose}) @@ -784,46 +801,44 @@ (setChanged [v]))) (deftype Wrapper - #?(:clj [^:unsynchronized-mutable state callback - ^:unsynchronized-mutable changed - ^:unsynchronized-mutable watches] - :cljs [^:mutable state callback - ^:mutable ^boolean changed - ^:mutable watches]) + [^:mutable state callback + #?(:clj ^:mutable changed + :cljs ^:mutable ^boolean changed) + ^:mutable watches] #?(:cljs IAtom) IDeref (#?(:clj deref :cljs -deref) [this] (when (dev?) - (when (and changed (some? *ratom-context*)) + (when (and (getum changed) (some? *ratom-context*)) (#?(:clj println :cljs warn) "derefing stale wrap: " (pr-str this)))) - state) + (getum state)) #?(:clj IAtom :cljs IReset) (#?(:clj reset :cljs -reset!) [this newval] - (let [oldval state] - (set! changed true) - (set! state newval) - (when (some? watches) + (let [oldval (getum state)] + (setum! changed true) + (setum! state newval) + (when (some? (getum watches)) (notify-w this oldval newval)) (callback newval) newval)) #?(:cljs ISwap) - (#?(:clj swap :cljs -swap!) [a f] (#?(:clj .reset :cljs -reset!) a (f state))) - (#?(:clj swap :cljs -swap!) [a f x] (#?(:clj .reset :cljs -reset!) a (f state x))) - (#?(:clj swap :cljs -swap!) [a f x y] (#?(:clj .reset :cljs -reset!) a (f state x y))) - (#?(:clj swap :cljs -swap!) [a f x y more] (#?(:clj .reset :cljs -reset!) a (apply f state x y more))) + (#?(:clj swap :cljs -swap!) [a f] (#?(:clj .reset :cljs -reset!) a (f (getum state)))) + (#?(:clj swap :cljs -swap!) [a f x] (#?(:clj .reset :cljs -reset!) a (f (getum state) x))) + (#?(:clj swap :cljs -swap!) [a f x y] (#?(:clj .reset :cljs -reset!) a (f (getum state) x y))) + (#?(:clj swap :cljs -swap!) [a f x y more] (#?(:clj .reset :cljs -reset!) a (apply f (getum state) x y more))) #?(:clj Object :cljs IEquiv) (#?(:clj equals :cljs -equiv) [_ other] (and (instance? Wrapper other) ;; If either of the wrappers have changed, equality ;; cannot be relied on. - (not changed) + (not (getum changed)) (not (getf ^Wrapper other changed)) - (= state (getf ^Wrapper other state)) + (= (getum state) (getf ^Wrapper other state)) (= callback (getf ^Wrapper other callback)))) #?(:clj IRef :cljs IWatchable) @@ -836,17 +851,32 @@ #?@(:clj [IWrapper - (getState [this] state) - (setState [this v] (set! state v)) + (getState [this] (getum state)) + (setState [this v] (setum! state v)) (getCallback [this] callback) - (getChanged [this] changed) - (setChanged [this v] (set! changed v))])) - -#_(:cljs -(defn make-wrapper [value callback-fn args] - (Wrapper. value - (reagent.impl.util/partial-ifn. callback-fn args nil) - false nil))) + (getChanged [this] (getum changed)) + (setChanged [this v] (setum! changed v))])) + +(deftype PartialIFn [f args ^:mutable p] + #?(:clj clojure.lang.IFn :cljs IFn) + (#?(:clj invoke :cljs -invoke) [_ & a] + (or p (setum! p (apply core/partial f args))) + (apply p a)) + #?(:clj Object :cljs IEquiv) + (#?(:clj equals :cljs -equiv) [_ other] + (and (= f (.-f ^PartialIFn other)) (= args (.-args ^PartialIFn other)))) + #?(:clj IHashEq :cljs IHash) + (#?(:clj hasheq :cljs -hash) [_] (hash [f args]))) + +(defn make-wrapper + ([value callback-fn] + (Wrapper. (umut value) + callback-fn + (umut false) (umut nil))) + ([value callback-fn args] + (Wrapper. (umut value) + (PartialIFn. callback-fn args (umut nil)) + (umut false) (umut nil)))) #?(:cljs ; TODO CLJ (defn rswap! @@ -870,8 +900,7 @@ nil)) #?(:clj -(defmacro reaction [& body] - `(make-reaction (fn [] ~@body)))) +(defmacro reaction [& body] `(make-reaction (fn [] ~@body)))) #?(:clj (defmacro run! @@ -881,6 +910,18 @@ (deref co#) co#))) +(defn add-eager-watch + ([r k f] (add-eager-watch r k f (fn []))) + ([r k f on-dispose] + (let [deref-times (atom 0) + runner (run! @r (swap! deref-times inc)) + f' (fn ([] (.dispose #?(:clj ^IDisposable runner :cljs runner))) + ([k a oldv newv] + (when (> #?(:clj (long @deref-times) + :cljs @deref-times) 0) + (f k a oldv newv))))] + (add-watch r k f')))) + #?(:clj (defmacro with-let [bindings & body] (assert (vector? bindings)) diff --git a/src/posh/lib/util.cljc b/src/posh/lib/util.cljc index 4221298..1a4745a 100644 --- a/src/posh/lib/util.cljc +++ b/src/posh/lib/util.cljc @@ -1,12 +1,27 @@ (ns posh.lib.util) +;;; MACROS ;;; + +(defn cljs-env? + "Given an &env from a macro, tells whether it is expanding into CLJS." + [env] + (boolean (:ns env))) + +#?(:clj +(defmacro if-cljs + "Return @then if the macro is generating CLJS code and @else for CLJ code." + {:from "https://groups.google.com/d/msg/clojurescript/iBY5HaQda4A/w1lAQi9_AwsJ"} + ([env then else] `(if (cljs-env? ~env) ~then ~else)))) + +;;; EXCEPTION ;;; + (defn exception [^String msg] #?(:clj (throw (Exception. msg)) :cljs (throw (js/Error. msg)))) -;;;; db stuff +;;; DB ;;; (defn t-for-datoms [q-fn db datoms] (q-fn '[:find ?e ?a ?v ?t @@ -15,3 +30,31 @@ [?e ?a _ ?t]] db datoms)) + +;;; LOGGING ;;; + +(defonce debug? (atom false)) + +#?(:clj +(defmacro debug [msg & args] + (when @debug? + `(let [out-str# (with-out-str + (println ~(if-cljs &env `(js/Date.) `(java.util.Date.)) + ~(if-cljs &env nil `(str "[" (.getName (Thread/currentThread)) "]")) + ~msg) + ~@(for [arg args] + `(clojure.pprint/pprint ~arg)))] + (print out-str#) + (flush))))) + +#?(:clj +(defmacro prl + "'Print labeled'. + Puts each x in `xs` as vals in a map. + The keys in the map are the quoted vals. Then prints the map." + [& xs] + (when @debug? + `(let [out-str# (with-out-str + (clojure.pprint/pprint ~(->> xs (map #(vector (list 'quote %) %)) (into {}))))] + (print out-str#) + (flush))))) diff --git a/src/posh/plugin_base.cljc b/src/posh/plugin_base.cljc index 1fa953b..101251f 100644 --- a/src/posh/plugin_base.cljc +++ b/src/posh/plugin_base.cljc @@ -2,7 +2,9 @@ (:require [posh.core :as p] [posh.stateful :as ps] [posh.lib.db :as db] - [posh.lib.update :as u])) + [posh.lib.update :as u] + [posh.lib.util + :refer [debug]])) (defn missing-pull-result [dcfg pull-expr] @@ -21,29 +23,36 @@ (nil? id) (missing-pull-result query))) +(defn- assert-listeners [conn] + (assert (->> conn meta :listeners (#?(:clj instance? :cljs satisfies?) #?(:clj clojure.lang.IAtom :cljs cljs.core/IAtom))) + (str "Connection requires listener metadata atom."))) + ;; need to set last-tx-t in conn so that it doesn't try the same tx twice (defn set-conn-listener! [dcfg posh-atom conn db-id] (let [posh-vars {:posh-atom posh-atom - :db-id db-id}] + :db-id db-id} + conn ((or (:->poshable-conn dcfg) identity) conn)] + (assert-listeners conn) (do ((:listen! dcfg) conn :posh-dispenser (fn [var] + #_(debug "posh-dispenser" var) (when (keyword? var) (get posh-vars var)))) - (add-watch conn :posh-schema-listener - (fn [_ _ old-state new-state] - (when (not= (:schema old-state) (:schema new-state)) - (swap! posh-atom assoc-in [:schema db-id] (:schema new-state))))) - ;; Update posh conn + ;; Update posh conn ((:listen! dcfg) conn :posh-listener (fn [tx-report] + #_(debug "posh-listener" tx-report) ;;(println "CHANGED: " (keys (:changed (p/after-transact @posh-atom {conn tx-report})))) (let [{:keys [ratoms changed]} (swap! posh-atom p/after-transact {conn tx-report})] (doseq [[k v] changed] (reset! (get ratoms k) (:results v)))))) + (when-let [f (:additional-listeners dcfg)] (f conn posh-atom db-id)) conn))) +; TODO allow for `unposh!` or some such thing +; TODO warn if calling `posh!` multiple times on same conn (defn posh! [dcfg & conns] (let [posh-atom (atom {})] (reset! posh-atom @@ -60,8 +69,7 @@ (p/add-db posh-tree db-id (set-conn-listener! dcfg posh-atom (first conns) db-id) - (:schema @(first conns)))))))))) - + (when-let [f (:conn->schema dcfg)] (f (first conns))))))))))) ;; Posh's state atoms are stored inside a listener in the meta data of ;; the datascript conn @@ -185,7 +193,7 @@ (= (inc n-query-args) (count args)) [(butlast args) (last args)] :else - (throw "Incorrect number of args passed to posh query")) + (throw (#?(:clj Exception. :cljs js/Error.) "Incorrect number of args passed to posh query"))) true-poshdb-args (map #(if ((:conn? dcfg) %) (get-db dcfg %) %) args) posh-atom (first (remove nil? (map #(get-posh-atom dcfg %) args))) storage-key [:q query true-poshdb-args]] diff --git a/src/posh/reagent.cljs b/src/posh/reagent.cljs index 1e9cf53..b8328f8 100644 --- a/src/posh/reagent.cljs +++ b/src/posh/reagent.cljs @@ -2,9 +2,10 @@ (:require-macros [reagent.ratom :refer [reaction]]) (:require [posh.plugin-base :as base :include-macros] - [datascript.core :as d] - [reagent.core :as r] - [reagent.ratom :as ra])) + [posh.lib.datascript :as ldb] + [datascript.core :as d] + [reagent.core :as r] + [reagent.ratom :as ra])) (def dcfg (let [dcfg {:db d/db @@ -17,7 +18,9 @@ :listen! d/listen! :conn? d/conn? :ratom r/atom - :make-reaction ra/make-reaction}] + :make-reaction ra/make-reaction + :conn->schema ldb/conn->schema + :additional-listeners ldb/add-schema-listener!}] (assoc dcfg :pull (partial base/safe-pull dcfg)))) (base/add-plugin dcfg) diff --git a/test/posh/clj/common_tests.clj b/test/posh/clj/common_tests.clj new file mode 100644 index 0000000..497a915 --- /dev/null +++ b/test/posh/clj/common_tests.clj @@ -0,0 +1,45 @@ +(ns posh.clj.common-tests + "Test fns shared between Datomic and DataScript (at very least in CLJ, but probably cross-platform)." + (:require [clojure.test :as test + :refer [is deftest testing]] + [posh.lib.ratom :as r] + [posh.lib.util :as u + :refer [debug prl]])) + +(defn basic-test [conn dcfg] + (let [sub ((:q dcfg) [:find '?e :where ['?e :test/attr]] conn) + sub-no-deref ((:q dcfg) [:find '?e :where ['?e :test/attr]] conn) + _ (is (= @sub #{})) + notified (atom 0) + _ (r/add-eager-watch sub :k (fn [_ _ _ _] (swap! notified inc))) + notified-no-deref (atom 0) + _ (r/add-eager-watch sub-no-deref :k-no-deref (fn [_ _ _ _] (swap! notified-no-deref inc)))] + (testing "Listeners are notified correctly" + ((:transact! dcfg) conn + [{:db/id ((:tempid dcfg)) + :test/attr "Abcde"}]) + (do @sub @sub @sub @sub) + (is (= @sub + @((:q dcfg) [:find '?e + :where ['?e :test/attr]] + conn) + ((:q* dcfg) [:find '?e + :where ['?e :test/attr]] + ((:db dcfg) conn)))) + (is (= @notified 1)) + (is (= @notified-no-deref 1)) + ((:transact! dcfg) conn + [{:db/id ((:tempid dcfg)) + :test/attr "Fghijk"}]) + (do @sub @sub @sub @sub @sub) + (is (= @notified 2)) + (is (= @notified-no-deref 2))) + (testing "Remove-watch happens correctly" + (remove-watch sub :k) + (remove-watch sub :k-no-deref) + ((:transact! dcfg) conn + [{:db/id ((:tempid dcfg)) + :test/attr "Lmnop"}]) + (do @sub @sub @sub @sub @sub @sub) + (is (= @notified 2)) + (is (= @notified-no-deref 2))))) diff --git a/test/posh/clj/datascript_test.clj b/test/posh/clj/datascript_test.clj new file mode 100644 index 0000000..fd59524 --- /dev/null +++ b/test/posh/clj/datascript_test.clj @@ -0,0 +1,25 @@ +(ns posh.clj.datascript-test + (:require [clojure.test :as test + :refer [is deftest testing]] + [datascript.core :as d] + [posh.clj.datascript :as db] + [posh.lib.ratom :as r] + [posh.lib.util :as u + :refer [debug prl]] + [posh.clj.common-tests :as common])) + +(def default-partition :db.part/default) + +(defn tempid [] (d/tempid default-partition)) + +(deftest basic-test + (let [conn (d/create-conn {:test/attr + {;:db/valueType :db.type/string + :db/cardinality :db.cardinality/one}}) + _ (db/posh! conn)] + (common/basic-test conn + {:q db/q + :q* d/q + :db d/db + :tempid tempid + :transact! db/transact!}))) diff --git a/test/posh/clj/datomic_test.clj b/test/posh/clj/datomic_test.clj new file mode 100644 index 0000000..4f68ef0 --- /dev/null +++ b/test/posh/clj/datomic_test.clj @@ -0,0 +1,73 @@ +(ns posh.clj.datomic-test + (:require [clojure.test :as test + :refer [is deftest testing]] + [datomic.api :as d] + [posh.clj.datomic :as db] + [posh.lib.ratom :as r] + [posh.lib.util :as u + :refer [debug prl]] + [posh.clj.common-tests :as common]) + (:import posh.clj.datomic.PoshableConnection)) + +#_(do (require '[clojure.tools.namespace.repl :refer [refresh]]) + (reset! posh.lib.util/debug? true) + (refresh) + (set! *warn-on-reflection* true) + (eval `(do (clojure.test/run-tests 'posh.lib.ratom-test) + (clojure.test/run-tests 'posh.clj.datascript-test) + (clojure.test/run-tests 'posh.clj.datomic-test)))) + +(def default-partition :db.part/default) + +(defn tempid [] (d/tempid default-partition)) + +(defn install-partition [part] + (let [id (d/tempid :db.part/db)] + [{:db/id id + :db/ident part} + [:db/add :db.part/db + :db.install/partition id]])) + +(defmacro with-conn [sym & body] + `(let [uri# "datomic:mem://test"] + (try (d/create-database uri#) + (let [~sym (d/connect uri#)] + (try ~@body + (finally (d/release ~sym)))) + (finally (d/delete-database uri#))))) + +(defn transact-schemas! + "This is used because, perhaps very strangely, schema changes to Datomic happen + asynchronously." + [conn schemas] + (let [txn-report (db/transact! conn + (->> schemas + (map #(assoc % :db/id (d/tempid :db.part/db) + :db.install/_attribute :db.part/db)))) + txn-id (-> txn-report :tx-data first (get 3)) + _ #_(deref (d/sync (db/->conn conn) (java.util.Date. (System/currentTimeMillis))) 500 nil) + (deref (d/sync-schema (db/->conn conn) (inc txn-id)) 500 nil)] ; frustratingly, doesn't even work with un-`inc`ed txn-id + txn-report)) + +(defn with-setup [schemas f] + (with-conn conn* + (let [poshed (db/posh! conn*) ; This performs a `with-meta` so the result is needed + conn (-> poshed :conns :conn0) ; Has the necessary meta ; TODO simplify this + _ (is (instance? PoshableConnection conn))] + (try (let [txn-report (db/transact! conn (install-partition default-partition)) + txn-report (transact-schemas! conn schemas)] + (f conn)) + (finally (db/stop conn)))))) ; TODO `unposh!` + +(deftest basic-test + (with-setup + [{:db/ident :test/attr + :db/valueType :db.type/string + :db/cardinality :db.cardinality/one}] + (fn [conn] + (common/basic-test conn + {:db db/db* + :q db/q + :q* d/q + :tempid tempid + :transact! db/transact!}))))