[FR] gforce mean and sum in parallel #3042

Closed
jangorecki opened this issue Sep 11, 2018 · 8 comments

@jangorecki
Member

jangorecki commented Sep 11, 2018

As highlighted in the recently updated grouping benchmark https://h2oai.github.io/db-benchmark/, data.table already lags behind some other tools, specifically those that can perform aggregation using multiple cores. To keep up with the competition we need to parallelize grouping.
Related issues:

We should try to make it for 1.12.0.

@st-pasha
Contributor

Related: #2533

@jangorecki
Member Author

@st-pasha thanks, added to the list of related issues in the first post.

@nbenn
Member

nbenn commented Oct 29, 2018

I posted here last week, but there were some mistakes in my post and it was long and confusing, so I deleted it. Sorry for that. This is my second attempt:

I have a function which I would like to apply in a grouped-by fashion. Applying this function takes much longer than the data.table internals involved in grouping etc. Such a scenario lends itself to being run in parallel, but currently this does not work out as one could (naïvely) hope.

Let's say I have data like tbl:

new_dt <- function(n = 3e6) {
  data.table::data.table(
    a = rep(LETTERS, each = n),
    b = rep(letters, times = n),
    c = runif(length(letters) * n, 0, 1),
    d = runif(length(letters) * n, 1, 2),
    e = runif(length(letters) * n, 1, 3),
    f = rnorm(length(letters) * n, 0, 1),
    g = rnorm(length(letters) * n, 1, 2),
    h = rnorm(length(letters) * n, 2, 1)
  )
}
tbl <- new_dt()

and two functions col_var() and plus_one() like

col_var <- function(x) lapply(x, var)
plus_one <- function(x) lapply(x, function(y) y + 1)

Now, if I split the data into n_cores partitions (by adding a group index), run the partitions in parallel with parallel::mcparallel(), and within each partition do something like

dt[GroupIndex == i, c(use_cols) := fun(.SD), by = group_by, .SDcols = use_cols]

this works fine with col_var() but not with plus_one(). The reason for this is serialisation of the results for communication from child to master process. For col_var(), nrow(res) is 26 ^ 2 / n_cores, whereas for plus_one(), nrow(res) is n / n_cores (and n = 3e6).
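
A minimal sketch of the pattern I mean, assuming n_cores = 4 and the use_cols placeholder from above (mcparallel() needs a fork-capable OS, and since := in a forked child only modifies the child's copy, the sketch returns the per-partition result rather than assigning by reference):

library(data.table)
library(parallel)

n_cores  <- 4L
use_cols <- c("c", "d", "e", "f", "g", "h")

# assign each row to one of n_cores contiguous partitions
tbl[, GroupIndex := cut(seq_len(.N), n_cores, labels = FALSE)]

# fork one child per partition; each child's result has to be serialised
# back to the master process: cheap for col_var() (one row per (a, b) group),
# expensive for plus_one() (one row per input row)
jobs <- lapply(seq_len(n_cores), function(i)
  mcparallel(tbl[GroupIndex == i, plus_one(.SD), by = .(a, b), .SDcols = use_cols]))
res <- rbindlist(mccollect(jobs))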

For something like this to work efficiently, we need a way to place a data.table in shared memory (see #3104). This leads me to two questions:

  • Do the maintainers feel that this is in-scope for data.table?
  • I'd be interested in having a go at this. Are there people willing to offer some guidance/discussion on implementation strategy (there are several options for shared memory), package API decisions, etc.?

Some of my experiment code is available from here.

@st-pasha
Copy link
Contributor

@nbenn GroupIndex == i is probably going to be inefficient, since in order to compute this filtering condition, you'd have to run through the entire column GroupIndex, in each thread. That is unless the table is known to be sorted by GroupIndex, in which case a simple binary search would be sufficient.
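
For illustration, a sketch of both routes using the fun/group_by/use_cols placeholders from the comment above (setkey() physically reorders the table, setindex() leaves the row order alone and builds a secondary index):

# option 1: sort by GroupIndex, then subset via the key (binary search)
setkey(dt, GroupIndex)
dt[.(i), fun(.SD), by = group_by, .SDcols = use_cols]

# option 2: keep the original row order and attach a secondary index;
# the GroupIndex == i subset can then use the index instead of a full scan
setindex(dt, GroupIndex)
dt[GroupIndex == i, fun(.SD), by = group_by, .SDcols = use_cols]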

Also, please note that many group-by tasks do not actually require putting the data.table into shared memory and forking the R process via parallel::mcparallel(); they can be implemented using the OpenMP parallelism built into data.table's C code.

@nbenn
Member

nbenn commented Oct 29, 2018

@st-pasha

GroupIndex == i is probably going to be inefficient, since in order to compute this filtering condition, you'd have to run through the entire column GroupIndex, in each thread.

Sure, this isn't great. You could make the grouping column an index, or create a vector like

grp_vec <- parallel::splitIndices(nrow(dt), n_cores)

and use this to subset the data.table (sketched after the list below). But this is beside the point here:

  1. if the time spent evaluating fun is large, all of this becomes irrelevant
  2. this group indexing could easily be done in C to speed things up
  3. the subsetting in forked processes is what currently works. The problem is getting the results back to the master process (which is why I'm proposing a shared memory implementation).
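
For completeness, the splitIndices() variant sketched out (same fun/group_by/use_cols placeholders as before; each worker subsets by row numbers rather than by scanning a flag column):

grp_vec <- parallel::splitIndices(nrow(dt), n_cores)

jobs <- lapply(grp_vec, function(rows)
  parallel::mcparallel(dt[rows, fun(.SD), by = group_by, .SDcols = use_cols]))
res <- data.table::rbindlist(parallel::mccollect(jobs))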

Also, please note that many group-by tasks do not actually require putting the data.table into shared memory and forking the R process via parallel::mcparallel(); they can be implemented using the OpenMP parallelism built into data.table's C code.

What do you mean by "many group-by tasks"? A handful of functions like sum(), mean(), var(), etc.? That is not what I am talking about here. I am talking about enabling arbitrary functions to be applied in a parallel, grouped-by fashion.

@st-pasha
Contributor

@nbenn Great, it helps to clear things up -- that's why we have this discussion in the first place!

When you say "what do you mean by many group-by tasks" -- I really do mean the handful of functions like sum(), mean(), etc. They are not "many" by simple count, but they account for the majority of use cases that data.table users face. And they are probably what @jangorecki had in mind when creating this issue, given that the benchmark he posted uses sums and means.

Which is NOT to say that your use case is somehow less important -- clearly it is important to you and to many people like you. So it should be implemented too.

So, I think this discussion helped us understand the following: there are (at least) two use cases for "parallel grouping": (1) simple group-by reductions like sum(), mean(), etc.; (2) arbitrary group-by operations on .SD, like computing a regression. These two use cases require radically different approaches: one uses OpenMP, the other process forks. I would say this also means that this issue ought to be split into two: one dedicated to use case 1, and the other to use case 2.
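
To make the two cases concrete against the tbl defined earlier in this thread (a sketch; verbose = TRUE reports whether GForce kicked in for case 1):

# use case 1: simple reductions that data.table can map to its internal
# GForce kernels -- natural candidates for OpenMP parallelism
tbl[, .(s = sum(c), m = mean(d)), by = .(a, b), verbose = TRUE]

# use case 2: an arbitrary function over .SD that GForce cannot see into,
# e.g. a per-group regression -- this is where forked workers come in
tbl[, as.list(coef(lm(f ~ c + d))), by = a]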

@nbenn
Member

nbenn commented Oct 30, 2018

Yes, @st-pasha, I agree that splitting this issue makes sense. The reason I posted here is that there are already quite a few issues surrounding the topic of "parallel group-by operations", and I did not want to create yet another one. But as the two scenarios you outline above require different approaches, splitting does make sense. Thank you for engaging in this discussion.

@mattdowle changed the title from "[FR] grouping in parallel" to "[FR] gforce mean and sum in parallel" on Jan 11, 2019
@mattdowle
Member

The focus of this FR at the top was the 5 grouping tests on https://h2oai.github.io/db-benchmark/.
There is much left to do on extending this to other gforce functions and to grouping arbitrary functions (existing or new FRs should be refined), but I'd say this FR has been achieved for 1.12.0, which was the target Jan suggested. I have updated the title and closed the issue.
For future reference, results were tweeted here: https://twitter.com/MattDowle/status/1074746218645938176
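
A quick way to see this in action, assuming data.table >= 1.12.0 and reusing the tbl example from earlier in this thread (a minimal sketch):

library(data.table)
getDTthreads(verbose = TRUE)                      # report the threads data.table will use
setDTthreads(4L)                                  # optionally pin the thread count
tbl[, .(s = sum(c), m = mean(d)), by = .(a, b)]   # grouped sum/mean via parallel GForce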
