From 96b047b75c6c5e5d7ffa2848fff7a635f35dde88 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 13 Mar 2014 19:11:06 -0700 Subject: [PATCH 1/3] Add aggregate to python rdd --- python/pyspark/rdd.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6d549b40e5698..2f0f44c4af214 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -524,7 +524,7 @@ def _collect_iterator_through_file(self, iterator): def reduce(self, f): """ Reduces the elements of this RDD using the specified commutative and - associative binary operator. + associative binary operator. Currently reduces partitions locally. >>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) @@ -566,8 +566,32 @@ def func(iterator): vals = self.mapPartitions(func).collect() return reduce(op, vals, zeroValue) - # TODO: aggregate + def aggregate(self, zeroValue, seqOp, combOp): + """ + Aggregate the elements of each partition, and then the results for all + the partitions, using given combine functions and a neutral "zero + value." + + The function C{op(t1, t2)} is allowed to modify C{t1} and return it + as its result value to avoid object allocation; however, it should not + modify C{t2}. + + The first function (seqOp) can return a different result type, U, than + the type of this RDD. Thus, we need one operation for merging a T into an U + and one operation for merging two U + >>> seqOp = (lambda x, y: (x[0]+y, x[1] + 1)) + >>> combOp = (lambda x, y: (x[0]+y[0], x[1] + y[1])) + >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) + (10, 4) + """ + def func(iterator): + acc = zeroValue + for obj in iterator: + acc = seqOp(acc, obj) + if acc is not None: + yield acc + return self.mapPartitions(func).reduce(combOp) def sum(self): """ Add up the elements in this RDD. 
From 70b4724cc7e98eb9f75f0808a9b4995bd5de9f42 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 14 Mar 2014 16:46:21 -0700 Subject: [PATCH 2/3] Style fixes from code review --- python/pyspark/rdd.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 2f0f44c4af214..b1f26d6f37d5e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -580,8 +580,8 @@ def aggregate(self, zeroValue, seqOp, combOp): the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U - >>> seqOp = (lambda x, y: (x[0]+y, x[1] + 1)) - >>> combOp = (lambda x, y: (x[0]+y[0], x[1] + y[1])) + >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) + >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) (10, 4) """ @@ -592,6 +592,7 @@ def func(iterator): if acc is not None: yield acc return self.mapPartitions(func).reduce(combOp) + def sum(self): """ Add up the elements in this RDD. 
From 4879c755c3455280d727c65539d9527ca838f01c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 7 Apr 2014 21:43:12 -0700 Subject: [PATCH 3/3] CR feedback, fix issue with empty RDDs in aggregate --- python/pyspark/rdd.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index b1f26d6f37d5e..d5bcebde1a7f2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -584,14 +584,16 @@ def aggregate(self, zeroValue, seqOp, combOp): >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) (10, 4) + >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) + (0, 0) """ def func(iterator): acc = zeroValue for obj in iterator: acc = seqOp(acc, obj) - if acc is not None: - yield acc - return self.mapPartitions(func).reduce(combOp) + yield acc + + return self.mapPartitions(func).fold(zeroValue, combOp) def sum(self): """