SPARK-1242 Add aggregate to python rdd
Author: Holden Karau <[email protected]>

Closes apache#139 from holdenk/add_aggregate_to_python_api and squashes the following commits:

0f39ae3 [Holden Karau] Merge in master
4879c75 [Holden Karau] CR feedback, fix issue with empty RDDs in aggregate
70b4724 [Holden Karau] Style fixes from code review
96b047b [Holden Karau] Add aggregate to python rdd
holdenk authored and pdeyhim committed Jun 25, 2014
1 parent aab55db commit 8ffe635
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions python/pyspark/rdd.py
@@ -599,7 +599,7 @@ def _collect_iterator_through_file(self, iterator):
def reduce(self, f):
"""
Reduces the elements of this RDD using the specified commutative and
- associative binary operator.
+ associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
@@ -641,7 +641,34 @@ def func(iterator):
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)

-    # TODO: aggregate
+    def aggregate(self, zeroValue, seqOp, combOp):
+        """
+        Aggregate the elements of each partition, and then the results for all
+        the partitions, using the given combine functions and a neutral "zero
+        value."
+
+        The functions C{op(t1, t2)} are allowed to modify C{t1} and return it
+        as their result value to avoid object allocation; however, they should
+        not modify C{t2}.
+
+        The first function (seqOp) can return a different result type, U, than
+        the type of this RDD. Thus, we need one operation for merging a T into
+        a U and one operation for merging two U's.
+
+        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
+        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
+        (10, 4)
+        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
+        (0, 0)
+        """
+        def func(iterator):
+            acc = zeroValue
+            for obj in iterator:
+                acc = seqOp(acc, obj)
+            yield acc
+
+        return self.mapPartitions(func).fold(zeroValue, combOp)


def max(self):
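
Note: the two-stage behaviour of `aggregate` in this diff can be tried without a Spark cluster. The following is a minimal sketch, not the PySpark implementation: `aggregate_local` is a hypothetical helper that treats a plain list of lists as the partitions, applies `seq_op` within each partition, and then merges the per-partition results with `comb_op`. It assumes, as the diff does, that an empty partition contributes the zero value unchanged.

```python
from functools import reduce


def aggregate_local(partitions, zero_value, seq_op, comb_op):
    """Simulate aggregate over in-memory partitions (hypothetical helper, not PySpark)."""

    def fold_partition(items):
        # Stage 1: merge each element (type T) into the accumulator (type U).
        acc = zero_value
        for obj in items:
            acc = seq_op(acc, obj)
        # An empty partition falls through the loop and returns zero_value.
        return acc

    partials = [fold_partition(p) for p in partitions]
    # Stage 2: merge the per-partition accumulators (U with U).
    return reduce(comb_op, partials, zero_value)


# Same operators as the doctest in the diff: track (sum, count).
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# The middle partition is deliberately empty.
partitions = [[1, 2], [], [3, 4]]
total, count = aggregate_local(partitions, (0, 0), seq_op, comb_op)
mean = total / count
```

Because the accumulator here is a `(sum, count)` tuple while the elements are plain integers, the sketch also shows why two functions are needed: `seq_op` merges an element into an accumulator, and `comb_op` merges two accumulators.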