Skip to content

Commit

Permalink
2.1.0 type hints and upper limit for pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Nov 8, 2023
1 parent e24237c commit 6376308
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 49 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# 2.1.0

- Type hints
- `:max-conn` and `:max-idle-conn` options for pool

# 2.0.0

- [ BREAKING ] `datascript.storage.sql.core/make` now accepts `javax.sql.DataSource` instead of `java.sql.Connection`
- [ BREAKING ] Removed `datascript.storage.sql.core/close`
- Added simple connection pool `datascript.storage.sql.core/pool`

# 1.0.0
Expand Down
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,26 @@ Currently supported `:dbtype`-s:
- `:postgresql`
- `:sqlite`

If needed, you can close connection through storage:
If your JDBC driver only provides you with `DataSource` and you want to add some basic pooling on top, use `storage-sql/pool`:

```
(def datasource
(doto (SQLiteDataSource.)
(.setUrl "jdbc:sqlite:target/db.sqlite")))
(def pooled-datasource
(storage-sql/pool datasource
{:max-conn 10
:max-idle-conn 4}))
(def storage
(storage-sql/make pooled-datasource
{:dbtype :sqlite}))
```

`pool` takes non-pooled `DataSource` and returns new `DataSource` that pools connections for you.

If you used pool to create storage, you can close it this way:

```
(storage-sql/close storage)
Expand Down
146 changes: 100 additions & 46 deletions src/datascript/storage/sql/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
[datascript.core :as d]
[datascript.storage :as storage])
(:import
[java.lang AutoCloseable]
[java.lang.reflect InvocationHandler Method Proxy]
[java.sql Connection DriverManager ResultSet SQLException Statement]
[javax.sql DataSource]
[java.lang.reflect InvocationHandler Method Proxy]))
[java.util.concurrent.locks Condition Lock ReentrantLock]))

(defmacro with-conn [[conn datasource] & body]
(let [conn (vary-meta conn assoc :tag Connection)]
Expand Down Expand Up @@ -143,7 +145,25 @@
:binary? (boolean (and (:freeze-bytes opts) (:thaw-bytes opts))))]
(merge {:ddl (ddl opts)} opts)))

(defn make
(defn make
"Create new DataScript storage from javax.sql.DataSource.
Mandatory opts:
:dbtype :: keyword, one of :h2, :mysql, :postgresql or :sqlite
Optional opts:
:batch-size :: int, default 1000
:table :: string, default \"datascript\"
:ddl :: custom DDL to create :table. Must have `addr, int` and `content, text` columns
:freeze-str :: (fn [any]) -> str, serialize DataScript segments, default pr-str
:thaw-str :: (fn [str]) -> any, deserialize DataScript segments, default clojure.edn/read-string
:freeze-bytes :: (fn [any]) -> bytes, same idea as freeze-str, but for binary serialization
:thaw-bytes :: (fn [bytes]) -> any
:freeze-str and :thaw-str, :freeze-bytes and :thaw-bytes should come in pairs, and are mutually exclusive
(it’s either binary or string serialization)"
([datasource]
{:pre [(instance? DataSource datasource)]}
(make datasource {}))
Expand All @@ -170,42 +190,60 @@

'datascript.storage/-delete
(fn [_ addr-seq]
(with-open [conn datasource]
(with-conn [conn datasource]
(delete-impl conn opts addr-seq)))}))))

(defn swap-return! [*atom f & args]
(let [*res (volatile! nil)]
(swap! *atom
(fn [atom]
(let [[res atom'] (apply f atom args)]
(vreset! *res res)
atom')))
@*res))

(defrecord Pool [*atom ^DataSource datasource opts]
java.lang.AutoCloseable
(defn close
"If storage was created with DataSource that also implements AutoCloseable,
it will close that DataSource"
[storage]
(let [datasource (:datasource storage)]
(when (instance? AutoCloseable datasource)
(.close ^AutoCloseable datasource))))

(defmacro with-lock [lock & body]
`(let [^Lock lock# ~lock]
(try
(.lock lock#)
~@body
(finally
(.unlock lock#)))))

(defrecord Pool [*atom ^Lock lock ^Condition condition ^DataSource datasource opts]
AutoCloseable
(close [_]
(let [[{:keys [taken free]} _] (swap-vals! *atom #(-> % (update :taken empty) (update :idle empty)))]
(doseq [conn (concat free taken)]
(try
(.close conn)
(.close ^Connection conn)
(catch Exception e
(.printStackTrace e))))))

DataSource
(getConnection [_]
(let [conn (swap-return! *atom
(fn [atom]
(if-some [conn (peek (:idle atom))]
[conn (-> atom
(update :taken conj conn)
(update :idle pop))]
[nil atom])))
conn (or conn
(let [conn (.getConnection datasource)]
(swap! *atom update :taken conj conn)
conn))
conn ^Connection conn
(getConnection [this]
(let [^Connection conn (with-lock lock
(loop []
(let [atom @*atom]
(cond
;; idle connections available
(> (count (:idle atom)) 0)
(let [conn (peek (:idle atom))]
(swap! *atom #(-> %
(update :taken conj conn)
(update :idle pop)))
conn)

;; has space for new connection
(< (count (:taken atom)) (:max-conn opts))
(let [conn (.getConnection datasource)]
(swap! *atom update :taken conj conn)
conn)

;; already at limit
:else
(do
(.await condition)
(recur))))))
*closed? (volatile! false)]
(Proxy/newProxyInstance
(.getClassLoader Connection)
Expand All @@ -220,14 +258,18 @@
(.rollback conn)
(.setAutoCommit conn true))
(vreset! *closed? true)
(when-some [conn (swap-return! *atom
(fn [atom]
(if (>= (count (:idle atom)) (:max-conn opts))
[conn (update atom :taken disj conn)]
[nil (-> atom
(update :taken disj conn)
(update :idle conj conn))])))]
(.close conn))
(with-lock lock
(if (< (count (:idle @*atom)) (:max-idle-conn opts))
;; normal return to pool
(do
(swap! *atom #(-> %
(update :taken disj conn)
(update :idle conj conn)))
(.signal condition))
;; excessive idle conn
(do
(swap! *atom update :taken disj conn)
(.close conn))))
nil)

"isClosed"
Expand All @@ -237,13 +279,25 @@
(.invoke method conn args)))))))))

(defn pool
([datasource]
(pool datasource {}))
([datasource opts]
(Pool.
(atom {:taken #{}
:idle []})
datasource
(merge
{:max-conn 4}
opts))))
"Simple connection pool.
Accepts javax.sql.DataSource, returns javax.sql.DataSource implementation
that creates java.sql.Connection on demand, up to :max-conn, and keeps up
to :max-idle-conn when no demand.
Implements AutoCloseable, which closes all pooled connections."
(^DataSource [datasource]
(pool datasource {}))
(^DataSource [datasource opts]
{:pre [(instance? DataSource datasource)]}
(let [lock (ReentrantLock.)]
(Pool.
(atom {:taken #{}
:idle []})
lock
(.newCondition lock)
datasource
(merge
{:max-idle-conn 4
:max-conn 10}
opts)))))
3 changes: 2 additions & 1 deletion test/datascript/storage/sql/test_main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
[datascript.storage.sql.test-h2]
[datascript.storage.sql.test-mysql]
[datascript.storage.sql.test-postgresql]
[datascript.storage.sql.test-sqlite]))
[datascript.storage.sql.test-sqlite]
[datascript.storage.sql.test-pool]))

(defn -main [& args]
(t/run-all-tests #"datascript\.storage\.sql\..*"))
Expand Down
60 changes: 60 additions & 0 deletions test/datascript/storage/sql/test_pool.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
(ns datascript.storage.sql.test-pool
(:require
[clojure.test :as t :refer [deftest is are testing]]
[datascript.core :as d]
[datascript.storage.sql.core :as storage-sql]
[datascript.storage.sql.test-core :as test-core])
(:import
[java.nio.file Files Path]
[javax.sql DataSource]
[java.util.concurrent Executors Future]
[org.sqlite SQLiteDataSource]))

(deftest test-pool []
(Files/deleteIfExists (Path/of "target/db.sqlite" (make-array String 0)))
(with-open [datasource (storage-sql/pool
(doto (SQLiteDataSource.)
(.setUrl "jdbc:sqlite:target/db.sqlite"))
{:max-conn 10
:max-idle-conn 4})
thread-pool (Executors/newFixedThreadPool 100)]
(let [*stats (atom {:min-idle Long/MAX_VALUE
:max-idle 0
:min-taken Long/MAX_VALUE
:max-taken 0})
_ (add-watch (:*atom datasource) ::stats
(fn [_ _ _ new]
(swap! *stats
#(-> %
(update :min-idle min (count (:idle new)))
(update :max-idle max (count (:idle new)))
(update :min-taken min (count (:taken new)))
(update :max-taken max (count (:taken new)))))))
_ (with-open [conn (.getConnection datasource)]
(with-open [stmt (.createStatement conn)]
(.execute stmt "create table T (id INTEGER primary key)"))
(with-open [stmt (.prepareStatement conn "insert into T (id) values (?)")]
(dotimes [i 1000]
(.setLong stmt 1 i)
(.addBatch stmt))
(.executeBatch stmt)))
select (fn [i]
(with-open [conn (.getConnection datasource)
stmt (doto (.prepareStatement conn "select id from T where id = ?")
(.setLong 1 i))
rs (.executeQuery stmt)]
(.next rs)
(.getLong rs 1)))
tasks (mapv #(fn [] (select %)) (range 1000))
futures (.invokeAll thread-pool tasks)]
(is (= (range 1000) (map #(.get ^Future %) futures)))
(is (= 4 (count (:idle @(:*atom datasource)))))
(is (= 0 (count (:taken @(:*atom datasource)))))
(is (= 0 (:min-idle @*stats)))
(is (= 4 (:max-idle @*stats)))
(is (= 0 (:min-taken @*stats)))
(is (= 10 (:max-taken @*stats))))))

(comment
(t/run-test-var #'test-sqlite)
(t/run-tests *ns*))

0 comments on commit 6376308

Please sign in to comment.