Skip to content

Commit

Permalink
works with Distributed and Threads now
Browse files Browse the repository at this point in the history
  • Loading branch information
pbayer committed Dec 5, 2020
1 parent 05f0617 commit 7b59537
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 5 deletions.
112 changes: 112 additions & 0 deletions docs/src/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,115 @@ julia> @grd gd
7
8
```

## Multithreading

Even if with actors we avoid race conditions, concurrency is still challenging. Consider the following where 8 threads concurrently try to increment a guarded variable:

```julia
julia> using .Threads

julia> gd = guard(zeros(Int, 10))
Guard{Array{Int64,1}}(Link{Channel{Any}}(Channel{Any}(sz_max:32,sz_curr:0), 1, :guard))

julia> for i in 1:10
@threads for _ in 1:nthreads()
gd[i] += 1
end
end

julia> @grd gd
10-element Array{Int64,1}:
1
1
1
1
1
1
1
1
1
1
```

What has happened? `gd[i] += 1` is not a single actor call. First all 8 threads get 0 by doing `getindex(var, i)` and then they do `setindex!(var, i, 0+1)` on it. The result is 1 and not 8 as we would expect. In order to get it right we create a function:

```julia
julia> incr(arr, index, by) = arr[index] += by
incr (generic function with 1 method)

julia> gd = guard(zeros(Int, 10))
Guard{Array{Int64,1}}(Link{Channel{Any}}(Channel{Any}(sz_max:32,sz_curr:0), 1, :guard))

julia> for i in 1:10
@threads for _ in 1:nthreads()
@grd incr(gd, i, 1)
end
end

julia> @grd gd
10-element Array{Int64,1}:
8
8
8
8
8
8
8
8
8
8
```

Thus the guard actor receives `nthreads()` calls to `incr` for each `i` and it works as expected.

## Distributed

For distributed computing we can create named guards or guards with remote links. All worker processes can work with the same guarded variable:

```julia
julia> using Distributed

julia> addprocs(1);

julia> @everywhere using Guards

julia> gd = guard([1,2,3], remote=true) # a guard with a remote link
Guard{Array{Int64,1}}(Link{RemoteChannel{Channel{Any}}}(RemoteChannel{Channel{Any}}(1, 1, 13), 1, :guard))

julia> fetch(@spawnat 2 @grd gd) # show it on pid 2
3-element Array{Int64,1}:
1
2
3

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––––– ––––––––––– –––––––––––––––––––––
Base Module
Core Module
Distributed 918.170 KiB Module
Main Module
gd 56 bytes Guard{Array{Int64,1}}

julia> @grd push!(gd, 4) # push! on pid 1

4-element Array{Int64,1}:
1
2
3
4

julia> @spawnat 2 @grd push!(gd, 5) # push on pid 2
Future(2, 1, 20, nothing)

julia> @grd gd # it is everywhere up to date
5-element Array{Int64,1}:
1
2
3
4
5
```

If we send local guarded variables to distributed actors or if we create distributed actors with guarded variables as arguments, their local links are automatically converted to remote ones, so they can work with them.
2 changes: 1 addition & 1 deletion src/Guards.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ include("protocol.jl")
include("api.jl")
include("interface.jl")

export guard, @grd
export guard, @grd, Guard

end
4 changes: 2 additions & 2 deletions src/api.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ its guarded variable var.
Actors.cast(gd::Guard, f, args...) = cast(gd.link, f, args...)

"""
update!(gd::Guard, var)
update!(gd::Guard, var)
Update a guarded variable represented by `gd` with `var`.
Expand All @@ -49,8 +49,8 @@ end

"""
```
@grd gd
@grd f(gd, args...)
@grd gd
```
Execute a function `f` on a guarded variable `var`
represented by an actor link `gd` and return a deep copy
Expand Down
12 changes: 10 additions & 2 deletions src/guard.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ id(x) = x

guardtype(::Guard{T}) where T = T

#
# return a Guard variable with a remote link
#
Actors._rlink(gd::Guard{T}) where T =
Guard{guardtype(gd)}(Actors._rlink(gd.link))

"""
```
Expand All @@ -35,20 +40,23 @@ a [`Guard`](@ref) link to it.
- `var`: variable to guard for,
- `name=nothing`: if a `name::Symbol` is provided the server
is registered and the name is returned,
- `remote=false`: if `remote=true` a guard with an
remote link is returned,
- `pid=myid()`: worker pid to create the actor on,
- `thrd=false`: thread to create the actor on,
- `sticky=false`: if `true`, the actor is created in
the same thread,
- `taskref=nothing`: if a `Ref{Task}` variable is
provided, it gets the created `Task`.
"""
function guard(var; name=nothing, pid=myid(), thrd=false,
function guard(var; name=nothing, remote=false,
pid=myid(), thrd=false,
sticky=false, taskref=nothing)
s = spawn(Bhv(id, var); mode=:guard, pid=pid,
thrd=thrd, sticky=sticky, taskref=taskref)
isnothing(name) || register(name, s)
init!(s, id, var)
return isnothing(name) ?
Guard{typeof(var)}(s) :
Guard{typeof(var)}(remote ? Actors._rlink(s) : s) :
Guard{typeof(var)}(name)
end
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ length(procs()) == 1 && addprocs(1)

@safetestset "Basics" begin include("test_basics.jl") end
@safetestset "Interface" begin include("test_interface.jl") end
@testset "Distributed" begin include("test_distr.jl") end
20 changes: 20 additions & 0 deletions test/test_distr.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# This file is part of the Guards.jl Julia package,
# MIT license, part of https://github.com/JuliaActors
#

using Guards, Test, Distributed

length(procs()) == 1 && addprocs(1)

@everywhere using Actors, Guards

@everywhere did(x) = x isa Guard ? (@grd x) : x

act1 = Actors.spawn(Bhv(did), pid=2)

gd = guard([1,2,3])

@test call(act1, gd) == [1,2,3]
@grd push!(gd, 4)
@test call(act1, gd) == [1,2,3,4]

0 comments on commit 7b59537

Please sign in to comment.