Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always keep missing in pool #65

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 53 additions & 26 deletions src/PooledArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ end

mutable struct PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}
refs::RA
pool::Vector{T}
invpool::Dict{T,R}
pool::Vector{Union{Missing,T}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of think we should just keep this T; and let Union{Missing, T} be provided as the T. That way if you don't have missing values, you're not penalized. It seems like to so simillar_missing, we could provide a function that just re-wraps the refs, pool, invpool, etc in a new PooledArray struct w/ Union{T, Missing}, right?

But maybe you're right that it would be too expensive to have to change pool::Vector{T} and invpool::Dict{T, R} to support missing as well in the similar_missing operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to avoid having to pay the following cost:

julia> y = Dict{Int, Int}(1:10^8 .=> 1:10^8);

julia> @time convert(Dict{Union{Int, Missing}, Int}, y);
  3.684057 seconds (253.19 k allocations: 4.514 GiB, 6.35% gc time, 2.88% compilation time)

(it would also reduce the memory burden of making this operation)

With this PR it would be avoided.
My original preference was different, i.e. to never allow Missing in the refpool and invrefpool but have 0 to be a ref that is interpreted as missing without making a lookup, but @nalimilan thought it would be overly complex. That is why I have made a draft here to discuss the design.

This would have a consequence similar to what we have in CategoricalArrays.jl:

julia> x = categorical(["a"])
1-element CategoricalArray{String,1,UInt32}:
 "a"

julia> resize!(x, 2)
2-element CategoricalArray{String,1,UInt32}:
    "a"
 #undef

julia> convert(CategoricalArray{Union{String,Missing},1,UInt32}, x)
2-element CategoricalArray{Union{Missing, String},1,UInt32}:
 "a"
 missing

(so magically #undef becomes missing which is probably undesirable)

There is no rush to make a decision here. Let us think and decide what is best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah neither solution is simple. Maybe it would be possible to make convert faster for dicts when the target eltype is a supertype of the source eltype?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this has been an open issue in Julia Base for a long time. Who do you think is best to ask?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea. I guess we (you :-p) would have to come up with at least a draft PR to get people's attention. But this may be quite easy to do. convert calls this constructor when the origin type is the same as the destination:
https://github.com/JuliaLang/julia/blob/be6887fa9ffa3d08315c93f1551c41b44dd3d720/base/dict.jl#L92-L95
I wonder whether what's needed is just to call convert on the keys and value when the types don't match. What do you think?

Assuming that works, do you think it would be an acceptable replacement for this PR or would it still not be sufficient?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we loose COW benefits. However, since we are not fully convinced what is best I would park this PR for now, so that @quinnj can finish CSV.jl update without this functionality (which is more pressing). We can go back to this PR and decide what to do later (especially that if we change the internals probably 2.0 release would be needed).

And maybe in the mean time what @nalimilan proposes to do in Julia Base would be implemented (I cannot do it as I do not know how to do it unfortunately as I know too little about internal design related to JuliaLang/julia#26681)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid the best person to implement no-copy convert for Array{Union{T, Missing}} is also @quinnj! :-D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quinnj - if you do not have time for this (which I assume is the case) - can you please point me to the part of the sources that would have to be changed? (I assume it is somewhere in the C codes)
Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the relevant code is in https://github.com/JuliaLang/julia/blob/master/src/array.c. In the jl_array_typetagdata function, it returns the "selector bytes" for an isbits-union array. This is where we lengthen the requested array size to account for selectory bytes. We also have to ensure the selector bytes are always initialized to zero, since otherwise you could get selector bytes pointing to something not just garbage, but totally wrong.

So the operation we'd need is convert(::Type{Vector{Union{T, S}}}, x::Vector{T}) where {T, S}, right? And we're ok if the two arrays potentially share data? So it seems we would add a check on the julia-side for if T is isbitstype and S is isbitstype, and if so, call a new array.c routine.

This is where it gets a little hairy though, because we have array_resize_buffer, but that takes an existing array to resize it. We'd need to use some of the same logic in that routine and make a new jl_convert_to_isbitsunion method that:

  • asserts the input array eltype is isbitstype
  • asserts new desired Union eltype is also isbitstype
  • checks that the existing array data isn't a->flags.isshared (means we can't resize it; like an mmap buffer)
  • then probably called the same gc method as this line to resize existing array data to new size (which is just old array size + length(old_array))
  • then I think we could just call jl_ptr_to_array_1d with our realloc'd pointer? that will take care of making the new array and using the realloc'd data pointer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I was afraid it would not be that simple. What we need is only convert(::Type{Vector{Union{T, Missing}}}, x::Vector{T}) where {T} but T may be any type (also non-bits, e.g. String). So I guess it would require separate paths for T being bits type and being non-bits type.

However, this in general shows the following problem. Assume that we have x and y sharing the same data. Where eltype of x is T and of y is Union{Missing, T}. What will happen to x if we do y[1] = missing? My intuition that this operation is super problematic - right? (i.e. would lead to an undefined state of x[1])

So the conclusion is that these arrays may not share data. Therefore we have an easier task of adding an internal constructor in PooledArrays.jl for Dict type that instead of:

function Dict{K,V}(kv) where V where K
    h = Dict{K,V}()
    haslength(kv) && sizehint!(h, Int(length(kv))::Int)
    for (k,v) in kv
        h[k] = v
    end
    return h
end

would copy an internal structure of source Dict and only do promotion of eltype of keys field in the source dict.

So we should have a function like:

function widen_dict_missing(d::Dict{K,V}) where {K, V}
    @assert !(Missing <: K) # this is handled otherwise in PooledArrays.jl
    K′ = Union{K, Missing}
    return Dict{K′,V}(copy(d.slots), Vector{K′}(d.keys), copy(d.vals),
                      d.ndel, d.count, d.age, d.idxfloor, d.maxprobe)
end

and the performance would be:

julia> d = Dict(1:10^7 .=> 1:10^7);

julia> @time Dict{Union{Int,Missing}, Int}(d);
  0.282199 seconds (7 allocations: 288.001 MiB)

julia> @time Dict{Union{Int,Missing}, Int}(d);
  0.301356 seconds (7 allocations: 288.001 MiB)

julia> @time Dict{Union{Int,Missing}, Int}(d);
  0.313754 seconds (7 allocations: 288.001 MiB)

julia> @time widen_dict_missing(d);
  0.080447 seconds (7 allocations: 288.000 MiB)

julia> @time widen_dict_missing(d);
  0.078461 seconds (7 allocations: 288.000 MiB)

julia> @time widen_dict_missing(d);
  0.085039 seconds (7 allocations: 288.000 MiB)

the benefit would be ~3x speedup.

invpool::Dict{Union{Missing,T},R}
# refcount[] is 1 if only one PooledArray holds a reference to pool and invpool
refcount::Threads.Atomic{Int}

function PooledArray{T,R,N,RA}(rs::RefArray{RA}, invpool::Dict{T, R},
pool::Vector{T}=_invert(invpool),
function PooledArray{T,R,N,RA}(rs::RefArray{RA}, invpool::Dict{Union{Missing,T}, R},
pool::Vector{Union{Missing,T}}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic{Int}(1)) where {T,R,N,RA<:AbstractArray{R, N}}
# this is a quick but incomplete consistency check
if length(pool) != length(invpool)
Expand All @@ -45,11 +45,22 @@ mutable struct PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}
if length(rs.a) > 0
# 0 indicates #undef
# refs mustn't overflow pool
minref, maxref = extrema(rs.a)
if (minref < 0 || maxref > length(invpool))
throw(ArgumentError("Reference array points beyond the end of the pool"))

ipl = length(invpool)
for v in rs.a
if (v < 0 || v > ipl)
throw(ArgumentError("Reference array points beyond the end of the pool"))
end
if !(T >: Missing)
if v == 1
throw(ArgumentError("Missing value in a pool that does not allow it"))
end
end
end
end
if pool[1] !== missing
throw(ArgumentError("First entry in pool must be a missing value"))
end
pa = new{T,R,N,RA}(rs.a, pool, invpool, refcount)
finalizer(x -> Threads.atomic_sub!(x.refcount, 1), pa)
return pa
Expand All @@ -76,7 +87,8 @@ const PooledArrOrSub = Union{SubArray{T, N, <:PooledArray{T, R}},
##############################################################################

# Echo inner constructor as an outer constructor
PooledArray(refs::RefArray{RA}, invpool::Dict{T,R}, pool::Vector{T}=_invert(invpool),
PooledArray{T}(refs::RefArray{RA}, invpool::Dict{Union{Missing,T},R},
pool::Vector{Union{Missing,T}}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic{Int}(1)) where {T,R,RA<:AbstractArray{R}} =
PooledArray{T,R,ndims(RA),RA}(refs, invpool, pool, refcount)

Expand All @@ -89,9 +101,9 @@ function _our_copy(x::SubArray{<:Any, 0})
return y
end

function PooledArray(d::PooledArrOrSub)
function PooledArray(d::PooledArrOrSub{T}) where {T}
Threads.atomic_add!(refcount(d), 1)
return PooledArray(RefArray(_our_copy(DataAPI.refarray(d))),
return PooledArray{T}(RefArray(_our_copy(DataAPI.refarray(d))),
DataAPI.invrefpool(d), DataAPI.refpool(d), refcount(d))
end

Expand All @@ -100,9 +112,9 @@ function _label(xs::AbstractArray,
::Type{I}=DEFAULT_POOLED_REF_TYPE,
start = 1,
labels = Array{I}(undef, size(xs)),
invpool::Dict{T,I} = Dict{T, I}(),
pool::Vector{T} = T[],
nlabels = 0,
invpool::Dict{Union{Missing,T},I} = Dict{Union{Missing,T}, I}(missing => one(I)),
pool::Vector{Union{Missing,T}} = Union{Missing,T}[missing],
nlabels = 1,
) where {T, I<:Integer}

@inbounds for i in start:length(xs)
Expand Down Expand Up @@ -176,7 +188,7 @@ end
function PooledArray{T}(d::AbstractArray; signed::Bool=false, compress::Bool=false) where {T}
R = signed ? (compress ? Int8 : DEFAULT_SIGNED_REF_TYPE) : (compress ? UInt8 : DEFAULT_POOLED_REF_TYPE)
refs, invpool, pool = _label(d, T, R)
return PooledArray(RefArray(refs), invpool, pool)
return PooledArray{T}(RefArray(refs), invpool, pool)
end

PooledArray(d::AbstractArray{T}, r::Type) where {T} = PooledArray{T}(d, r)
Expand Down Expand Up @@ -213,7 +225,7 @@ Base.copy(pa::PooledArrOrSub) = PooledArray(pa)

# here we do not allow dest to be SubArray as copy! is intended to replace whole arrays
# slow path will be used for SubArray
function copy!(dest::PooledArray{T, R, N},
function copy!(dest::Union{PooledArray{T, R, N}, PooledArray{Union{Missing,T}, R, N}},
src::PooledArrOrSub{T, N, R}) where {T, N, R}
copy!(dest.refs, DataAPI.refarray(src))
src_refcount = refcount(src)
Expand All @@ -232,10 +244,12 @@ function copy!(dest::PooledArray{T, R, N},
end

# this is needed as Julia Base uses a special path for this case we want to avoid
Base.copyto!(dest::PooledArrOrSub{T, N, R}, src::PooledArrOrSub{T, N, R}) where {T, N, R} =
Base.copyto!(dest::Union{PooledArrOrSub{T, N, R}, PooledArrOrSub{Union{Missing,T}, N, R}},
src::PooledArrOrSub{T, N, R}) where {T, N, R} =
copyto!(dest, 1, src, 1, length(src))

function Base.copyto!(dest::PooledArrOrSub{T, N, R}, doffs::Union{Signed, Unsigned},
function Base.copyto!(dest::Union{PooledArrOrSub{T, N, R}, PooledArrOrSub{Union{Missing,T}, N, R}},
doffs::Union{Signed, Unsigned},
src::PooledArrOrSub{T, N, R}, soffs::Union{Signed, Unsigned},
n::Union{Signed, Unsigned}) where {T, N, R}
n == 0 && return dest
Expand Down Expand Up @@ -277,9 +291,9 @@ function Base.resize!(pa::PooledArray{T,R,1}, n::Integer) where {T,R}
return pa
end

function Base.reverse(x::PooledArray)
function Base.reverse(x::PooledArray{T}) where {T}
Threads.atomic_add!(x.refcount, 1)
PooledArray(RefArray(reverse(x.refs)), x.invpool, x.pool, x.refcount)
PooledArray{T}(RefArray(reverse(x.refs)), x.invpool, x.pool, x.refcount)
end

function Base.permute!!(x::PooledArray, p::AbstractVector{T}) where T<:Integer
Expand All @@ -293,7 +307,7 @@ function Base.invpermute!!(x::PooledArray, p::AbstractVector{T}) where T<:Intege
end

Base.similar(pa::PooledArray{T,R}, S::Type, dims::Dims) where {T,R} =
PooledArray(RefArray(zeros(R, dims)), Dict{S,R}())
PooledArray{S}(RefArray(zeros(R, dims)), Dict{Union{Missing,S},R}(missing=>one(R)))

Base.findall(pdv::PooledVector{Bool}) = findall(convert(Vector{Bool}, pdv))

Expand All @@ -304,6 +318,10 @@ Base.findall(pdv::PooledVector{Bool}) = findall(convert(Vector{Bool}, pdv))
##
##############################################################################

# TODO ensure proper translation:
# 1. if missing is mapped to something else - re-introduce the missing
# 2. make sure missing ends up as the first entry in the result
# 3. correctly calculate the eltype of the result (so that Missing does not affect it if it was not allowed originally)
function Base.map(f, x::PooledArray{T,R}) where {T,R<:Integer}
ks = collect(keys(x.invpool))
vs = collect(values(x.invpool))
Expand Down Expand Up @@ -392,6 +410,8 @@ Base.sort(pa::PooledArray; kw...) = pa[sortperm(pa; kw...)]
##
##############################################################################

# TODO: correctly handle types in conversions

function Base.convert(::Type{PooledArray{S,R1,N}}, pa::PooledArray{T,R2,N}) where {S,T,R1<:Integer,R2<:Integer,N}
invpool_conv = convert(Dict{S,R1}, pa.invpool)
@assert invpool_conv !== pa.invpool
Expand Down Expand Up @@ -446,27 +466,29 @@ Base.convert(::Type{Array}, pa::PooledArray{T, R, N}) where {T, R, N} = convert(

# We need separate functions due to dispatch ambiguities

Base.@propagate_inbounds function Base.getindex(A::PooledArray, I::Int)
Base.@propagate_inbounds function Base.getindex(A::PooledArray{T}, I::Int)::T where T
idx = DataAPI.refarray(A)[I]
iszero(idx) && throw(UndefRefError())
# if eltype(A) does not allow Missing then also 1 is an error
idx <= !(eltype(A) >: Missing) && throw(UndefRefError())
return @inbounds DataAPI.refpool(A)[idx]
end

# we handle fast only the case when the first index is an abstract vector
# this is to make sure other indexing synraxes use standard dispatch from Base
# the reason is that creation of DataAPI.refarray(A) is unfortunately slow
Base.@propagate_inbounds function Base.getindex(A::PooledArrOrSub,
Base.@propagate_inbounds function Base.getindex(A::PooledArrOrSub{T},
I1::AbstractVector,
I2::Union{Real, AbstractVector}...)
I2::Union{Real, AbstractVector}...) where {T}
# make sure we do not increase A.refcount in case creation of newrefs fails
newrefs = DataAPI.refarray(A)[I1, I2...]
@assert newrefs isa AbstractArray
Threads.atomic_add!(refcount(A), 1)
return PooledArray(RefArray(newrefs), DataAPI.invrefpool(A), DataAPI.refpool(A), refcount(A))
return PooledArray{T}(RefArray(newrefs), DataAPI.invrefpool(A), DataAPI.refpool(A), refcount(A))
end

Base.@propagate_inbounds function Base.isassigned(pa::PooledArrOrSub, I::Int...)
!iszero(DataAPI.refarray(pa)[I...])
# if eltype(A) does not allow Missing then also 1 is an error
!(DataAPI.refarray(pa)[I...] <= !(eltype(pa) >: Missing))
end

##############################################################################
Expand Down Expand Up @@ -511,6 +533,9 @@ end
Base.IndexStyle(::Type{<:PooledArray}) = IndexLinear()

Base.@propagate_inbounds function Base.setindex!(x::PooledArray, val, ind::Int)
if val === missing && !(eltype(x) >: Missing)
throw(ArgumentError("PooledArray element type does not allow storing missing values"))
end
x.refs[ind] = getpoolidx(x, val)
return x
end
Expand Down Expand Up @@ -548,6 +573,8 @@ Base.empty!(pv::PooledVector) = (empty!(pv.refs); pv)

Base.deleteat!(pv::PooledVector, inds) = (deleteat!(pv.refs, inds); pv)

# TODO: review vcat for possible Missing related issues

function _vcat!(c, a, b)
copyto!(c, 1, a, 1, length(a))
return copyto!(c, length(a)+1, b, 1, length(b))
Expand Down