SPARK-1170-pyspark-histogram: added histogram method, added max and min to statscounter #122

Closed · wants to merge 7 commits
python/pyspark/rdd.py: 61 changes (60 additions, 1 deletion)
@@ -25,6 +25,7 @@
import sys
import shlex
import traceback
from bisect import bisect_right
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
@@ -572,7 +573,6 @@ def func(iterator):

# TODO: aggregate

def max(self):
    """
    Find the maximum item in this RDD.
@@ -666,6 +666,65 @@ def sampleVariance(self):
"""
return self.stats().sampleVariance()

def _getBuckets(self, bucketCount):
    # Use the StatsCounter as a quick way of getting the max and min.
    mm_stats = self.stats()
    minVal = mm_stats.min()
    maxVal = mm_stats.max()

    # Python 2 integer division yields evenly spaced integer bucket edges.
    increment = (maxVal - minVal) / bucketCount
    # When increment is 0 (e.g. all values are equal) the bucket list is
    # left empty, and histogram() raises a ValueError below.
    buckets = []
    if increment != 0:
        buckets = range(minVal, maxVal, increment)

    return {"min": minVal, "max": maxVal, "buckets": buckets}

def histogram(self, bucketCount, buckets=None):
    """
    Compute a histogram of the data using bucketCount buckets evenly
    spaced between the min and max of the RDD. A precomputed list of
    bucket boundaries may be passed in via buckets, in which case
    bucketCount is ignored.

    >>> sc.parallelize([1, 49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
    defaultdict(<type 'int'>, {(67, 100): 2, (1, 33): 6, (34, 66): 2})
    """
min = float("-inf")
max = float("inf")
evenBuckets = False
if not buckets:
b = self._getBuckets(bucketCount)
buckets = b["buckets"]
min = b["min"]
max = b["max"]

if len(buckets) < 2:
raise ValueError("requires more than 1 bucket")
if len(buckets) % 2 == 0:
evenBuckets = True

    def histogramPartition(iterator):
        counters = defaultdict(int)
        for obj in iterator:
            # bisect_right gives the number of bucket edges <= obj,
            # i.e. the index of the bucket the value falls into.
            k = bisect_right(buckets, obj)
            if 0 < k < len(buckets):
                key = (buckets[k - 1], buckets[k] - 1)
            elif k == len(buckets):
                # Values at or above the last edge go in the top bucket.
                key = (buckets[k - 1], maxVal)
            else:  # k == 0: below the first edge
                key = (minVal, buckets[k] - 1)
            counters[key] += 1
        yield counters
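
    # For instance (assumed edges, not from this change), with
    # buckets == [1, 34, 67] and maxVal == 100: obj=20 gives k=1 and
    # key (1, 33); obj=75 gives k=3 == len(buckets) and key (67, 100);
    # obj=0 would give k=0 and land in the unbounded bottom bucket.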


    def mergeCounters(d1, d2):
        # Both counters are defaultdicts, so keys present only in d2
        # are created in d1 with a count of 0 and then incremented.
        for k in d2:
            d1[k] += d2[k]
        return d1

    return self.mapPartitions(histogramPartition).reduce(mergeCounters)
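
# Sketch of the overall flow (assumed two-partition RDD, not from this
# change): each partition yields a single defaultdict of bucket counts,
# e.g. {(1, 33): 4} and {(1, 33): 2, (67, 100): 2}; reduce(mergeCounters)
# then sums them key by key into the final defaultdict on the driver.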


def countByValue(self):
    """
    Return the count of each unique value in this RDD as a dictionary of