From 0e6024bf2b91ee1805e38993f614b832f15edf1c Mon Sep 17 00:00:00 2001 From: Chris Nuernberger Date: Thu, 21 Nov 2024 17:49:00 -0700 Subject: [PATCH] Release 2.019 --- CHANGELOG.md | 4 +- deps.edn | 2 +- docs/Reductions.html | 2 +- docs/ham-fisted.api.html | 2 +- docs/ham-fisted.function.html | 2 +- docs/ham-fisted.hlet.html | 2 +- docs/ham-fisted.lazy-noncaching.html | 2 +- docs/ham-fisted.mut-map.html | 2 +- docs/ham-fisted.primitive-invoke.html | 2 +- docs/ham-fisted.protocols.html | 2 +- docs/ham-fisted.reduce.html | 58 +++++++++++++-------------- docs/ham-fisted.set.html | 2 +- docs/index.html | 4 +- java/ham_fisted/ParallelOptions.java | 5 ++- src/ham_fisted/impl.clj | 8 +++- src/ham_fisted/reduce.clj | 11 +++-- test/ham_fisted/parallel_test.clj | 4 ++ 17 files changed, 65 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e2cc54..9cca2a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ - +# 2.019 + * impl/pmap really does support user-defined thread pool. + # 2.018 * Add clj-kondo exports and config, fix linting errors * Remove support for and call to `take-last` 1-arity, which was not valid. diff --git a/deps.edn b/deps.edn index 971b9f9..d5d20d6 100644 --- a/deps.edn +++ b/deps.edn @@ -44,7 +44,7 @@ :exec-fn codox.main/-main :exec-args {:group-id "com.cnuernber" :artifact-id "ham-fisted" - :version "2.018" + :version "2.019" :name "Ham-Fisted" :description "High Performance Clojure Primitives" :metadata {:doc/format :markdown} diff --git a/docs/Reductions.html b/docs/Reductions.html index 0e1b3e1..bd8605a 100644 --- a/docs/Reductions.html +++ b/docs/Reductions.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

Reductions

+ gtag('config', 'G-XJYNJF48RM');

Reductions

The ham-fisted project extends the concept of Clojure's reduce in a few ways, taking influence from java streams and Clojure transducers. The most important way is a formal definition of a parallel reduction (analogous to pmap for map).

diff --git a/docs/ham-fisted.api.html b/docs/ham-fisted.api.html index 7834c7e..f07ee7c 100644 --- a/docs/ham-fisted.api.html +++ b/docs/ham-fisted.api.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

ham-fisted.api

Fast mutable and immutable associative data structures based on bitmap trie + gtag('config', 'G-XJYNJF48RM');

ham-fisted.api

Fast mutable and immutable associative data structures based on bitmap trie hashmaps. Mutable pathways implement the java.util.Map or Set interfaces including in-place update features such as compute or computeIfPresent.

Mutable maps or sets can be turned into their immutable counterparts via the diff --git a/docs/ham-fisted.function.html b/docs/ham-fisted.function.html index 8047b1d..7f79bce 100644 --- a/docs/ham-fisted.function.html +++ b/docs/ham-fisted.function.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

ham-fisted.function

Helpers for working with java.util.function + gtag('config', 'G-XJYNJF48RM');

ham-fisted.function

Helpers for working with java.util.function package objects.

->bi-function

(->bi-function cljfn)

Convert an object to a java.util.BiFunction. Object can either already be a bi-function or an IFn to be invoked with 2 arguments.

diff --git a/docs/ham-fisted.hlet.html b/docs/ham-fisted.hlet.html index 6984533..8db27cf 100644 --- a/docs/ham-fisted.hlet.html +++ b/docs/ham-fisted.hlet.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

ham-fisted.hlet

Extensible let to allow efficient typed destructuring.

+ gtag('config', 'G-XJYNJF48RM');

ham-fisted.hlet

Extensible let to allow efficient typed destructuring.

Registered Extensions:

dbls and lngs will most efficiently destructure java primitive arrays and fall back to casting the result of clojure.lang.RT/nth if input is not a double or long array.

diff --git a/docs/ham-fisted.lazy-noncaching.html b/docs/ham-fisted.lazy-noncaching.html index 4749c20..dc03583 100644 --- a/docs/ham-fisted.lazy-noncaching.html +++ b/docs/ham-fisted.lazy-noncaching.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

ham-fisted.lazy-noncaching

Lazy, noncaching implementation of many clojure.core functions. There are several benefits of carefully + gtag('config', 'G-XJYNJF48RM');

ham-fisted.lazy-noncaching

Lazy, noncaching implementation of many clojure.core functions. There are several benefits of carefully constructed lazy noncaching versions:

  1. No locking - better multithreading/green thread performance.
  2. diff --git a/docs/ham-fisted.mut-map.html b/docs/ham-fisted.mut-map.html index c7bd864..36d5dc3 100644 --- a/docs/ham-fisted.mut-map.html +++ b/docs/ham-fisted.mut-map.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

    ham-fisted.mut-map

    Functions for working with java's mutable map interface

    + gtag('config', 'G-XJYNJF48RM');

    ham-fisted.mut-map

    Functions for working with java's mutable map interface

    compute!

    (compute! m k bfn)

    Compute a new value in a map derived from an existing value. bfn gets passed k, v where k may be nil. If the function returns nil the corresponding key is removed from the map.

    See Map.compute

    diff --git a/docs/ham-fisted.primitive-invoke.html b/docs/ham-fisted.primitive-invoke.html index 39af799..ae34dbe 100644 --- a/docs/ham-fisted.primitive-invoke.html +++ b/docs/ham-fisted.primitive-invoke.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

    ham-fisted.primitive-invoke

    For statically traced calls the Clojure compiler calls the primitive version of type-hinted functions + gtag('config', 'G-XJYNJF48RM');

    ham-fisted.primitive-invoke

    For statically traced calls the Clojure compiler calls the primitive version of type-hinted functions and this makes quite a difference in tight loops. Often times, however, functions are passed by values or returned from if-statements and then you need to explicitly call the primitive overload - this makes that pathway less verbose. Functions must first be check-casted to their primitive types and then diff --git a/docs/ham-fisted.protocols.html b/docs/ham-fisted.protocols.html index ce60fbc..757af63 100644 --- a/docs/ham-fisted.protocols.html +++ b/docs/ham-fisted.protocols.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

    ham-fisted.protocols

    BitSet

    protocol

    Protocol for efficiently dealing with bitsets

    + gtag('config', 'G-XJYNJF48RM');

    ham-fisted.protocols

    BitSet

    protocol

    Protocol for efficiently dealing with bitsets

    members

    bitset?

    (bitset? item)

    contains-range?

    (contains-range? item sidx eidx)

    intersects-range?

    (intersects-range? item sidx eidx)

    max-set-value

    (max-set-value item)

    min-set-value

    (min-set-value item)

    BulkSetOps

    protocol

    members

    reduce-intersection

    (reduce-intersection l data)

    reduce-union

    (reduce-union l data)

    Finalize

    protocol

    Generic protocol for things that finalize results of reductions. Defaults to deref of instance of IDeref or identity.

    members

    finalize

    (finalize this val)

    PAdd

    protocol

    Define a function to mutably add items to a collection. This function must return diff --git a/docs/ham-fisted.reduce.html b/docs/ham-fisted.reduce.html index df26e5a..62b3d30 100644 --- a/docs/ham-fisted.reduce.html +++ b/docs/ham-fisted.reduce.html @@ -4,11 +4,11 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

    ham-fisted.reduce

    Protocol-based parallel reduction architecture and helper functions.

    + gtag('config', 'G-XJYNJF48RM');

    ham-fisted.reduce

    Protocol-based parallel reduction architecture and helper functions.

    ->consumer

    (->consumer cfn)

    Return an instance of a consumer, double consumer, or long consumer.

    -

    bind-double-consumer-reducer!

    (bind-double-consumer-reducer! cls-type ctor)(bind-double-consumer-reducer! ctor)

    Bind a classtype as a double consumer parallel reducer - the consumer must implement +

    bind-double-consumer-reducer!

    (bind-double-consumer-reducer! cls-type ctor)(bind-double-consumer-reducer! ctor)

    Bind a classtype as a double consumer parallel reducer - the consumer must implement DoubleConsumer, ham_fisted.Reducible, and IDeref.

    -

    compose-reducers

    (compose-reducers reducers)(compose-reducers options reducers)

    Given a map or sequence of reducers return a new reducer that produces a map or +

    compose-reducers

    (compose-reducers reducers)(compose-reducers options reducers)

    Given a map or sequence of reducers return a new reducer that produces a map or vector of results.

    If data is a sequence then context is guaranteed to be an object array.

    Options:

    @@ -17,19 +17,19 @@ should all be uniform as accepting longs, doubles, or generically objects. Defaults to nil. -

    consume!

    (consume! consumer coll)

    Consumer a collection. This is simply a reduction where the return value +

    consume!

    (consume! consumer coll)

    Consumer a collection. This is simply a reduction where the return value is ignored.

    Returns the consumer.

    -

    consumer-accumulator

    Generic reduction function using a consumer

    -

    consumer-preducer

    (consumer-preducer constructor)

    Bind a consumer as a parallel reducer.

    +

    consumer-accumulator

    Generic reduction function using a consumer

    +

    consumer-preducer

    (consumer-preducer constructor)

    Bind a consumer as a parallel reducer.

    Consumer must implement java.util.function.Consumer, ham_fisted.Reducible and clojure.lang.IDeref.

    Returns instance of type bound.

    See documentation for declare-double-consumer-preducer!.

    -

    consumer-reducer

    (consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is +

    consumer-reducer

    (consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref

    -

    double-accumulator

    macro

    (double-accumulator accvar varvar & code)

    Type-hinted double reduction accumulator. +

    double-accumulator

    macro

    (double-accumulator accvar varvar & code)

    Type-hinted double reduction accumulator. consumer:

      ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                                  0.0
    @@ -38,7 +38,7 @@
     ham-fisted.api> @*1
     499500.0
     
    -

    double-consumer-accumulator

    Converts from a double consumer to a double reduction accumulator that returns the +

    double-consumer-accumulator

    Converts from a double consumer to a double reduction accumulator that returns the consumer:

    ham-fisted.api> (reduce double-consumer-accumulator
                                  (Sum$SimpleSum.)
    @@ -47,7 +47,7 @@
     ham-fisted.api> @*1
     499500.0
     
    -

    double-consumer-preducer

    (double-consumer-preducer constructor)

    Return a preducer for a double consumer.

    +

    double-consumer-preducer

    (double-consumer-preducer constructor)

    Return a preducer for a double consumer.

    Consumer must implement java.util.function.DoubleConsumer, ham_fisted.Reducible and clojure.lang.IDeref.

    user> (require '[ham-fisted.api :as hamf])
    @@ -74,9 +74,9 @@
       user> (hamf/preduce-reducer (double-consumer-preducer #(MeanR. 0 0)) (hamf/range 200000))
     99999.5
     
    -

    double-consumer-reducer

    (double-consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is +

    double-consumer-reducer

    (double-consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref

    -

    immut-map-kv

    (immut-map-kv keyfn valfn data)(immut-map-kv ks vs)

    indexed-accum

    macro

    (indexed-accum accvar idxvar varvar & code)

    Create an indexed accumulator that recieves and additional long index +

    immut-map-kv

    (immut-map-kv keyfn valfn data)(immut-map-kv ks vs)

    indexed-accum

    macro

    (indexed-accum accvar idxvar varvar & code)

    Create an indexed accumulator that recieves and additional long index during a reduction:

    ham-fisted.api> (reduce (indexed-accum
                              acc idx v (conj acc [idx v]))
    @@ -84,7 +84,7 @@
                             (range 5))
     [[0 0] [1 1] [2 2] [3 3] [4 4]]
     
    -

    indexed-double-accum

    macro

    (indexed-double-accum accvar idxvar varvar & code)

    Create an indexed double accumulator that recieves and additional long index +

    indexed-double-accum

    macro

    (indexed-double-accum accvar idxvar varvar & code)

    Create an indexed double accumulator that recieves and additional long index during a reduction:

    ham-fisted.api> (reduce (indexed-double-accum
                              acc idx v (conj acc [idx v]))
    @@ -92,7 +92,7 @@
                             (range 5))
     [[0 0.0] [1 1.0] [2 2.0] [3 3.0] [4 4.0]]
     
    -

    indexed-long-accum

    macro

    (indexed-long-accum accvar idxvar varvar & code)

    Create an indexed long accumulator that recieves and additional long index +

    indexed-long-accum

    macro

    (indexed-long-accum accvar idxvar varvar & code)

    Create an indexed long accumulator that recieves and additional long index during a reduction:

    ham-fisted.api> (reduce (indexed-long-accum
                              acc idx v (conj acc [idx v]))
    @@ -100,7 +100,7 @@
                             (range 5))
     [[0 0] [1 1] [2 2] [3 3] [4 4]]
     
    -

    long-accumulator

    macro

    (long-accumulator accvar varvar & code)

    Type-hinted double reduction accumulator. +

    long-accumulator

    macro

    (long-accumulator accvar varvar & code)

    Type-hinted double reduction accumulator. consumer:

      ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                                  0.0
    @@ -109,7 +109,7 @@
     ham-fisted.api> @*1
     499500.0
     
    -

    long-consumer-accumulator

    Converts from a long consumer to a long reduction accumulator that returns the +

    long-consumer-accumulator

    Converts from a long consumer to a long reduction accumulator that returns the consumer:

    ham-fisted.api> (reduce double-consumer-accumulator
                                  (Sum$SimpleSum.)
    @@ -118,9 +118,9 @@
     ham-fisted.api> @*1
     499500.0
     
    -

    long-consumer-reducer

    (long-consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is +

    long-consumer-reducer

    (long-consumer-reducer ctor)

    Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref

    -

    options->parallel-options

    (options->parallel-options options)

    Convert an options map to a parallel options object.

    +

    options->parallel-options

    (options->parallel-options options)

    Convert an options map to a parallel options object.

    Options:

    • :pool - supply the forkjoinpool to use.
    • @@ -142,7 +142,7 @@
    • :n-lookahead - How for to look ahead for pmap and upmap to add new jobs to the queue. Defaults to `(* 2 parallelism).
    -

    parallel-reducer

    (parallel-reducer init-fn rfn merge-fn fin-fn)(parallel-reducer init-fn rfn merge-fn)

    Implement a parallel reducer by explicitly passing in the various required functions.

    +

    parallel-reducer

    (parallel-reducer init-fn rfn merge-fn fin-fn)(parallel-reducer init-fn rfn merge-fn)

    Implement a parallel reducer by explicitly passing in the various required functions.

    • 'init-fn' - Takes no argumenst and returns a new accumulation target.
    • 'rfn' - clojure rf function - takes two arguments, the accumulation target and a new value @@ -159,7 +159,7 @@ (lznc/map (fn ^long [^long v] (rem v 13)) (hamf/range 1000000))) [0 1 2 3 4 5 6 7 8 9 10 11 12] -

    preduce

    (preduce init-val-fn rfn merge-fn coll)(preduce init-val-fn rfn merge-fn options coll)

    Parallelized reduction. Currently coll must either be random access or a lznc map/filter +

    preduce

    (preduce init-val-fn rfn merge-fn coll)(preduce init-val-fn rfn merge-fn options coll)

    Parallelized reduction. Currently coll must either be random access or a lznc map/filter chain based on one or more random access entities, hashmaps and sets from this library or any java.util set, hashmap or concurrent versions of these. If input cannot be parallelized this lowers to a normal serial reduction.

    @@ -196,7 +196,7 @@ the noncaching aspect -- repeatedly evaluating this result may kick off the parallelized reduction multiple times. To ensure caching if unsure call seq on the result. -

    preduce-reducer

    (preduce-reducer reducer options coll)(preduce-reducer reducer coll)

    Given an instance of ham-fisted.protocols/ParallelReducer, perform a parallel +

    preduce-reducer

    (preduce-reducer reducer options coll)(preduce-reducer reducer coll)

    Given an instance of ham-fisted.protocols/ParallelReducer, perform a parallel reduction.

    In the case where the result is requested unmerged then finalize will be called on each result in a lazy noncaching way. In this case you can use a @@ -211,32 +211,32 @@

    • :skip-finalize? - when true, the reducer's finalize method is not called on the result.
    -

    preduce-reducers

    (preduce-reducers reducers options coll)(preduce-reducers reducers coll)

    Given a map or sequence of ham-fisted.protocols/ParallelReducer, produce a map or +

    preduce-reducers

    (preduce-reducers reducers options coll)(preduce-reducers reducers coll)

    Given a map or sequence of ham-fisted.protocols/ParallelReducer, produce a map or sequence of reduced values. Reduces over input coll once in parallel if coll is large enough. See options for ham-fisted.reduce/preduce.

    ham-fisted.api> (preduce-reducers {:sum (Sum.) :mult *} (range 20))
     {:mult 0, :sum #<Sum@5082c3b7: {:sum 190.0, :n-elems 20}>}
     
    -

    reduce-reducer

    (reduce-reducer reducer coll)

    Serially reduce a reducer.

    +

    reduce-reducer

    (reduce-reducer reducer coll)

    Serially reduce a reducer.

    ham-fisted.api> (reduce-reducer (Sum.) (range 1000))
     #<Sum@afbedb: {:sum 499500.0, :n-elems 1000}>
     
    -

    reduce-reducers

    (reduce-reducers reducers coll)

    Serially reduce a map or sequence of reducers into a map or sequence of results.

    +

    reduce-reducers

    (reduce-reducers reducers coll)

    Serially reduce a map or sequence of reducers into a map or sequence of results.

    ham-fisted.api> (reduce-reducers {:a (Sum.) :b *} (range 1 21))
     {:b 2432902008176640000, :a #<Sum@6bcebeb1: {:sum 210.0, :n-elems 20}>}
     
    -

    reducer->completef

    (reducer->completef reducer)

    Return fold-compatible pair of reducef, completef given a parallel reducer. +

    reducer->completef

    (reducer->completef reducer)

    Return fold-compatible pair of reducef, completef given a parallel reducer. Note that folded reducers are not finalized as of this time:

    ham-fisted.api> (def data (vec (range 200000)))
     #'ham-fisted.api/data
     ham-fisted.api> (r/fold (reducer->completef (Sum.)) (reducer->rfn (Sum.)) data)
     #<Sum@858c206: {:sum 1.99999E10, :n-elems 200000}>
     
    -

    reducer->rf

    (reducer->rf reducer)

    Given a reducer, return a transduce-compatible rf -

    +

    reducer->rf

    (reducer->rf reducer)

    Given a reducer, return a transduce-compatible rf -

    ham-fisted.api> (transduce (clojure.core/map #(+ % 2)) (reducer->rf (Sum.)) (range 200))
     {:sum 20300.0, :n-elems 200}
     
    -

    reducer-with-finalize

    (reducer-with-finalize reducer fin-fn)

    reducer-xform->reducer

    (reducer-xform->reducer reducer xform)

    Given a reducer and a transducer xform produce a new reducer which will apply +

    reducer-with-finalize

    (reducer-with-finalize reducer fin-fn)

    reducer-xform->reducer

    (reducer-xform->reducer reducer xform)

    Given a reducer and a transducer xform produce a new reducer which will apply the transducer pipeline before is reduction function.

    ham-fisted.api> (reduce-reducer (reducer-xform->reducer (Sum.) (clojure.core/filter even?))
                                     (range 1000))
    @@ -244,6 +244,6 @@
     

    !! - If you use a stateful transducer here then you must not use the reducer in a parallelized reduction.

    -

    reducible-merge

    Parallel reduction merge function that expects both sides to be an instances of +

    reducible-merge

    Parallel reduction merge function that expects both sides to be an instances of Reducible

    -
    \ No newline at end of file +
    \ No newline at end of file diff --git a/docs/ham-fisted.set.html b/docs/ham-fisted.set.html index 31031c2..bd9ccf2 100644 --- a/docs/ham-fisted.set.html +++ b/docs/ham-fisted.set.html @@ -4,7 +4,7 @@ function gtag(){dataLayer.push(arguments);} gtag('js', new Date()); - gtag('config', 'G-XJYNJF48RM');

    ham-fisted.set

    ->integer-random-access

    (->integer-random-access s)

    Given a set (or bitset), return a efficient, sorted random access structure. This assumes + gtag('config', 'G-XJYNJF48RM');

    ham-fisted.set

    ->integer-random-access

    (->integer-random-access s)

    Given a set (or bitset), return a efficient, sorted random access structure. This assumes the set contains integers.

    bitset

    (bitset)(bitset data)(bitset start end)

    Create a java.util.Bitset. The two argument version assumes you are passing in the start, end of a monotonically incrementing range.

    diff --git a/docs/index.html b/docs/index.html index f266329..7d3e088 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1,10 +1,10 @@ -Ham-Fisted 2.018

    Ham-Fisted 2.018

    High Performance Clojure Primitives.

    Topics

    Namespaces

    ham-fisted.api

    Fast mutable and immutable associative data structures based on bitmap trie + gtag('config', 'G-XJYNJF48RM');

    Ham-Fisted 2.019

    High Performance Clojure Primitives.

    Topics

    Namespaces

    ham-fisted.api

    Fast mutable and immutable associative data structures based on bitmap trie hashmaps. Mutable pathways implement the java.util.Map or Set interfaces including in-place update features such as compute or computeIfPresent.

    Public variables and functions:

    ham-fisted.function

    Helpers for working with java.util.function diff --git a/java/ham_fisted/ParallelOptions.java b/java/ham_fisted/ParallelOptions.java index abacf72..9bbc1e7 100644 --- a/java/ham_fisted/ParallelOptions.java +++ b/java/ham_fisted/ParallelOptions.java @@ -1,6 +1,7 @@ package ham_fisted; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -8,7 +9,7 @@ public class ParallelOptions { public final long minN; public final int maxBatchSize; public final boolean ordered; - public final ForkJoinPool pool; + public final ExecutorService pool; public final int parallelism; public final CatParallelism catParallelism; public final int putTimeoutMs; @@ -26,7 +27,7 @@ public enum CatParallelism { } public ParallelOptions(long _minN, int batchSize, boolean _ordered, - ForkJoinPool _pool, int _parallelism, + ExecutorService _pool, int _parallelism, CatParallelism _catParallelism, int _putTimeoutMs, boolean unmergedResult, int nLookahead) { minN = _minN; diff --git a/src/ham_fisted/impl.clj b/src/ham_fisted/impl.clj index 85ebb88..c1af2bc 100644 --- a/src/ham_fisted/impl.clj +++ b/src/ham_fisted/impl.clj @@ -195,7 +195,6 @@ (if (in-fork-join-task?) (apply map map-fn sequences) (let [pool (.-pool options) - parallelism (.-parallelism options) ;;In this case we want a caching sequence - so we call 'seq' on a lazy noncaching ;;map queue (options->queue options) @@ -413,6 +412,11 @@ (defn- fjjoin [task] (.join ^ForkJoinTask task)) (defn- fjtask [^Callable f] (ForkJoinTask/adapt f)) +(defn- ensure-fj-pool + ^ForkJoinPool [p] + (when-not (instance? ForkJoinPool p) + (throw (RuntimeException. "Pool passed in must be a forkjoin pool"))) + p) (extend-protocol protocols/ParallelReduction Object @@ -434,7 +438,7 @@ PersistentHashMap (preduce [coll init-val-fn rfn merge-fn options] (let [options ^ParallelOptions options - pool (.-pool options) + pool (ensure-fj-pool (.-pool options)) n (.-minN options) _ (when (.-unmergedResult options) (throw (RuntimeException. "Persistent hash maps do not support unmerged results"))) diff --git a/src/ham_fisted/reduce.clj b/src/ham_fisted/reduce.clj index c8da757..0308db4 100644 --- a/src/ham_fisted/reduce.clj +++ b/src/ham_fisted/reduce.clj @@ -10,9 +10,11 @@ [clojure.lang IFn$DO IFn$LO IFn$OLO IFn$DDD IFn$LLL] [java.util Map] [java.util.function DoubleConsumer LongConsumer Consumer] - [java.util.concurrent ForkJoinPool]) + [java.util.concurrent Executor ForkJoinPool]) (:refer-clojure :exclude [map])) +(set! *warn-on-reflection* true) + (defn- unpack-reduced [item] @@ -52,12 +54,15 @@ (ParallelOptions.) :else (let [^Map options (or options {}) - ^ForkJoinPool pool (.getOrDefault options :pool (ForkJoinPool/commonPool))] + ^Executor pool (.getOrDefault options :pool (ForkJoinPool/commonPool))] (ParallelOptions. (.getOrDefault options :min-n 1000) (.getOrDefault options :max-batch-size 64000) (boolean (.getOrDefault options :ordered? true)) pool - (.getOrDefault options :parallelism (.getParallelism pool)) + (.getOrDefault options :parallelism + (if (instance? ForkJoinPool pool) + (.getParallelism ^ForkJoinPool pool) + 0)) (case (.getOrDefault options :cat-parallelism :seq-wise) :seq-wise ParallelOptions$CatParallelism/SEQWISE :elem-wise ParallelOptions$CatParallelism/ELEMWISE) diff --git a/test/ham_fisted/parallel_test.clj b/test/ham_fisted/parallel_test.clj index f37bd4d..22d6409 100644 --- a/test/ham_fisted/parallel_test.clj +++ b/test/ham_fisted/parallel_test.clj @@ -44,3 +44,7 @@ (is (every? nil? (api/upgroups 10000 (fn [^long sidx ^long eidx] nil))))) + +(deftest custom-pmap-pool + (let [p (java.util.concurrent.Executors/newCachedThreadPool)] + (is (= 5050.0 (api/sum (api/pmap-opts {:n-lookahead 2 :pool p :min-n 0} #(do #_(println (.getName (Thread/currentThread))) (+ % 1)) (range 100)))))))