Skip to content

Commit

Permalink
remove numpy in RDDSampler
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Nov 14, 2014
1 parent 78bf997 commit 51649f5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 66 deletions.
12 changes: 8 additions & 4 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,13 @@ def distinct(self, numPartitions=None):

def sample(self, withReplacement, fraction, seed=None):
"""
Return a sampled subset of this RDD (relies on numpy and falls back
on default random generator if numpy is unavailable).
Return a sampled subset of this RDD.
>>> rdd = sc.parallelize(range(100), 4)
>>> rdd.sample(True, 0.1, 27).count()
10
>>> rdd.sample(False, 0.1, 81).count()
10
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
Expand Down Expand Up @@ -343,8 +348,7 @@ def randomSplit(self, weights, seed=None):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD (currently requires
numpy).
Return a fixed-size sampled subset of this RDD.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
Expand Down
84 changes: 22 additions & 62 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,76 +22,34 @@
class RDDSamplerBase(object):

def __init__(self, withReplacement, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, (
"NumPy does not appear to be installed. "
"Falling back to default random generator for sampling.")
self._use_numpy = False

self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._random = None
self._split = None
self._rand_initialized = False

def initRandomGenerator(self, split):
if self._use_numpy:
import numpy
self._random = numpy.random.RandomState(self._seed ^ split)
else:
self._random = random.Random(self._seed ^ split)
self._random = random.Random(self._seed ^ split)

# mixing because the initial seeds are close to each other
for _ in xrange(10):
self._random.randint(0, 1)

self._split = split
self._rand_initialized = True

def getUniformSample(self, split):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.random_sample()
else:
return self._random.uniform(0.0, 1.0)

def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)
def getUniformSample(self):
return self._random.random()

if self._use_numpy:
return self._random.poisson(mean)
else:
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
# drawing a sequence of numbers delta_j ~ Exp(mean)
num_arrivals = 1
cur_time = 0.0
def getPoissonSample(self, mean):
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
# drawing a sequence of numbers delta_j ~ Exp(mean)
num_arrivals = 0
cur_time = self._random.expovariate(mean)

while cur_time < 1.0:
cur_time += self._random.expovariate(mean)
num_arrivals += 1

if cur_time > 1.0:
return 0
return num_arrivals

while(cur_time <= 1.0):
cur_time += self._random.expovariate(mean)
num_arrivals += 1

return (num_arrivals - 1)

def shuffle(self, vals):
if self._random is None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter

if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)
def func(self, split, iterator):
raise NotImplementedError


class RDDSampler(RDDSamplerBase):
Expand All @@ -101,31 +59,32 @@ def __init__(self, withReplacement, fraction, seed=None):
self._fraction = fraction

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fraction)
count = self.getPoissonSample(self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
if self.getUniformSample() <= self._fraction:
yield obj


class RDDRangeSampler(RDDSamplerBase):

def __init__(self, lowerBound, upperBound, seed=None):
RDDSamplerBase.__init__(self, False, seed)
self._use_numpy = False # no performance gain from numpy
self._lowerBound = lowerBound
self._upperBound = upperBound

def func(self, split, iterator):
self.initRandomGenerator(split)
for obj in iterator:
if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
if self._lowerBound <= self.getUniformSample() < self._upperBound:
yield obj


Expand All @@ -136,15 +95,16 @@ def __init__(self, withReplacement, fractions, seed=None):
self._fractions = fractions

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for key, val in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fractions[key])
count = self.getPoissonSample(self._fractions[key])
for _ in range(0, count):
yield key, val
else:
for key, val in iterator:
if self.getUniformSample(split) <= self._fractions[key]:
if self.getUniformSample() <= self._fractions[key]:
yield key, val

0 comments on commit 51649f5

Please sign in to comment.