Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Posh with Datomic #17

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b879ea2
Basic test established for Datomic!
alexandergunnarson Jan 7, 2017
44054e4
The tests progress... semi-unsatisfactorily
alexandergunnarson Jan 7, 2017
1ce7a63
Slightly update test
alexandergunnarson Jan 7, 2017
7875409
CLJ DataScript simple test works!!
alexandergunnarson Jan 7, 2017
bcd0ea9
Update tests
alexandergunnarson Jan 7, 2017
f40f42b
Add in @seantempesta 's changes to datomic.clj (thanks!)
alexandergunnarson Jan 16, 2017
7292a31
Fix minor compilation error
alexandergunnarson Jan 16, 2017
e2f71dc
Add tools.namespace in dev dependencies
alexandergunnarson Jan 17, 2017
17d661d
`rand` -> `gensym` for listener ids to guarantee uniqueness
alexandergunnarson Jan 17, 2017
cbb28e4
Move certain utils to, well, lib/util
alexandergunnarson Jan 17, 2017
c3de7d3
Add `notified-times` test to DS and Datomic
alexandergunnarson Jan 17, 2017
3fed4ed
Update plugin-base slightly
alexandergunnarson Jan 17, 2017
16b1fbd
Remove code duplication
alexandergunnarson Jan 17, 2017
7e32df3
Remove all `unsynchronized-mutable` and replace with atoms. Retain ea…
alexandergunnarson Jan 17, 2017
a612b5a
No superfluous debug in plugin-base
alexandergunnarson Jan 17, 2017
1927f61
Debug should default to false
alexandergunnarson Jan 17, 2017
49cce06
Datomic test works!! This is b/c Posh listeners are now run before `p…
alexandergunnarson Jan 17, 2017
ff00d4a
Add a few explanatory comments; remove certain logging
alexandergunnarson Jan 17, 2017
58380ba
`add-eager-watch`; `make-wrapper` (as is present in Reagent implement…
alexandergunnarson Jan 17, 2017
3275552
Can now eagerly watch reactive queries (really, any reactions)
alexandergunnarson Jan 17, 2017
3d5c2c6
Slight cleanup
alexandergunnarson Jan 17, 2017
10e9a6a
Refactor Datomic test to be a little cleaner
alexandergunnarson Jan 17, 2017
0e324ca
Slightly extend Datomic test
alexandergunnarson Jan 17, 2017
6b152dd
`add-eager-watch` now disposes runner when `remove-watch`
alexandergunnarson Jan 17, 2017
2a7c1ad
Test `remove-watch`
alexandergunnarson Jan 17, 2017
2cb4a53
Refactor into common DataScript and Datomic tests
alexandergunnarson Jan 17, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0"]
[org.clojure/clojurescript "1.7.228"]
#_[datascript "0.15.0"]
#_[com.datomic/datomic-free "0.9.5407"]
[org.clojure/core.match "0.3.0-alpha4"]]
:plugins [[lein-cljsbuild "1.1.3"]]
:profiles {:dev {:dependencies [[datascript "0.15.0"]
[com.datomic/datomic-free "0.9.5344"]
[org.clojure/core.async "0.2.391"]
[org.clojure/tools.namespace "0.2.11"]]}}
:cljsbuild {
:builds [ {:id "posh"
:source-paths ["src/"]
Expand Down
5 changes: 4 additions & 1 deletion src/posh/clj/datascript.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns posh.clj.datascript
(:require [posh.plugin-base :as base]
[posh.lib.ratom :as rx]
[posh.lib.datascript :as ldb]
[datascript.core :as d]))

(def dcfg
Expand All @@ -14,7 +15,9 @@
:listen! d/listen!
:conn? d/conn?
:ratom rx/atom
:make-reaction rx/make-reaction}]
:make-reaction rx/make-reaction
:conn->schema ldb/conn->schema
:additional-listeners ldb/add-schema-listener!}]
(assoc dcfg :pull (partial base/safe-pull dcfg))))

(base/add-plugin dcfg)
125 changes: 115 additions & 10 deletions src/posh/clj/datomic.clj
Original file line number Diff line number Diff line change
@@ -1,32 +1,137 @@
(ns posh.clj.datomic
(:require [posh.plugin-base :as base]
[posh.lib.ratom :as rx]
[datomic.api :as d]))
[clojure.core.async :as async
:refer [thread offer! <!! promise-chan]]
[datomic.api :as d]
[posh.lib.util :as u
:refer [debug]]))

(defn- TODO [& [msg]] (throw (ex-info (str "TODO: " msg) nil)))
(defn datom->seq [db-after ^datomic.Datom d]
[(.e d) (d/ident db-after (.a d)) (.v d) (.tx d) (.added d)])

(defn- conn? [x] (instance? datomic.Connection x))
; TODO import stuartsierra.component ?
(defprotocol Lifecycle
(start [this])
(stop [this]))

; TODO maybe we don't want blocking here?)
(defn- transact!* [& args] @(apply d/transact args))
(defn normalized-tx-report [{:keys [db-after] :as tx-report}]
(update tx-report :tx-data
(fn [datoms] (mapv #(datom->seq db-after %) datoms))))

(defn- listen!
([conn callback] (listen! conn (rand) callback))
(defn run-listeners! [pconn tx-report']
(try (doseq [[_ callback] @(:listeners pconn)] (callback tx-report'))
(catch Throwable e (debug "WARNING:" e))))

(defrecord PoshableConnection [datomic-conn listeners deduplicate-tx-idents interrupted?]
Lifecycle
(start [this]
(assert (instance? datomic.Connection datomic-conn))
(assert (instance? clojure.lang.IAtom listeners))
(assert (instance? clojure.lang.IAtom interrupted?))
; See `transact!*` as to why the below schema+entity is required.
@(d/transact datomic-conn
[{:db/id (d/tempid :db.part/db)
:db.install/_attribute :db.part/db
:db/ident :posh.clj.datomic.tx-notifier/value
:db/cardinality :db.cardinality/one
:db/valueType :db.type/uuid}
{:db/id (d/tempid :db.part/db)
:db/ident ::tx-notifier}])
(thread
(loop []
(when-not @interrupted?
; `poll` because if `take`, still won't be nil or stop waiting when conn is released
; the poll time is how long it will take to shut down when that time comes
(when-let [{:keys [db-after] :as tx-report}
(.poll ^java.util.concurrent.BlockingQueue
(d/tx-report-queue datomic-conn)
1
java.util.concurrent.TimeUnit/SECONDS)]
(try (let [{:keys [tx-data] :as tx-report'} (normalized-tx-report tx-report)
last-tx-item (last tx-data)
tx-ident (when (and last-tx-item
(= (d/ident db-after (get last-tx-item 0)) ::tx-notifier)
(= (d/ident db-after (get last-tx-item 1)) :posh.clj.datomic.tx-notifier/value))
(get last-tx-item 2))]
(try (debug "tx-report received in PoshableConnection")
(when-not (get @deduplicate-tx-idents tx-ident)
(run-listeners! this tx-report'))
(finally
(swap! deduplicate-tx-idents #(disj % tx-ident)))))
(catch Throwable e (debug "WARNING:" e)))
(recur)))))
this)
(stop [this]
(reset! interrupted? true)
(swap! deduplicate-tx-idents empty)
this))

(defn ->poshable-conn [datomic-conn]
{:pre [(instance? datomic.Connection datomic-conn)]}
(let [listeners (atom nil)]
(with-meta (start (PoshableConnection. datomic-conn listeners (atom #{}) (atom false)))
{:listeners listeners})))

(defn conn? [x] (instance? PoshableConnection x))

(defn ->conn [x]
(if (conn? x)
(:datomic-conn x)
x))

(defn assert-pconn [x] (assert (instance? PoshableConnection x)))

(defn listen!
([conn callback] (listen! conn (gensym) callback))
([conn key callback]
{:pre [(conn? conn)]}
(TODO "Need to figure out how to listen to Datomic connection in the same way as DataScript")
(swap! (:listeners (meta conn)) assoc key callback)
key))

(defn db* [x]
(cond (instance? datomic.Database x)
x
(conn? x)
(-> x :datomic-conn d/db)
(instance? datomic.Connection x)
(d/db x)
:else x #_(throw (ex-info "Object cannot be converted into DB" {:obj x}))))

(defn q* [q x & args]
(apply d/q q (db* x) args))

(defn transact!*
"The main point of the additions onto Datomic's base `transact` fn is to wait for related
listeners to be run before returning."
[conn tx]
{:pre [(conn? conn)]}
(let [; In order to ensure listeners are run only once (i.e. deduplicate them),
; we have to transmit to the report queue in a race-condition-free way
; some sort of unique ID we know ahead of time. I'd like to just use the
; txn ID, but this is not given ahead of time. Thus we must pass a squuid
; to the transaction.
; This is cleaner than e.g. using channels because they introduce race
; conditions in this situation.
tx-ident (d/squuid)
_ (swap! (:deduplicate-tx-idents conn) conj tx-ident)
tx-report @(d/transact (->conn conn)
(conj (vec tx) [:db/add ::tx-notifier :posh.clj.datomic.tx-notifier/value tx-ident]))
tx-report' (normalized-tx-report tx-report)
_ (run-listeners! conn tx-report')]
tx-report'))

(def dcfg
(let [dcfg {:db d/db
(let [dcfg {:db db*
:pull* d/pull
:q d/q
:q q*
:filter d/filter
:with d/with
:entid d/entid
:transact! transact!*
:listen! listen!
:conn? conn?
:->poshable-conn ->poshable-conn
:ratom rx/atom
:make-reaction rx/make-reaction}]
(assoc dcfg :pull (partial base/safe-pull dcfg))))
Expand Down
1 change: 0 additions & 1 deletion src/posh/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
:filters {}})
;; {db-id {:filter pred :as-of t :with tx-data :since t}}


(defn add-db
([posh-tree db-id conn schema] (add-db posh-tree db-id conn schema nil))
([{:keys [dcfg conns schemas dbs cache graph] :as posh-tree}
Expand Down
9 changes: 9 additions & 0 deletions src/posh/lib/datascript.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(ns posh.lib.datascript)

(defn conn->schema [conn] (:schema @conn))

(defn add-schema-listener! [conn posh-atom db-id]
(add-watch conn :posh-schema-listener
(fn [_ _ old-state new-state]
(when (not= (:schema old-state) (:schema new-state))
(swap! posh-atom assoc-in [:schema db-id] (:schema new-state))))))
2 changes: 1 addition & 1 deletion src/posh/lib/db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
db (if with (:db-after ((:with dcfg) db with)) db) ;; with tx-data
db (if filter
((:filter dcfg) db (if (symbol? filter)
#?(:clj (resolve filter) :cljs nil)
#?(:clj (resolve filter) :cljs nil) ; TODO why use resolve?
filter))
db) ;; filter pred-sym
]
Expand Down
Loading