Skip to content

Commit

Permalink
[DTable] DataFrames-like select using DataFrames syntax #344
Browse files Browse the repository at this point in the history
  • Loading branch information
krynju committed Jun 17, 2022
1 parent 5f315d1 commit ad5f1ce
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ version = "0.14.4"
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
ContextVariablesX = "6add18c4-b38d-439d-96f6-d6bc489c04c5"
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
InvertedIndices = "41ab1584-1d38-5bbf-9106-f11c6c58b48f"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
Expand Down
43 changes: 43 additions & 0 deletions demo2.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using DataFrames
using Dagger
using Statistics

r(d::Dagger.DTable, args...) = fetch(Dagger.select(d, args...), DataFrame)

r(d::DataFrames.DataFrame, args...) = (DataFrames.select(d, args...);)

t(d::Dagger.DTable, args...) = @time wait(Dagger.select(d, args...))

t(d::DataFrames.DataFrame, args...) = (@time DataFrames.select(d, args...); nothing)

partitions = 100
s = 1_000_000
nt = (a=collect(1:s) .% 3, b=rand(s))
dt = DTable(nt, s ÷ partitions)
df = fetch(dt, DataFrame)


r(dt, :a => :eee)
r(df, :a => :eee)

t(dt, :b, :a, AsTable([:a, :b]) => ByRow(sum))
t(df, :b, :a, AsTable([:a, :b]) => ByRow(sum))

## DTable Column wrapper speed
@time select(df, :b => mean)
@time mean(Dagger.DTableColumn(dt, 2))
t(dt, :b => mean)

r(dt, AsTable([:a, :b]) => ByRow(identity))
r(dt, [:a, :a] => ((x, y) -> x .+ y), :b)
r(dt, [:a, :a] => ((x, y) -> x .+ y)) # this broken

r(dt, [] => ByRow(() -> 1) => :x, :b) # make this work

r(dt, [] => ByRow(rand) => :x) # make this work

r(dt, :a => mean)

@time map(row -> (a=row.a, b=row.b), dt)


16 changes: 15 additions & 1 deletion docs/src/dtable.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,18 @@ julia> pp(innerjoin(dt, d2, on=:a))
3, 3, -3
4, 4, -4
5, 5, -5
```
```

# DataFrames.jl minilanguage and operations support (experimental)

Support for `DataFrames.jl` minilanguage and operations is planned for the `DTable`
to enable a seemless transition between in-memory and distributed data processing.

As of today `select` is available with more operations to come in the future.

The goal is to provide exactly the same output as for DataFrames using the same `args`.
Even though the output should be the same the DTable may require modification of user input in order to provide optimal distributed performance.

One already known tactic is to avoid functions that require access to the full column at once.
The user should prefer to use `ByRow` equivalents or `reduce` instead.
A complete performance guide will surely be a part of the documentation at some point.
3 changes: 3 additions & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ include("table/operations.jl")
include("table/groupby.jl")
include("table/join_interface.jl")
include("table/join.jl")
include("table/dtable_column.jl")
include("table/dataframes_interface_utils.jl")
include("table/dataframes_interface.jl")

include("lib/logging-extras.jl")

Expand Down
245 changes: 245 additions & 0 deletions src/table/dataframes_interface.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import InvertedIndices: BroadcastedInvertedIndex
import DataAPI: Between, All, Cols, BroadcastedSelector
import DataFrames: ColumnIndex, MultiColumnIndex,
ByRow, AsTable, normalize_selection

make_pair_concrete(@nospecialize(x::Pair)) =
make_pair_concrete(x.first) => make_pair_concrete(x.second)
make_pair_concrete(@nospecialize(x)) = x

broadcast_pair(df::DTable, @nospecialize(p::Any)) = p

# Copied as is from DataFrames.jl
function broadcast_pair(df::DTable, @nospecialize(p::Pair))
src, second = p
src_broadcast = src isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
second_broadcast = second isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
if second isa Pair
fun, dst = second
dst_broadcast = dst isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
if src_broadcast || dst_broadcast
new_src = src_broadcast ? names(df, src.sel) : src
new_dst = dst_broadcast ? names(df, dst.sel) : dst
new_p = new_src .=> fun .=> new_dst
return isempty(new_p) ? [] : new_p
else
return p
end
else
if src_broadcast || second_broadcast
new_src = src_broadcast ? names(df, src.sel) : src
new_second = second_broadcast ? names(df, second.sel) : second
new_p = new_src .=> new_second
return isempty(new_p) ? [] : new_p
else
return p
end
end
end

# this is needed in broadcasting when one of dimensions has length 0
# as then broadcasting produces Matrix{Any} rather than Matrix{<:Pair}
broadcast_pair(df::DTable, @nospecialize(p::AbstractMatrix)) = isempty(p) ? [] : p


# Copied as is from DataFrames.jl
function broadcast_pair(df::DTable, @nospecialize(p::AbstractVecOrMat{<:Pair}))
isempty(p) && return []
need_broadcast = false

src = first.(p)
first_src = first(src)
if first_src isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
if any(!=(first_src), src)
throw(ArgumentError("when broadcasting column selector it must " *
"have a constant value"))
end
need_broadcast = true
new_names = names(df, first_src.sel)
if !(length(new_names) == size(p, 1) || size(p, 1) == 1)
throw(ArgumentError("broadcasted dimension does not match the " *
"number of selected columns"))
end
new_src = new_names
else
new_src = src
end

second = last.(p)
first_second = first(second)
if first_second isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
if any(!=(first_second), second)
throw(ArgumentError("when using broadcasted column selector it " *
"must have a constant value"))
end
need_broadcast = true
new_names = names(df, first_second.sel)
if !(length(new_names) == size(p, 1) || size(p, 1) == 1)
throw(ArgumentError("broadcasted dimension does not match the " *
"number of selected columns"))
end
new_second = new_names
else
if first_second isa Pair
fun, dst = first_second
if dst isa Union{BroadcastedInvertedIndex,
BroadcastedSelector}
if !all(x -> x isa Pair && last(x) == dst, second)
throw(ArgumentError("when using broadcasted column selector " *
"it must have a constant value"))
end
need_broadcast = true
new_names = names(df, dst.sel)
if !(length(new_names) == size(p, 1) || size(p, 1) == 1)
throw(ArgumentError("broadcasted dimension does not match the " *
"number of selected columns"))
end
new_dst = new_names
new_second = first.(second) .=> new_dst
else
new_second = second
end
else
new_second = second
end
end

if need_broadcast
new_p = new_src .=> new_second
return isempty(new_p) ? [] : new_p
else
return p
end
end

# Copied as is from DataFrames.jl
function manipulate(df::DTable, @nospecialize(cs...); copycols::Bool, keeprows::Bool, renamecols::Bool)
cs_vec = []
for v in cs
if v isa AbstractVecOrMat{<:Pair}
append!(cs_vec, v)
else
push!(cs_vec, v)
end
end
return _manipulate(df, Any[normalize_selection(index(df), make_pair_concrete(c), renamecols) for c in cs_vec],
copycols, keeprows)
end

# Not copied - full custom implementation
function _manipulate(df::DTable, normalized_cs::Vector{Any}, copycols::Bool, keeprows::Bool)
#########
# STAGE 1: Spawning full column thunks - also multicolumn when needed (except identity)
# These get saved later and used in last stages.
#########
colresults = Dict{Int,Any}()
for (i, (colidx, (f, _))) in enumerate(normalized_cs)
if !(colidx isa AsTable) && !(f isa ByRow) && f != identity
if length(colidx) > 0
cs = DTableColumn.(Ref(df), [colidx...])
colresults[i] = Dagger.@spawn f(cs...)
else
colresults[i] = Dagger.@spawn f() # case of select(d, [] => fun)
end
end
end

#########
# STAGE 2: Fetching full column thunks with result of length 1
# These will be just injected as values in the mapping, because it's a vector full of these values
#########

colresults = Dict{Int,Any}(
k => fetch(Dagger.spawn(length, v)) == 1 ? fetch(v) : v
for (k, v) in colresults
)

mapmask = [
haskey(colresults, x) && colresults[x] isa Dagger.EagerThunk
for (x, _) in enumerate(normalized_cs)
]

mappable_part_of_normalized_cs = filter(x -> !mapmask[x[1]], collect(enumerate(normalized_cs)))

#########
# STAGE 3: Mapping function (need to ensure this is compiled only once)
# It's awful right now, but it covers all cases
# Essentially we skip all the non-mappable stuff here
#########

rd = map(x -> select_rowfunction(x, mappable_part_of_normalized_cs, colresults), df)

#########
# STAGE 4: Preping for last stage - getting all the full column thunks with not 1 lengths
#########
cpcolresults = Dict{Int,Any}()

for (k, v) in colresults
if v isa Dagger.EagerThunk
cpcolresults[k] = v
end
end

for (_, v) in colresults
if v isa Dagger.EagerThunk
if fetch(Dagger.spawn(length, v)) != length(df)
throw("result column is not the size of the table")
end
end
end
#########
# STAGE 5: Fill columns - meaning the previously omitted full column tasks
# will be now merged into the final DTable
#########

rd = fillcolumns(rd, cpcolresults, normalized_cs, chunk_lengths(df))

return rd
end

# Not copied - full custom implementation
function manipulate(dt::DTable, args::AbstractVector{Int}; copycols::Bool, keeprows::Bool, renamecols::Bool)
colidx = first(args)
colname = Tables.columnnames(Tables.columns(dt))[colidx]
map(r -> (; colname => Tables.getcolumn(r, colidx)), dt)
end

# Copied as is from DataFrames.jl
function manipulate(df::DTable, c::MultiColumnIndex; copycols::Bool, keeprows::Bool,
renamecols::Bool)
if c isa AbstractVector{<:Pair}
return manipulate(df, c..., copycols=copycols, keeprows=keeprows,
renamecols=renamecols)
else
return manipulate(df, index(df)[c], copycols=copycols, keeprows=keeprows,
renamecols=renamecols)
end
end

# Copied as is from DataFrames.jl
manipulate(df::DTable, c::ColumnIndex; copycols::Bool, keeprows::Bool, renamecols::Bool) =
manipulate(df, Int[index(df)[c]], copycols=copycols, keeprows=keeprows, renamecols=renamecols)


"""
select(df::DTable, args...; copycols::Bool=true, renamecols::Bool=true)
Create a new DTable that contains columns from `df` specified by `args` and return it.
The result is guaranteed to have the same number of rows as df, except when no columns
are selected (in which case the result has zero rows).
This operation is supposed to provide the same functionality and syntax as `DataFrames.select`,
but for DTable input. Most cases should be covered and the output should be exactly the
same as one obtained using DataFrames. In case of output differences or `args` causing errors
please file an issue with reproduction steps and data.
Please refer to DataFrames documentation for more details on usage.
"""
select(df::DTable, @nospecialize(args...); copycols::Bool=true, renamecols::Bool=true) =
manipulate(df, map(x -> broadcast_pair(df, x), args)...,
copycols=copycols, keeprows=true, renamecols=renamecols)
Loading

0 comments on commit ad5f1ce

Please sign in to comment.