Prepare for ChunkSplitters 3.0 #119

Merged · 16 commits · Sep 26, 2024
Changes from 9 commits
CHANGELOG.md (8 additions, 1 deletion)
@@ -1,9 +1,16 @@
OhMyThreads.jl Changelog
=========================

+Version 0.7.0
+-------------
+- ![BREAKING][badge-breaking] We now use ChunkSplitters version 3.0. The function `OhMyThreads.chunks` has been renamed to `OhMyThreads.index_chunks`. The new functions `index_chunks` and `chunks` (different from the old one with the same name!) are now exported. See ChunkSplitters.jl for more information.
+- ![BREAKING][badge-breaking] If you provide chunks or index chunks as input, we now disable the internal chunking without a warning. Previously, we showed a warning unless you had set `chunking=false`. However, we now throw an error when you set any incompatible chunking-related keyword arguments.
+- ![Deprecation][badge-deprecation] The `split` options `:batch` and `:scatter` are deprecated (they still work but will be removed in a future release). Use `:consecutive` and `:roundrobin`, respectively, instead.
+- ![Enhancement][badge-enhancement] The `split` keyword argument can now also be an instance of a subtype of `OhMyThreads.Split`, which can potentially give better performance than passing a `Symbol`. For example, you can replace `:consecutive` by `OhMyThreads.Consecutive()` and `:roundrobin` by `OhMyThreads.RoundRobin()`.
+
Version 0.6.2
-------------
-- ![Enhancement][badge-enhancement] Added API support for `enumerate(chunks(...))`. Best used in combination with `chunking=false`.
+- ![Enhancement][badge-enhancement] Added API support for `enumerate(chunks(...))`. Best used in combination with `chunking=false`

Version 0.6.1
-------------
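To make the 0.7.0 entries above concrete, here is a quick sketch of how the renamed chunking functions behave (based on the ChunkSplitters.jl documentation; outputs are illustrative):

```julia
using OhMyThreads  # v0.7 exports both index_chunks and chunks

x = collect(1:10);

# index_chunks (the old `chunks`) yields ranges of indices into x:
collect(index_chunks(x; n = 3))  # [1:4, 5:7, 8:10]

# the new chunks yields views of the elements themselves:
collect(chunks(x; n = 3))        # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]

# split may be a Symbol or, potentially faster, a Split instance:
collect(index_chunks(x; n = 3, split = OhMyThreads.RoundRobin()))
```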
Project.toml (1 addition, 1 deletion)
@@ -12,7 +12,7 @@ TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
[compat]
Aqua = "0.8"
BangBang = "0.3.40, 0.4"
-ChunkSplitters = "2.4"
+ChunkSplitters = "3"
StableTasks = "0.1.5"
TaskLocalValues = "0.1"
Test = "1"
docs/src/literate/falsesharing/falsesharing.jl (4 additions, 4 deletions)
@@ -30,11 +30,11 @@ data = rand(1_000_000 * nthreads());
#
# A common, manual implementation of this idea might look like this:

-using OhMyThreads: @spawn, chunks
+using OhMyThreads: @spawn, index_chunks

function parallel_sum_falsesharing(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
-@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
+@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
for i in idcs
psums[c] += data[i]
@@ -102,7 +102,7 @@ nthreads()

function parallel_sum_tasklocal(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
-@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
+@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
local s = zero(eltype(data))
for i in idcs
@@ -131,7 +131,7 @@ end
# using `map` and reusing the built-in (sequential) `sum` function on each parallel task:

function parallel_sum_map(data; nchunks = nthreads())
-ts = map(chunks(data, n = nchunks)) do idcs
+ts = map(index_chunks(data, n = nchunks)) do idcs
@spawn @views sum(data[idcs])
end
return sum(fetch.(ts))
docs/src/literate/falsesharing/falsesharing.md (4 additions, 4 deletions)
@@ -39,11 +39,11 @@ catastrophic numerical errors due to potential rearrangements of terms in the su
A common, manual implementation of this idea might look like this:

````julia
-using OhMyThreads: @spawn, chunks
+using OhMyThreads: @spawn, index_chunks

function parallel_sum_falsesharing(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
-@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
+@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
for i in idcs
psums[c] += data[i]
@@ -132,7 +132,7 @@ into `psums` (once!).
````julia
function parallel_sum_tasklocal(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
-@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
+@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
local s = zero(eltype(data))
for i in idcs
@@ -168,7 +168,7 @@ using `map` and reusing the built-in (sequential) `sum` function on each paralle

````julia
function parallel_sum_map(data; nchunks = nthreads())
-ts = map(chunks(data, n = nchunks)) do idcs
+ts = map(index_chunks(data, n = nchunks)) do idcs
@spawn @views sum(data[idcs])
end
return sum(fetch.(ts))
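Since `chunks` now yields views of the elements themselves, the last variant above could also be written without explicit indexing. A hypothetical sketch, not part of this PR:

```julia
using OhMyThreads: @spawn, chunks
using Base.Threads: nthreads

function parallel_sum_map_views(data; nchunks = nthreads())
    ts = map(chunks(data; n = nchunks)) do chunk  # chunk is a view into data
        @spawn sum(chunk)
    end
    return sum(fetch.(ts))
end
```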
docs/src/literate/mc/mc.jl (4 additions, 4 deletions)
@@ -79,15 +79,15 @@ using OhMyThreads: StaticScheduler

# ## Manual parallelization
#
-# First, using the `chunks` function, we divide the iteration interval `1:N` into
+# First, using the `index_chunks` function, we divide the iteration interval `1:N` into
# `nthreads()` parts. Then, we apply a regular (sequential) `map` to spawn a Julia task
# per chunk. Each task will locally and independently perform a sequential Monte Carlo
# simulation. Finally, we fetch the results and compute the average estimate for $\pi$.

-using OhMyThreads: @spawn, chunks
+using OhMyThreads: @spawn, index_chunks

function mc_parallel_manual(N; nchunks = nthreads())
-tasks = map(chunks(1:N; n = nchunks)) do idcs
+tasks = map(index_chunks(1:N; n = nchunks)) do idcs
@spawn mc(length(idcs))
end
pi = sum(fetch, tasks) / nchunks
@@ -104,7 +104,7 @@ mc_parallel_manual(N)
# `mc(length(idcs))` is faster than the implicit task-local computation within
# `tmapreduce` (which itself is a `mapreduce`).

-idcs = first(chunks(1:N; n = nthreads()))
+idcs = first(index_chunks(1:N; n = nthreads()))

@btime mapreduce($+, $idcs) do i
rand()^2 + rand()^2 < 1.0
docs/src/literate/mc/mc.md (4 additions, 4 deletions)
@@ -112,16 +112,16 @@ using OhMyThreads: StaticScheduler

## Manual parallelization

-First, using the `chunks` function, we divide the iteration interval `1:N` into
+First, using the `index_chunks` function, we divide the iteration interval `1:N` into
`nthreads()` parts. Then, we apply a regular (sequential) `map` to spawn a Julia task
per chunk. Each task will locally and independently perform a sequential Monte Carlo
simulation. Finally, we fetch the results and compute the average estimate for $\pi$.

````julia
-using OhMyThreads: @spawn, chunks
+using OhMyThreads: @spawn, index_chunks

function mc_parallel_manual(N; nchunks = nthreads())
-tasks = map(chunks(1:N; n = nchunks)) do idcs
+tasks = map(index_chunks(1:N; n = nchunks)) do idcs
@spawn mc(length(idcs))
end
pi = sum(fetch, tasks) / nchunks
@@ -151,7 +151,7 @@ It is faster than `mc_parallel` above because the task-local computation
`tmapreduce` (which itself is a `mapreduce`).

````julia
-idcs = first(chunks(1:N; n = nthreads()))
+idcs = first(index_chunks(1:N; n = nthreads()))

@btime mapreduce($+, $idcs) do i
rand()^2 + rand()^2 < 1.0
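For context, the sequential kernel `mc` that these snippets call is defined earlier in the literate file and is not part of this diff; a minimal sketch consistent with the `rand()^2 + rand()^2 < 1.0` check above:

```julia
function mc(N)
    M = 0  # samples that land inside the quarter circle
    for _ in 1:N
        M += rand()^2 + rand()^2 < 1.0
    end
    return 4 * M / N  # estimate of pi
end
```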
docs/src/literate/tls/tls.jl (2 additions, 2 deletions)
@@ -102,12 +102,12 @@ res ≈ res_naive
# iterations (i.e. matrix pairs) for which this task is responsible.
# Before we learn how to do this more conveniently, let's implement this idea of a
# task-local temporary buffer (for each parallel task) manually.
-using OhMyThreads: chunks, @spawn
+using OhMyThreads: index_chunks, @spawn
using Base.Threads: nthreads

function matmulsums_manual(As, Bs)
N = size(first(As), 1)
-tasks = map(chunks(As; n = 2 * nthreads())) do idcs
+tasks = map(index_chunks(As; n = 2 * nthreads())) do idcs
@spawn begin
local C = Matrix{Float64}(undef, N, N)
map(idcs) do i
docs/src/literate/tls/tls.md (2 additions, 2 deletions)
@@ -140,12 +140,12 @@ Before we learn how to do this more conveniently, let's implement this idea of a
task-local temporary buffer (for each parallel task) manually.

````julia
-using OhMyThreads: chunks, @spawn
+using OhMyThreads: index_chunks, @spawn
using Base.Threads: nthreads

function matmulsums_manual(As, Bs)
N = size(first(As), 1)
-tasks = map(chunks(As; n = 2 * nthreads())) do idcs
+tasks = map(index_chunks(As; n = 2 * nthreads())) do idcs
@spawn begin
local C = Matrix{Float64}(undef, N, N)
map(idcs) do i
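The same tutorial later replaces this manual buffer handling with `OhMyThreads.TaskLocalValue`. A minimal sketch of that alternative (the name `matmulsums_tlv` is illustrative, not part of this hunk):

```julia
using OhMyThreads: TaskLocalValue, tmap
using LinearAlgebra: mul!

function matmulsums_tlv(As, Bs)
    N = size(first(As), 1)
    tlv = TaskLocalValue{Matrix{Float64}}(() -> Matrix{Float64}(undef, N, N))
    tmap(eachindex(As, Bs)) do i
        C = tlv[]  # each parallel task lazily gets its own buffer
        mul!(C, As[i], Bs[i])
        sum(C)
    end
end
```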
docs/src/refs/api.md (11 additions, 2 deletions)
@@ -37,16 +37,25 @@ GreedyScheduler
SerialScheduler
```

-## Non-Exported
+## Re-exported

+| | |
+|------------------------|---------------------------------------------------------------------|
+| `OhMyThreads.chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.chunks) |
+| `OhMyThreads.index_chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.index_chunks) |
+
+## Public but not exported

| | |
|------------------------|---------------------------------------------------------------------|
| `OhMyThreads.@spawn` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@spawnat` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@fetch` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@fetchfrom` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
-| `OhMyThreads.chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/dev/references/#ChunkSplitters.chunks) |
| `OhMyThreads.TaskLocalValue` | see [TaskLocalValues.jl](https://github.com/vchuravy/TaskLocalValues.jl) |
+| `OhMyThreads.Split` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.Split) |
+| `OhMyThreads.Consecutive` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.Consecutive) |
+| `OhMyThreads.RoundRobin` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.RoundRobin) |


```@docs
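In practice, the distinction in the tables above means that re-exported names work unqualified after `using OhMyThreads`, while public-but-not-exported names must be qualified. A small sketch:

```julia
using OhMyThreads

idcs = index_chunks(1:100; n = 4)   # re-exported: no qualification needed

t = OhMyThreads.@spawn sum(1:1000)  # public but not exported: qualify
fetch(t)
```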
src/OhMyThreads.jl (5 additions, 0 deletions)
@@ -6,7 +6,12 @@ for mac in Symbol.(["@spawn", "@spawnat", "@fetch", "@fetchfrom"])
end

using ChunkSplitters: ChunkSplitters
+const index_chunks = ChunkSplitters.index_chunks
const chunks = ChunkSplitters.chunks
+const Split = ChunkSplitters.Split
+const Consecutive = ChunkSplitters.Consecutive
+const RoundRobin = ChunkSplitters.RoundRobin
+export chunks, index_chunks

using TaskLocalValues: TaskLocalValues
const TaskLocalValue = TaskLocalValues.TaskLocalValue
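Because the new bindings are plain `const` aliases, the re-exported names are identical to the corresponding ChunkSplitters objects, e.g. (assuming both packages are in the environment):

```julia
using OhMyThreads, ChunkSplitters

OhMyThreads.index_chunks === ChunkSplitters.index_chunks  # true
OhMyThreads.RoundRobin === ChunkSplitters.RoundRobin      # true
```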