diff --git a/docs/src/examples.md b/docs/src/examples.md
index a15e5191..a290d669 100644
--- a/docs/src/examples.md
+++ b/docs/src/examples.md
@@ -630,8 +630,8 @@ file = CSV.File(IOBuffer(data); types=Dict(:zipcode => String))
 using CSV
 
 # In this file, we have an `id` column and a `code` column. There can be advantages with various DataFrame/table operations
-# like joining and grouping when `String` values are "pooled", meaning each unique value is mapped to a `UInt64`. By default,
-# `pool=0.1`, so string columns with low cardinality are pooled by default. Via the `pool` keyword argument, we can provide
+# like joining and grouping when `String` values are "pooled", meaning each unique value is mapped to a `UInt32`. By default,
+# `pool=(0.2, 500)`, so string columns with low cardinality are pooled by default. Via the `pool` keyword argument, we can provide
 # greater control: `pool=0.4` means that if 40% or less of a column's values are unique, then it will be pooled.
 data = """
 id,code
@@ -666,4 +666,25 @@ category,amount
 
 file = CSV.File(IOBuffer(data); pool=Dict(1 => true))
 file = CSV.File(IOBuffer(data); pool=[true, false])
-```
\ No newline at end of file
+```
+
+## [Pool with absolute threshold](@id pool_absolute_threshold)
+
+```julia
+using CSV
+
+# In this file, we have an `id` column and a `code` column. There can be advantages with various DataFrame/table operations
+# like joining and grouping when `String` values are "pooled", meaning each unique value is mapped to a `UInt32`. By default,
+# `pool=(0.2, 500)`, so string columns with low cardinality are pooled by default. Via the `pool` keyword argument, we can provide
+# greater control: `pool=(0.5, 2)` means that if a column has 2 or fewer unique values _and_ those unique values make up less than 50% of all values, then it will be pooled.
+data = """
+id,code
+A18E9,AT
+BF392,GC
+93EBC,AT
+54EE1,AT
+8CD2E,GC
+"""
+
+file = CSV.File(IOBuffer(data); pool=(0.5, 2))
+```
\ No newline at end of file
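
Editorial aside (not part of the diff): a quick way to sanity-check the new `pool_absolute_threshold` example above, assuming `PooledArrays` is available for the type check. With `pool=(0.5, 2)`, `code` has 2 unique values out of 5 rows (2 <= 2, and 40% <= 50%), so it pools, while `id` with 5 unique values does not.

```julia
using CSV, PooledArrays

data = """
id,code
A18E9,AT
BF392,GC
93EBC,AT
54EE1,AT
8CD2E,GC
"""

file = CSV.File(IOBuffer(data); pool=(0.5, 2))
file.code isa PooledArrays.PooledVector  # expected: true (2 uniques <= 2, and 40% <= 50%)
file.id isa PooledArrays.PooledVector    # expected: false (5 uniques > the limit of 2)
```
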
diff --git a/docs/src/reading.md b/docs/src/reading.md
index f1a3906f..cea052d9 100644
--- a/docs/src/reading.md
+++ b/docs/src/reading.md
@@ -192,11 +192,12 @@ A `Dict{Type, Type}` argument that allows replacing a non-`String` standard type
 
 ## [`pool`](@id pool)
 
-Argument that controls whether columns will be returned as `PooledArray`s. Can be provided as a `Bool`, `Float64`, vector of `Bool` or `Float64`, dict mapping column number/name to `Bool` or `Float64`, or a function of the form `(i, name) -> Union{Bool, Float64, Nothing}`. As a `Bool`, controls absolutely whether a column will be pooled or not; if passed as a single `Bool` argument like `pool=true`, then all string columns will be pooled, regardless of cardinality. When passed as a `Float64`, the value should be between `0.0` and `1.0` indicating the threshold under which the % of unique values found in the column will result in the column being pooled. For example, if `pool=0.1`, then all string columns with a unique value % less than 10% will be returned as `PooledArray`, while other string columns will be normal string vectors. As mentioned, when the `pool` argument is a single `Bool` or `Float64`, only string columns will be considered for pooling. When a vector or dict is provided, the pooling for any column can be provided as a `Bool` or `Float64`. Similar to the [types](@ref types) argument, providing a vector to `pool` should have an element for each column in the data input, while a dict argument can map column number/name to `Bool` or `Float64` for specific columns. Unspecified columns will not be pooled when the argument is a dict.
+Argument that controls whether columns will be returned as `PooledArray`s. Can be provided as a `Bool`, `Float64`, `Tuple{Float64, Int}`, vector, dict, or a function of the form `(i, name) -> Union{Bool, Real, Tuple{Float64, Int}, Nothing}`. As a `Bool`, controls absolutely whether a column will be pooled or not; if passed as a single `Bool` argument like `pool=true`, then all string columns will be pooled, regardless of cardinality. When passed as a `Float64`, the value should be between `0.0` and `1.0` to indicate the threshold under which the % of unique values found in the column will result in the column being pooled. For example, if `pool=0.1`, then all string columns with a unique value % less than 10% will be returned as `PooledArray`, while other string columns will be normal string vectors. If `pool` is provided as a tuple, like `(0.2, 500)`, the first tuple element acts like a single `Float64` value and represents the % cardinality threshold; the second tuple element is an upper limit on the # of unique values allowed for the column to be pooled. So, for example, `pool=(0.2, 500)` means that if a `String` column has 500 or fewer unique values _and_ the # of unique values is less than 20% of the total # of values, it will be pooled; otherwise, it won't be. As mentioned, when the `pool` argument is a single `Bool`, `Real`, or `Tuple{Float64, Int}`, only string columns will be considered for pooling. When a vector or dict is provided, the pooling for any column can be provided as a `Bool`, `Float64`, or `Tuple{Float64, Int}`. Similar to the [types](@ref types) argument, providing a vector to `pool` should have an element for each column in the data input, while a dict argument can map column number/name to `Bool`, `Float64`, or `Tuple{Float64, Int}` for specific columns. Unspecified columns will not be pooled when the argument is a dict.
 
 ### Examples
   * [Pooled values](@ref pool_example)
   * [Non-string column pooling](@ref nonstring_pool_example)
+  * [Pool with absolute threshold](@ref pool_absolute_threshold)
 
 ## [`downcast`](@id downcast)
diff --git a/src/CSV.jl b/src/CSV.jl
index e2a3808a..589af855 100644
--- a/src/CSV.jl
+++ b/src/CSV.jl
@@ -56,7 +56,7 @@ Base.showerror(io::IO, e::Error) = println(io, e.msg)
 
 # constants
 const DEFAULT_STRINGTYPE = InlineString
-const DEFAULT_POOL = 0.25
+const DEFAULT_POOL = (0.2, 500)
 const DEFAULT_ROWS_TO_CHECK = 30
 const DEFAULT_MAX_WARNINGS = 100
 const DEFAULT_MAX_INLINE_STRING_LENGTH = 32
diff --git a/src/README.md b/src/README.md
index 797c5241..bd9cd242 100644
--- a/src/README.md
+++ b/src/README.md
@@ -22,14 +22,13 @@ By providing the `pool` keyword argument, users can control how this optimization
 Valid inputs for `pool` include:
   * A `Bool`, `true` or `false`, which will apply to all string columns parsed; string columns either will _all_ be pooled, or _all_ not pooled
-  * A `Real`, which will be converted to `Float64`, which should be a value between `0.0` and `1.0`, to indicate the % cardinality threshold _under which_ a column will be pooled. e.g. by passing `pool=0.1`, if a column has less than 10% unique values, it will end up as a `PooledArray`, otherwise a normal array. Like the `Bool` argument, this will apply the same % threshold to only/all string columns
-  * An `AbstractVector`, where the # of elements should/needs to match the # of columns in the dataset. Each element of the `pool` argument should be a `Bool` or `Real` indicating the pooling behavior for each specific column.
-  * An `AbstractDict`, with keys as `String`s, `Symbol`s, or `Int`s referring to column names or indices, and values in the `AbstractDict` being `Bool` or `Real` to again signal how specific columns should be pooled
+  * A `Real`, which will be converted to `Float64` and should be a value between `0.0` and `1.0`, indicating the % cardinality threshold _under which_ a column will be pooled. e.g. by passing `pool=0.1`, if a column has less than 10% unique values, it will end up as a `PooledArray`, otherwise a normal array. Like the `Bool` argument, this will apply the same % threshold to only/all string columns.
+  * A `Tuple{Float64, Int}`, where the 1st element is the same percent cardinality threshold as above, while the 2nd element is an absolute upper limit on the # of unique values. This is useful for large datasets, where a 20% threshold could still admit pooled columns with many thousands of unique values; it's helpful performance-wise to put an upper limit like `pool=(0.2, 500)` to ensure no pooled column will have more than 500 unique values.
+  * An `AbstractVector`, where the # of elements needs to match the # of columns in the dataset. Each element of the `pool` argument should be a `Bool`, `Real`, or `Tuple{Float64, Int}` indicating the pooling behavior for each specific column.
+  * An `AbstractDict`, with keys as `String`s, `Symbol`s, or `Int`s referring to column names or indices, and values in the `AbstractDict` being `Bool`, `Real`, or `Tuple{Float64, Int}` to again signal how specific columns should be pooled
+  * A function of the form `(i, nm) -> Union{Bool, Real, Tuple{Float64, Int}}` that takes the column index and name as its two arguments and returns one of the first 3 possible pool values from the above list (or `nothing` to fall back to the default)
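
Editorial aside (not part of the diff): to make the accepted forms above concrete, here is a sketch exercising each kind of `pool` argument; `src` and the column names `a`/`b` are made up for illustration.

```julia
using CSV

src() = IOBuffer("a,b\nx,1\ny,2\nx,3\n")  # hypothetical two-column input

CSV.File(src(); pool=true)                   # pool all string columns
CSV.File(src(); pool=0.5)                    # pool string columns with < 50% unique values
CSV.File(src(); pool=(0.2, 500))             # % threshold plus an absolute cap on # of uniques
CSV.File(src(); pool=[true, false])          # one element per column
CSV.File(src(); pool=Dict(:a => (0.5, 10)))  # per-column by name; unspecified columns unpooled
CSV.File(src(); pool=(i, nm) -> i == 1 ? true : nothing)  # function form
```
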
 
 For the implementation of pooling:
   * We normalize however the keyword argument was provided to have a `pool` value per column while parsing
   * We also have a `pool` field on the `Context` structure in case columns are widened while parsing, they will take on this value
-  * For multithreaded parsing, we decide if a column will be pooled or not from the type sampling stage; if a column has a `pool` value of `1.0`, it will _always_ be pooled (as requested), if it has `0.0` it will _not_ be pooled, if `0.0 < pool < 1.0` then we'll calculate whether or not it will be pooled from the sampled values. As noted aboved, a `pool` value of `NaN` will also be considered if a column had `String` values sampled and meets the default threshold. Currently, we'll sample `rows_to_check * ntasks` values per column, which are both configurable via keyword arguments, with defaults `rows_to_check=30` and `ntasks=Threads.nthreads()`. From those samples, we'll calculate the # of unique values divided by the total # of values sampled and compare it with the `pool` value to determine whether the column will be ultimately pooled. The ramification here is that we may "get it wrong" in two different ways: 1) based on the values sampled, we may determine that a column _will_ be pooled even though the total # of uniques values we'll parse will be over the `pool` threshold and 2) we may determine a column _shouldn't_ be pooled because of seemingly high cardinality, even though the total # of unique values for the column is ultimately _lower_ than the `pool` threshold.
The only way to do things perfectly is to check _all_ the values of the entire column, but in multithreaded parsing, that would be expensive relative to the simpler sampling method. The other unique piece of multithreaded parsing is that we form a column's initial `refpool` field from the sampled values, which individual task-parsing columns will then start with when parsing their local chunks. Two tricky implementation details involved with sampling and pooling are 1) if a column's values end up being promoted to a different type _while_ sampling or post-sampling while parsing, and 2) if the whole columnset parsed is widened. For the first case, we take all sampled values and promote to the "widest" type, then when building a potential refpool, only consider values that "are" (i.e. `val isa type`) of the promoted type. That may seem like the obvious course of action, but consider that we may detect a value like `2021-01-01` as a `Date` object, but the column type is promoted to `String`; in that case, the `Date(2021, 1, 1)` object parsed _will not_ be in the initial refpool, since `!(Date(2021, 1, 1) isa String)`. For the 2nd tricky case, the columnset isn't widened while type sampling, so "extra" columns are just ignored. The columnset _will_ be widened by each local parsing task that detects the extra columns, and those extra columns will be synchronized/promoted post-parsing as needed. These "widened" columns will only ever be pooled if the user passed `pool=true`, meaning _every_ column for the whole file should be pooled.
-  * In the single-threaded case, we take a simpler approach by pooling any column with `pool` value `0.0 < pool <= 1.0`, meaning even if we're not totally sure the column will be pooled, we'll pool it while parsing, then decide post-parsing whether the column should actually be pooled or unpooled.
-  * One of the effects of these approaches to pooling in single vs. multithreading code is that we'll never change _whether a column is pooled_ while parsing; it's either decided post-sampling (multithreaded) or post-parsing (single-threaded).
-  * Couple other notes on things we have to account for in the multithreading case that makes things a little more complicated. We have to synchronize ref values between local parsing tasks. This is because each task is encountering unique values in different orders, so different tasks may have the same values, but mapped to different ref values. We also have to account for the fact that different parsing tasks may also have promoted to different types, so we may need to promote the pooled values.
+  * Once column parsing is done, each column's cardinality is checked against its individual `pool` value to decide whether the column should be pooled or not (see `checkpooled!` in `src/file.jl`).
\ No newline at end of file diff --git a/src/chunks.jl b/src/chunks.jl index 7c4404b4..41f3198a 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -64,7 +64,7 @@ function Chunks(source::ValidSources; type=nothing, types=nothing, typemap::Dict=Dict{Type, Type}(), - pool::Union{Bool, Real, AbstractVector, AbstractDict}=DEFAULT_POOL, + pool::Union{Bool, Real, AbstractVector, AbstractDict, Base.Callable, Tuple}=DEFAULT_POOL, downcast::Bool=false, lazystrings::Bool=false, stringtype::StringTypes=DEFAULT_STRINGTYPE, diff --git a/src/context.jl b/src/context.jl index 66bb2a7a..60938142 100644 --- a/src/context.jl +++ b/src/context.jl @@ -1,19 +1,3 @@ -# a RefPool holds our refs as a Dict, along with a lastref field which is incremented when a new ref is found while parsing pooled columns -mutable struct RefPool - # what? why ::Any here? well, we want flexibility in what kind of refs we stick in here - # it might be Dict{Union{String, Missing}, UInt32}, but it might be some other string type - # or it might not allow `missing`; in short, there are too many options to try and type - # the field concretely; luckily, working with the `refs` field here is limited to - # a very few specific methods, and we'll always have the column type, so we just need - # to make sure we assert the concrete type before using refs - refs::Any - lastref::UInt32 -end - -# start lastref at 1, since it's reserved for `missing`, so first ref value will be 2 -const Refs{T} = Dict{Union{T, Missing}, UInt32} -RefPool(::Type{T}=String) where {T} = RefPool(Refs{T}(), 1) - """ Internal structure used to track information for a single column in a delimited file. @@ -25,7 +9,6 @@ Fields: * `pool`: computed from `pool` keyword argument; `true` is `1.0`, `false` is `0.0`, everything else is `Float64(pool)`; once computed, this field isn't mutated at all while parsing; it's used in type detection to determine whether a column will be pooled or not once a type is detected; * `columnspecificpool`: if `pool` was provided via Vector or Dict by user, then `true`, other `false`; if `false`, then only string column types will attempt pooling * `column`: the actual column vector to hold parsed values; field is typed as `AbstractVector` and while parsing, we do switches on `col.type` to assert the column type to make code concretely typed - * `refpool`: if the column is pooled (or might be pooled in single-threaded case), this is the column-specific `RefPool` used to track unique parsed values and their `UInt32` ref codes * `lock`: in multithreaded parsing, we have a top-level set of `Vector{Column}`, then each threaded parsing task makes its own copy to parse its own chunk; when synchronizing column types/pooled refs, the task-local `Column` will `lock(col.lock)` to make changes to the parent `Column`; each task-local `Column` shares the same `lock` of the top-level `Column` * `position`: for transposed reading, the current column position * `endposition`: for transposed reading, the expected ending position for this column @@ -36,18 +19,17 @@ mutable struct Column anymissing::Bool userprovidedtype::Bool willdrop::Bool - pool::Float64 + pool::Union{Float64, Tuple{Float64, Int}} columnspecificpool::Bool # lazily/manually initialized fields column::AbstractVector - refpool::RefPool # per top-level column fields (don't need to copy per task when parsing) lock::ReentrantLock position::Int endposition::Int options::Parsers.Options - Column(type::Type, anymissing::Bool, userprovidedtype::Bool, willdrop::Bool, pool::Float64, 
columnspecificpool::Bool) = + Column(type::Type, anymissing::Bool, userprovidedtype::Bool, willdrop::Bool, pool::Union{Float64, Tuple{Float64, Int}}, columnspecificpool::Bool) = new(type, anymissing, userprovidedtype, willdrop, pool, columnspecificpool) end @@ -71,10 +53,6 @@ function Column(x::Column) if isdefined(x, :options) y.options = x.options end - if isdefined(x, :refpool) - # if parent has refpool from sampling, make a copy - y.refpool = RefPool(copy(x.refpool.refs), x.refpool.lastref) - end # specifically _don't_ copy/re-use x.column; that needs to be allocated fresh per parsing task return y end @@ -126,7 +104,7 @@ struct Context datarow::Int options::Parsers.Options columns::Vector{Column} - pool::Float64 + pool::Union{Float64, Tuple{Float64, Int}} downcast::Bool customtypes::Type typemap::Dict{Type, Type} @@ -239,7 +217,7 @@ end type::Union{Nothing, Type}, types::Union{Nothing, Type, AbstractVector, AbstractDict, Function}, typemap::Dict, - pool::Union{Bool, Real, AbstractVector, AbstractDict, Base.Callable}, + pool::Union{Bool, Real, AbstractVector, AbstractDict, Base.Callable, Tuple}, downcast::Bool, lazystrings::Bool, stringtype::StringTypes, diff --git a/src/file.jl b/src/file.jl index aae16f37..4103e198 100644 --- a/src/file.jl +++ b/src/file.jl @@ -271,29 +271,17 @@ function File(ctx::Context, @nospecialize(chunking::Bool=false)) # cleanup our columns if needed for col in columns @label processcolumn - if col.type === NeedsTypeDetection && pooled(col) - # we get here if user specified pool=true for at least this column - # but we never encountered any non-missing values - col.type = Missing - col.column = allocate(Pooled, finalrows) - col.refpool = RefPool(Missing) - elseif col.type === NeedsTypeDetection + if col.type === NeedsTypeDetection # fill in uninitialized column fields col.type = Missing col.column = MissingVector(finalrows) col.pool = 0.0 end - if col.column isa Vector{UInt32} - if pooled(col) - makepooled!(col) - elseif ((length(col.refpool.refs) - 1) / finalrows) <= col.pool - # check if final column should be PooledArray or not - makepooled!(col) - else - # cardinality too high, so unpool - unpool!(col, col.type, col.refpool) - @goto processcolumn - end + T = col.anymissing ? 
Union{col.type, Missing} : col.type + if maybepooled(col) && + (col.type isa StringTypes || col.columnspecificpool) && + checkpooled!(T, nothing, col, 0, 1, finalrows, ctx) + # col.column is a PooledArray elseif col.type === PosLenString # string col parsed lazily; return a PosLenStringVector makeposlen!(col, coltype(col), ctx) @@ -409,55 +397,26 @@ function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, c if T === Float64 && T2 <: Integer # one chunk parsed as Int, another as Float64, promote to Float64 ctx.debug && println("multithreaded promoting column $j to float") - if task_col.column isa Vector{UInt32} - task_col.refpool.refs = convert(Refs{Float64}, task_col.refpool.refs) - else - task_col.column = convert(SentinelVector{Float64}, task_col.column) - end + task_col.column = convert(SentinelVector{Float64}, task_col.column) elseif T !== T2 && (T <: InlineString || (T === String && T2 <: InlineString)) # promote to widest InlineString type - if task_col.column isa Vector{UInt32} - task_col.refpool.refs = convert(Refs{T}, task_col.refpool.refs) - else - task_col.column = convert(SentinelVector{T}, task_col.column) - end + task_col.column = convert(SentinelVector{T}, task_col.column) elseif T !== T2 # one chunk parsed all missing values, but another chunk had a typed value, promote to that # while keeping all values `missing` (allocate by default ensures columns have all missing values) ctx.debug && println("multithreaded promoting column $j from missing on task $i") - task_col.column = allocate(maybepooled(col) && (T isa StringTypes || col.columnspecificpool) ? Pooled : T, task_rows) - end - # synchronize refs if needed - if maybepooled(col) && (T isa StringTypes || col.columnspecificpool) - if !isdefined(col, :refpool) && isdefined(task_col, :refpool) - # this case only occurs if user explicitly passed pool=true - # but we only parsed `missing` values during sampling - col.refpool = task_col.refpool - elseif isdefined(task_col, :refpool) - syncrefs!(col.type, col, task_col, task_rows) - end + task_col.column = allocate(T, task_rows) end end -@label threadedprocesscolumn - if maybepooled(col) && (col.type isa StringTypes || col.columnspecificpool) - if pooled(col) - makechain!(col.type, pertaskcolumns, col, j, ntasks) - # pooled columns are the one case where we invert the order of arrays; - # i.e. we return PooledArray{T, ChainedVector{T}} instead of ChainedVector{T, PooledArray{T}} - makepooled!(col) - elseif ((length(col.refpool.refs) - 1) / finalrows) <= col.pool - col.pool = 1.0 - @goto threadedprocesscolumn - else - # cardinality too high, so unpool - col.pool = 0.0 - foreach(cols -> unpool!(cols[j], col.type, col.refpool), pertaskcolumns) - @goto threadedprocesscolumn - end + T = col.anymissing ? 
Union{col.type, Missing} : col.type + if maybepooled(col) && + (col.type isa StringTypes || col.columnspecificpool) && + checkpooled!(T, pertaskcolumns, col, j, ntasks, finalrows, ctx) + # col.column is a PooledArray elseif col.type === Int64 # we need to special-case Int here because while parsing, a default Int64 sentinel value is chosen to # represent missing; if any chunk bumped into that sentinel value while parsing, then it cycled to a - # new sentinel value; this step ensure that each chunk has the same encoded sentinel value + # new sentinel value; this step ensures that each chunk has the same encoded sentinel value # passing force=false means it will first check if all chunks already have the same sentinel and return # immediately if so, which will be the case most often SentinelArrays.newsentinel!((pertaskcolumns[i][j].column::SVec{Int64} for i = 1:ntasks)...; force=false) @@ -477,51 +436,6 @@ function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, c return end -const EMPTY_REFRECODE = UInt32[] - -# after multithreaded parsing, we need to synchronize pooled refs from different chunks of the file -# we pick one chunk as "source of truth", then adjust other chunks as needed -function syncrefs!(::Type{T}, col, task_col, task_rows) where {T} - @assert task_col.column isa Vector{UInt32} - # @inbounds begin - colrefpool = col.refpool - colrefs = colrefpool.refs::Refs{T} - taskrefpool = task_col.refpool - taskrefs = taskrefpool.refs::Refs{T} - refrecodes = EMPTY_REFRECODE - recode = false - for (k, v) in taskrefs - # `k` is our task-specific parsed value - # `v` is our task-specific UInt32 ref code - # for each task-specific ref, check if it matches parent column ref - refvalue = get(colrefs, k, UInt32(0)) - if refvalue != v - # task-specific ref didn't match parent column ref, need to recode - recode = true - if isempty(refrecodes) - # refrecodes is indexed by task-specific ref code - # each *element*, however, is the parent column ref code - refrecodes = [UInt32(i) for i = 1:taskrefpool.lastref] - end - if refvalue == 0 - # parent column didn't know about this ref, so we add it - refvalue = (colrefpool.lastref += UInt32(1)) - colrefs[k] = refvalue - end - refrecodes[v] = refvalue - end - end - if recode - column = task_col.column::Vector{UInt32} - for j = 1:task_rows - # here we recode by replacing task column ref code w/ parent column ref code - column[j] = refrecodes[column[j]] - end - end - # end # @inbounds begin - return -end - function makechain!(::Type{T}, pertaskcolumns, col, j, ntasks) where {T} if col.anymissing col.column = ChainedVector([pertaskcolumns[i][j].column for i = 1:ntasks]) @@ -530,8 +444,6 @@ function makechain!(::Type{T}, pertaskcolumns, col, j, ntasks) where {T} col.column = ChainedVector([convert(Vector{Bool}, pertaskcolumns[i][j].column::vectype(T)) for i = 1:ntasks]) elseif col.type !== Union{} && col.type <: SmallIntegers col.column = ChainedVector([convert(Vector{col.type}, pertaskcolumns[i][j].column::vectype(T)) for i = 1:ntasks]) - elseif maybepooled(col) && (col.type isa StringTypes || col.columnspecificpool) - col.column = ChainedVector([pertaskcolumns[i][j].column::Vector{UInt32} for i = 1:ntasks]) else col.column = ChainedVector([parent(pertaskcolumns[i][j].column) for i = 1:ntasks]) end @@ -539,53 +451,62 @@ function makechain!(::Type{T}, pertaskcolumns, col, j, ntasks) where {T} return end -function makepooled!(col) - T = col.type - r = isdefined(col, :refpool) ? col.refpool.refs : Refs{T === NeedsTypeDetection ? 
Missing : T}() - column = isdefined(col, :column) ? col.column : UInt32[] - return makepooled2!(col, T, r, column) -end - -function makepooled2!(col, ::Type{T}, r::Refs{T}, column) where {T} - if col.anymissing - r[missing] = UInt32(1) - else - # need to recode pool and refs - for (k, v) in r - @inbounds r[k] = v - 1 - end - r = convert(Dict{T, UInt32}, r) - # recode refs - for i = 1:length(column) - @inbounds column[i] -= 1 +# T is Union{T, Missing} or T depending on col.anymissing +function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) where {T} + S = Base.nonmissingtype(T) + pool = Dict{T, UInt32}() + lastref = Ref{UInt32}(0) + refs = Vector{UInt32}(undef, nrows) + k = 1 + limit = col.pool isa Tuple ? col.pool[2] : typemax(Int) + for i = 1:ntasks + column = (pertaskcolumns === nothing ? col.column : pertaskcolumns[i][j].column)::columntype(S) + for x in column + if x isa PosLen + if x.missingvalue + @inbounds refs[k] = get!(pool, missing) do + lastref[] += UInt32(1) + end + elseif x.escapedvalue + val = S === PosLenString ? S(ctx.buf, x, ctx.options.e) : Parsers.getstring(ctx.buf, x, ctx.options.e) + @inbounds refs[k] = get!(pool, val) do + lastref[] += UInt32(1) + end + else + val = PointerString(pointer(ctx.buf, x.pos), x.len) + index = Base.ht_keyindex2!(pool, val) + if index > 0 + @inbounds found_key = pool.vals[index] + ref = found_key::UInt32 + else + new = lastref[] += UInt32(1) + if S === PosLenString + @inbounds Base._setindex!(pool, new, S(ctx.buf, x, ctx.options.e), -index) + else + @inbounds Base._setindex!(pool, new, S(val), -index) + end + ref = new + end + @inbounds refs[k] = ref + end + else + @inbounds refs[k] = get!(pool, x) do + lastref[] += UInt32(1) + end + end + k += 1 + if length(pool) > limit + return false + end end end - col.column = PooledArray(PooledArrays.RefArray(column), r) - return -end - -function unpool!(col, T, refpool) - r = refpool.refs - column = isdefined(col, :column) ? col.column : UInt32[] - return unpool2!(col, T, r, column) -end - -function unpool2!(col, ::Type{T}, r::Refs{T}, column) where {T} - if col.anymissing - r[missing] = UInt32(1) - end - pool = Vector{keytype(r)}(undef, length(r) + 1) - for (k, v) in r - @inbounds pool[v] = k - end - if col.type === PosLenString - # unwrap the PosLenStrings, so they get handled by makeposlen! - col.column = [pool[ref].poslen for ref in column] + percent = col.pool isa Tuple ? col.pool[1] : col.pool + if ((length(pool) - 1) / nrows) <= percent + col.column = PooledArray(PooledArrays.RefArray(refs), pool) + return true else - col.column = [pool[ref] for ref in column] + return false end - col.pool = 0.0 - return end function makeposlen!(col, T, ctx) @@ -806,13 +727,7 @@ function detectcell(buf, pos, len, row, rowoffset, i, col, ctx, rowsguess)::Tupl end @label done # if we're here, that means we found a non-missing value, so we need to update column - if maybepooled(col) && (newT isa StringTypes || col.columnspecificpool) - column = allocate(Pooled, rowsguess) - col.refpool = RefPool(newT) - val = getref!(col.refpool, newT, val isa PosLen ? 
PosLenString(buf, val, opts.e) : val) - else - column = allocate(newT, rowsguess) - end + column = allocate(newT, rowsguess) column[row] = val col.column = column col.type = newT @@ -830,22 +745,10 @@ function parsevalue!(::Type{type}, buf, pos, len, row, rowoffset, i, col, ctx):: col.anymissing = true else column = col.column - if column isa Vector{UInt32} - if type === String || type === PosLenString - if Parsers.escapedstring(code) - ref = getref!(col.refpool, type, Parsers.getstring(buf, res.val, opts.e)) - else - poslen = res.val - ref = getref!(col.refpool, type, PointerString(pointer(buf, poslen.pos), poslen.len), buf, poslen, opts.e) - end - else - ref = getref!(col.refpool, type, res.val) - end - @inbounds column[row] = ref + if column isa Vector{PosLen} + @inbounds (column::Vector{PosLen})[row] = res.val elseif type === String @inbounds (column::SVec2{String})[row] = Parsers.getstring(buf, res.val, opts.e) - elseif type === PosLenString - @inbounds (column::Vector{PosLen})[row] = res.val else @inbounds (column::vectype(type))[row] = res.val end @@ -884,18 +787,13 @@ function parsevalue!(::Type{type}, buf, pos, len, row, rowoffset, i, col, ctx):: if !Parsers.invalid(ret.code) col.type = newT column = col.column - if column isa Vector{UInt32} - col.refpool.refs = convert(Refs{newT}, col.refpool.refs) - ref = getref!(col.refpool, newT, ret.val) - @inbounds column[row] = ref - else - col.column = convert(SentinelVector{newT}, col.column::vectype(type)) - @inbounds col.column[row] = ret.val - end + col.column = convert(SentinelVector{newT}, col.column::vectype(type)) + @inbounds col.column[row] = ret.val return pos + ret.tlen, ret.code end newT = widen(newT) end + #TODO: should we just convert(SentinelVector{String}) here? code |= PROMOTE_TO_STRING else code |= PROMOTE_TO_STRING @@ -912,11 +810,7 @@ end if !Parsers.invalid(code) col.type = to column = col.column - if column isa Vector{UInt32} - col.refpool.refs = convert(Refs{to}, col.refpool.refs) - ref = getref!(col.refpool, to, res.val) - @inbounds column[row] = ref - elseif column isa vectype(from) + if column isa vectype(from) col.column = convert(promotevectype(to), column) @inbounds col.column[row] = res.val end @@ -926,31 +820,6 @@ end return code end -@inline function getref!(refpool, ::Type{T}, key)::UInt32 where {T} - x = refpool.refs::Refs{T} - get!(x, key) do - refpool.lastref += UInt32(1) - end -end - -@inline function getref!(refpool, ::Type{T}, key::PointerString, buf, poslen, e)::UInt32 where {T} - x = refpool.refs::Refs{T} - index = Base.ht_keyindex2!(x, key) - if index > 0 - @inbounds found_key = x.vals[index] - ret = found_key::UInt32 - else - @inbounds new = refpool.lastref += UInt32(1) - if T === PosLenString - @inbounds Base._setindex!(x, new, T(buf, poslen, e), -index) - else - @inbounds Base._setindex!(x, new, T(key), -index) - end - ret = new - end - return ret -end - @inline function parsecustom!(::Type{customtypes}, buf, pos, len, row, rowoffset, i, col, ctx) where {customtypes} if @generated block = Expr(:block) @@ -978,12 +847,7 @@ end @noinline function promotetostring!(ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes}, column_to_promote, numwarnings, limit, stringtype) where {customtypes} cols = [i == column_to_promote ? 
columns[i] : Column(Missing, columns[i].options) for i = 1:length(columns)]
     col = cols[column_to_promote]
-    if maybepooled(col)
-        col.refpool = RefPool(stringtype)
-        col.column = allocate(Pooled, rowsguess)
-    else
-        col.column = allocate(stringtype, rowsguess)
-    end
+    col.column = allocate(stringtype, rowsguess)
     col.type = stringtype
     row = 0
     startpos = pos
diff --git a/src/keyworddocs.jl b/src/keyworddocs.jl
index aa9596a9..38c4ab90 100644
--- a/src/keyworddocs.jl
+++ b/src/keyworddocs.jl
@@ -34,7 +34,7 @@ const KEYWORD_DOCS = """
   * `types`: a single `Type`, `AbstractVector` or `AbstractDict` of types, or a function of the form `(i, name) -> Union{T, Nothing}` to be used for column types; if a single `Type` is provided, _all_ columns will be parsed with that single type; an `AbstractDict` can map column index `Integer`, or name `Symbol` or `String` to type for a column, i.e. `Dict(1=>Float64)` will set the first column as a `Float64`, `Dict(:column1=>Float64)` will set the column named `column1` to `Float64` and, `Dict("column1"=>Float64)` will set the `column1` to `Float64`; if a `Vector` is provided, it must match the # of columns provided or detected in `header`. If a function is provided, it takes a column index and name as arguments, and should return the desired column type for the column, or `nothing` to signal the column's type should be detected while parsing.
   * `typemap::Dict{Type, Type}`: a mapping of a type that should be replaced in every instance with another type, i.e. `Dict(Float64=>String)` would change every detected `Float64` column to be parsed as `String`; only "standard" types are allowed to be mapped to another type, i.e. `Int64`, `Float64`, `Date`, `DateTime`, `Time`, and `Bool`. If a column of one of those types is "detected", it will be mapped to the specified type.
-  * `pool::Union{Bool, Real, AbstractVector, AbstractDict, Function}=$DEFAULT_POOL`: [not supported by `CSV.Rows`] controls whether columns will be built as `PooledArray`; if `true`, all columns detected as `String` will be pooled; alternatively, the proportion of unique values below which `String` columns should be pooled (by default $DEFAULT_POOL, meaning that if the # of unique strings in a column is under $(DEFAULT_POOL * 100)%, it will be pooled); if an `AbstractVector`, each element should be `Bool` or `Real` and the # of elements should match the # of columns in the dataset; if an `AbstractDict`, a `Bool` or `Real` value can be provided for individual columns where the dict key is given as column index `Integer`, or column name as `Symbol` or `String`. If a function is provided, it should take a column index and name as 2 arguments, and return a `Bool`, `Float64`, or `nothing` for each column.
+  * `pool::Union{Bool, Real, AbstractVector, AbstractDict, Function, Tuple{Float64, Int}}=$DEFAULT_POOL`: [not supported by `CSV.Rows`] controls whether columns will be built as `PooledArray`; if `true`, all columns detected as `String` will be pooled; alternatively, the proportion of unique values below which `String` columns should be pooled (e.g. with `pool=0.25`, if the # of unique strings in a column is under 25% of the total, it will be pooled). If provided as a `Tuple{Float64, Int}` like `(0.2, 500)`, the 1st tuple element is the percent cardinality threshold (`0.2`) and the 2nd is an upper limit on the # of unique values (`500`), under which the column will be pooled; this is the default (`pool=$DEFAULT_POOL`). If an `AbstractVector`, each element should be `Bool`, `Real`, or `Tuple{Float64, Int}` and the # of elements should match the # of columns in the dataset; if an `AbstractDict`, a `Bool`, `Real`, or `Tuple{Float64, Int}` value can be provided for individual columns where the dict key is given as column index `Integer`, or column name as `Symbol` or `String`. If a function is provided, it should take a column index and name as 2 arguments, and return a `Bool`, `Real`, `Tuple{Float64, Int}`, or `nothing` for each column.
   * `downcast::Bool=false`: controls whether columns detected as `Int64` will be "downcast" to the smallest possible integer type like `Int8`, `Int16`, `Int32`, etc.
   * `stringtype=$DEFAULT_STRINGTYPE`: controls how detected string columns will ultimately be returned; default is `InlineString`, which stores string data in a fixed-size primitive type that helps avoid excessive heap memory usage; if a column has values longer than 32 bytes, it will default to `String`. If `String` is passed, all string columns will just be normal `String` values. If `PosLenString` is passed, string columns will be returned as `PosLenStringVector`, which is a special "lazy" `AbstractVector` that acts as a "view" into the original file data. This can lead to the most efficient parsing times, but note that the "view" nature of `PosLenStringVector` makes it read-only, so operations like `push!`, `append!`, or `setindex!` are not supported. It also keeps a reference to the entire input dataset source, so trying to modify or delete the underlying file, for example, may fail
   * `strict::Bool=false`: whether invalid values should throw a parsing error or be replaced with `missing`
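
Editorial aside (not part of the diff): an illustration of the documented default `pool=(0.2, 500)`; a string column whose unique values stay under both bounds is pooled without passing `pool` at all. Assumes `PooledArrays` is loaded for the type check.

```julia
using CSV, PooledArrays

# 99 rows but only 3 unique codes: 3 <= 500 and 3/99 ≈ 3% <= 20%,
# so this column pools under the default pool=(0.2, 500)
data = "code\n" * join(repeat(["AA", "BB", "CC"], 33), "\n")
f = CSV.File(IOBuffer(data))
f.code isa PooledArrays.PooledVector  # expected: true
```
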
diff --git a/src/rows.jl b/src/rows.jl
index aa1fb810..6b9ab03e 100644
--- a/src/rows.jl
+++ b/src/rows.jl
@@ -262,8 +262,6 @@ macro unrollcolumns(setmissing, values, ex)
             $ex
         elseif column isa Vector{Union{Missing, Bool}}
             $ex
-        elseif column isa Vector{UInt32}
-            $ex
         elseif customtypes !== Tuple{}
             setcustom!(customtypes, $values, columns, i, $setmissing)
         else
diff --git a/src/utils.jl b/src/utils.jl
index ed2edcba..4e57ea32 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -17,18 +17,26 @@ end
 struct NeedsTypeDetection end
 
 nonmissingtypeunlessmissingtype(::Type{T}) where {T} = ifelse(T === Missing, Missing, nonmissingtype(T))
-pooltype(col) = nonmissingtype(keytype(col.refpool.refs))
 finaltype(T) = T
 finaltype(::Type{HardMissing}) = Missing
 finaltype(::Type{NeedsTypeDetection}) = Missing
 coltype(col) = ifelse(col.anymissing, Union{finaltype(col.type), Missing}, finaltype(col.type))
-pooled(col) = col.pool == 1.0
-maybepooled(col) = 0.0 < col.pool
+maybepooled(col) = col.pool isa Tuple ? (col.pool[1] > 0.0) : (col.pool > 0.0)
 
-function getpool(x::Real)::Float64
+function getpool(x)::Union{Float64, Tuple{Float64, Int}}
     if x isa Bool
         return x ? 1.0 : 0.0
+    elseif x isa Tuple
+        y = Float64(x[1])
+        (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool tuple 1st argument must be in the range: 0.0 <= x <= 1.0"))
+        try
+            z = Int(x[2])
+            @assert z > 0
+            return (y, z)
+        catch
+            throw(ArgumentError("pool tuple 2nd argument must be a positive integer > 0"))
+        end
     else
         y = Float64(x)
         (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool argument must be in the range: 0.0 <= x <= 1.0"))
@@ -95,8 +103,6 @@
 const SmallIntegers = Union{Int8, UInt8, Int16, UInt16, Int32, UInt32}
 
 const SVec{T} = SentinelVector{T, T, Missing, Vector{T}}
 const SVec2{T} = SentinelVector{T, typeof(undef), Missing, Vector{T}}
 
-struct Pooled end
-
 promotevectype(::Type{T}) where {T <: Union{Bool, SmallIntegers}} = vectype(T)
 promotevectype(::Type{T}) where {T} = SentinelVector{T}
 
@@ -107,14 +113,7 @@ function allocate!(columns, rowsguess)
         # if the type hasn't been detected yet, then column will get allocated
         # in the detect method while parsing
         if col.type !== NeedsTypeDetection
-            if maybepooled(col) && (col.type isa StringTypes || col.columnspecificpool)
-                col.column = allocate(Pooled, rowsguess)
-                if !isdefined(col, :refpool)
-                    col.refpool = RefPool(col.type)
-                end
-            else
-                col.column = allocate(col.type, rowsguess)
-            end
+            col.column = allocate(col.type, rowsguess)
         end
     end
     return
@@ -134,8 +133,6 @@ setmissing!(col::Vector{PosLen}, i) = col[i] = POSLEN_MISSING
         return A
     elseif T === String
         return SentinelVector{String}(undef, len)
-    elseif T === Pooled
-        return fill(UInt32(1), len) # initialize w/ all missing values
     elseif T === Bool
         return Vector{Union{Missing, Bool}}(undef, len)
     elseif T <: SmallIntegers
@@ -146,14 +143,7 @@ setmissing!(col::Vector{PosLen}, i) = col[i] = POSLEN_MISSING
 end
 
 function reallocate!(@nospecialize(A), len)
-    if A isa Vector{UInt32}
-        oldlen = length(A)
-        resize!(A, len)
-        # make sure new values are initialized to missing
-        for i = (oldlen + 1):len
-            @inbounds A[i] = UInt32(1)
-        end
-    elseif A isa Vector{PosLen}
+    if A isa Vector{PosLen}
         oldlen = length(A)
         resize!(A, len)
         # when reallocating, we just need to make sure the missing bit is set for lazy string PosLen
@@ -166,10 +156,13 @@ end
 
 firstarray(x::ChainedVector) = x.arrays[1]
 
+columntype(::Type{T}) where {T <: Union{Bool, SmallIntegers}} = Vector{Union{T, Missing}}
+columntype(::Type{T}) where {T} = isbitstype(T) ? SVec{T} : SVec2{T}
+columntype(::Type{PosLenString}) = Vector{PosLen}
+
 vectype(::Type{T}) where {T <: Union{Bool, SmallIntegers}} = Vector{Union{T, Missing}}
 vectype(::Type{T}) where {T} = isbitstype(T) ? SVec{T} : SVec2{T}
 vectype(::Type{PosLenString}) = PosLenStringVector{Union{PosLenString, Missing}}
-vectype(::Type{Pooled}) = Vector{UInt32}
 
 pooledvectype(::Type{T}) where {T} = PooledVector{Union{T, Missing}, UInt32, Vector{UInt32}}
 pooledtype(::Type{T}) where {T} = PooledVector{T, UInt32, Vector{UInt32}}
 # missingvectype(::PooledVector{T, R, AT}) where {T, R, AT} = PooledVector{Union{T, Missing}, R, AT}
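
Editorial aside (not part of the diff): for reference, here is how the (internal) `getpool` normalization above behaves, a sketch of expected results based on the code in this hunk.

```julia
using CSV

CSV.getpool(true)        # -> 1.0
CSV.getpool(false)       # -> 0.0
CSV.getpool(0.3)         # -> 0.3
CSV.getpool((0.2, 500))  # -> (0.2, 500)
CSV.getpool((0.2, 0))    # throws ArgumentError (2nd tuple element must be > 0)
CSV.getpool(1.5)         # throws ArgumentError (must be in 0.0 <= x <= 1.0)
```
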
diff --git a/test/basics.jl b/test/basics.jl
index 20913774..ad3386bc 100644
--- a/test/basics.jl
+++ b/test/basics.jl
@@ -430,7 +430,7 @@ f = CSV.File(IOBuffer("x\r\n1\r\n2\r\n3\r\n4\r\n5\r\n"), footerskip=3)
 @test f[1][1] == 1
 
 # 578
-f = CSV.File(IOBuffer("h1234567890123456\t"^2262 * "lasthdr\r\n" *"dummy dummy dummy\r\n"* ("1.23\t"^2262 * "2.46\r\n")^10), skipto=3, ntasks=1);
+f = CSV.File(IOBuffer("h1234567890123456\t"^2262 * "lasthdr\r\n" * "dummy dummy dummy\r\n" * ("1.23\t"^2262 * "2.46\r\n")^10), skipto=3, ntasks=1);
 @test (length(f), length(f.names)) == (10, 2263)
 @test all(x -> eltype(x) == Float64, Tables.Columns(f))
 
@@ -669,7 +669,7 @@ row = first(CSV.Rows(IOBuffer("a,b,c\n1,2,3\n\n"); select=[:a, :c]))
 
 # 871
 f = CSV.File(IOBuffer("a,b,c\n1,2,3\n3.14,5,6\n"); typemap=Dict(Float64 => String))
-@test f.a isa Vector{<:AbstractString}
+@test f.a isa AbstractVector{<:AbstractString}
 
 # support SubArray{UInt8} as source
 f = CSV.File(IOBuffer(strip(""""column_name","data_type","is_nullable"\nfoobar,string,YES\nbazbat,timestamptz,YES""")))
diff --git a/test/runtests.jl b/test/runtests.jl
index 479e970e..0b3a0409 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -41,7 +41,7 @@ include("write.jl")
     @test (length(f), length(f.names)) == (4, 1)
     @test f.X[3] === missing
 
-    f = CSV.File(IOBuffer("X\nc\nc\n\nc\nc\nc\nc\nc\nc"), ignoreemptyrows=false)
+    f = CSV.File(IOBuffer("X\nc\nc\n\nc\nc\nc\nc\nc\nc"), pool=0.25, ignoreemptyrows=false)
     @test typeof(f.X) == PooledArray{Union{Missing, InlineString1},UInt32,1,Array{UInt32,1}}
     @test (length(f), length(f.names)) == (9, 1)
     @test isequal(f.X, ["c", "c", missing, "c", "c", "c", "c", "c", "c"])
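
Closing editorial aside (not part of the diff): the runtests.jl change above pins `pool=0.25` explicitly now that the default has changed. More generally, the new absolute cap in the default `pool=(0.2, 500)` is the main user-visible difference; a sketch, assuming `PooledArrays` is available for the type checks:

```julia
using CSV, PooledArrays

# 10_000 rows but 600 distinct codes: only 6% unique, so the old scalar default
# (pool=0.25) would have pooled this column; under the new default (0.2, 500),
# the cap of 500 unique values applies and the column stays a plain string vector
data = "code\n" * join(["code$(i % 600)" for i = 1:10_000], "\n")

f = CSV.File(IOBuffer(data))
f.code isa PooledArrays.PooledVector     # expected: false (600 uniques > 500)

f2 = CSV.File(IOBuffer(data); pool=0.25) # opt back into the old behavior
f2.code isa PooledArrays.PooledVector    # expected: true (6% <= 25%)
```
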