Skip to content

Commit

Permalink
[SPARK-2871] [PySpark] add histgram() API
Browse files Browse the repository at this point in the history
RDD.histogram(buckets)

        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last which is closed.
        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
        and 50 we would have a histogram of 1,0,1.

        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
        this can be switched from an O(log n) inseration to O(1) per
        element(where n = # buckets).

        Buckets must be sorted and not contain any duplicates, must be
        at least two elements.

        If `buckets` is a number, it will generates buckets which is
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given buckets
        as 2, the resulting buckets will be [0,50) [50,100]. buckets must
        be at least 1 If the RDD contains infinity, NaN throws an exception
        If the elements in RDD do not vary (max == min) always returns
        a single bucket.

        It will return an tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60], True)
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
        >>> rdd.histogram(("a", "b", "c"))
        (('a', 'b', 'c'), [2, 2])

closes #122, it's duplicated.

Author: Davies Liu <[email protected]>

Closes #2091 from davies/histgram and squashes the following commits:

a322f8a [Davies Liu] fix deprecation of e.message
84e85fa [Davies Liu] remove evenBuckets, add more tests (including str)
d9a0722 [Davies Liu] address comments
0e18a2d [Davies Liu] add histgram() API
  • Loading branch information
davies authored and JoshRosen committed Aug 26, 2014
1 parent 8856c3d commit 3cedc4f
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 1 deletion.
129 changes: 128 additions & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import heapq
import bisect
from random import Random
from math import sqrt, log
from math import sqrt, log, isinf, isnan

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
Expand Down Expand Up @@ -886,6 +886,133 @@ def redFunc(left_counter, right_counter):

return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

def histogram(self, buckets):
"""
Compute a histogram using the provided buckets. The buckets
are all open to the right except for the last which is closed.
e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
and 50 we would have a histogram of 1,0,1.
If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
this can be switched from an O(log n) inseration to O(1) per
element(where n = # buckets).
Buckets must be sorted and not contain any duplicates, must be
at least two elements.
If `buckets` is a number, it will generates buckets which are
evenly spaced between the minimum and maximum of the RDD. For
example, if the min value is 0 and the max is 100, given buckets
as 2, the resulting buckets will be [0,50) [50,100]. buckets must
be at least 1 If the RDD contains infinity, NaN throws an exception
If the elements in RDD do not vary (max == min) always returns
a single bucket.
It will return an tuple of buckets and histogram.
>>> rdd = sc.parallelize(range(51))
>>> rdd.histogram(2)
([0, 25, 50], [25, 26])
>>> rdd.histogram([0, 5, 25, 50])
([0, 5, 25, 50], [5, 20, 26])
>>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
([0, 15, 30, 45, 60], [15, 15, 15, 6])
>>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
>>> rdd.histogram(("a", "b", "c"))
(('a', 'b', 'c'), [2, 2])
"""

if isinstance(buckets, (int, long)):
if buckets < 1:
raise ValueError("number of buckets must be >= 1")

# filter out non-comparable elements
def comparable(x):
if x is None:
return False
if type(x) is float and isnan(x):
return False
return True

filtered = self.filter(comparable)

# faster than stats()
def minmax(a, b):
return min(a[0], b[0]), max(a[1], b[1])
try:
minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
except TypeError as e:
if " empty " in str(e):
raise ValueError("can not generate buckets from empty RDD")
raise

if minv == maxv or buckets == 1:
return [minv, maxv], [filtered.count()]

try:
inc = (maxv - minv) / buckets
except TypeError:
raise TypeError("Can not generate buckets with non-number in RDD")

if isinf(inc):
raise ValueError("Can not generate buckets with infinite value")

# keep them as integer if possible
if inc * buckets != maxv - minv:
inc = (maxv - minv) * 1.0 / buckets

buckets = [i * inc + minv for i in range(buckets)]
buckets.append(maxv) # fix accumulated error
even = True

elif isinstance(buckets, (list, tuple)):
if len(buckets) < 2:
raise ValueError("buckets should have more than one value")

if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
raise ValueError("can not have None or NaN in buckets")

if sorted(buckets) != list(buckets):
raise ValueError("buckets should be sorted")

if len(set(buckets)) != len(buckets):
raise ValueError("buckets should not contain duplicated values")

minv = buckets[0]
maxv = buckets[-1]
even = False
inc = None
try:
steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
except TypeError:
pass # objects in buckets do not support '-'
else:
if max(steps) - min(steps) < 1e-10: # handle precision errors
even = True
inc = (maxv - minv) / (len(buckets) - 1)

else:
raise TypeError("buckets should be a list or tuple or number(int or long)")

def histogram(iterator):
counters = [0] * len(buckets)
for i in iterator:
if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
continue
t = (int((i - minv) / inc) if even
else bisect.bisect_right(buckets, i) - 1)
counters[t] += 1
# add last two together
last = counters.pop()
counters[-1] += last
return [counters]

def mergeCounters(a, b):
return [i + j for i, j in zip(a, b)]

return buckets, self.mapPartitions(histogram).reduce(mergeCounters)

def mean(self):
"""
Compute the mean of this RDD's elements.
Expand Down
104 changes: 104 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,110 @@ def test_zip_with_different_number_of_items(self):
self.assertEquals(a.count(), b.count())
self.assertRaises(Exception, lambda: a.zip(b).count())

def test_histogram(self):
# empty
rdd = self.sc.parallelize([])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
self.assertRaises(ValueError, lambda: rdd.histogram(1))

# out of range
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])

# in range with one bucket
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals([4], rdd.histogram([0, 10])[1])
self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])

# in range with one bucket exact match
self.assertEquals([4], rdd.histogram([1, 4])[1])

# out of range with two buckets
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])

# out of range with two uneven buckets
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])

# in range with two buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])

# in range with two bucket and None
rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])

# in range with two uneven buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])

# mixed range with two uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])

# mixed range with four uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])

# mixed range with uneven buckets and NaN
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
199.0, 200.0, 200.1, None, float('nan')])
self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])

# out of range with infinite buckets
rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])

# invalid buckets
self.assertRaises(ValueError, lambda: rdd.histogram([]))
self.assertRaises(ValueError, lambda: rdd.histogram([1]))
self.assertRaises(ValueError, lambda: rdd.histogram(0))
self.assertRaises(TypeError, lambda: rdd.histogram({}))

# without buckets
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals(([1, 4], [4]), rdd.histogram(1))

# without buckets single element
rdd = self.sc.parallelize([1])
self.assertEquals(([1, 1], [1]), rdd.histogram(1))

# without bucket no range
rdd = self.sc.parallelize([1] * 4)
self.assertEquals(([1, 1], [4]), rdd.histogram(1))

# without buckets basic two
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))

# without buckets with more requested than elements
rdd = self.sc.parallelize([1, 2])
buckets = [1 + 0.2 * i for i in range(6)]
hist = [1, 0, 0, 0, 1]
self.assertEquals((buckets, hist), rdd.histogram(5))

# invalid RDDs
rdd = self.sc.parallelize([1, float('inf')])
self.assertRaises(ValueError, lambda: rdd.histogram(2))
rdd = self.sc.parallelize([float('nan')])
self.assertRaises(ValueError, lambda: rdd.histogram(2))

# string
rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))

# mixed RDD
rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))


class TestIO(PySparkTestCase):

Expand Down

0 comments on commit 3cedc4f

Please sign in to comment.