From c880a3314c1bfdfa1c2e87dc356baf6770e66eed Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 4 Aug 2014 09:57:16 -0700
Subject: [PATCH] update comment

---
 python/pyspark/streaming/dstream.py | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index c5452b952cac4..37f625e2806e9 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from collections import defaultdict
 from itertools import chain, ifilter, imap
 import operator
@@ -20,11 +37,13 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
 
     def count(self):
         """
+        Return a new DStream which contains the number of elements in this DStream.
         """
         return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
         """
+        Add up the elements in this DStream.
         """
         return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
@@ -41,7 +60,7 @@ def print_(self):
 
     def filter(self, f):
         """
-        Return DStream containing only the elements that satisfy predicate.
+        Return a new DStream containing only the elements that satisfy predicate.
         """
         def func(iterator): return ifilter(f, iterator)
         return self._mapPartitions(func)
@@ -56,7 +75,7 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
 
     def map(self, f):
         """
-        Return DStream by applying a function to each element of DStream.
+        Return a new DStream by applying a function to each element of DStream.
         """
         def func(iterator): return imap(f, iterator)
         return self._mapPartitions(func)
@@ -71,12 +90,14 @@ def func(s, iterator): return f(iterator)
 
     def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         Return a new DStream by applying a function to each partition of this DStream,
-        While tracking the index of the original partition.
+        while tracking the index of the original partition.
         """
         return PipelinedDStream(self, f, preservesPartitioning)
 
     def reduce(self, func):
         """
+        Return a new DStream by reducing the elements of this DStream using the specified
+        commutative and associative binary operator.
         """
         return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
@@ -267,4 +288,3 @@ def _jdstream(self):
 
     def _is_pipelinable(self):
         return not (self.is_cached)
-
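
Note: for readers skimming the docstrings added above, here is a minimal usage sketch of the DStream methods this patch documents (count, filter, flatMap, map, reduce, print_). It assumes a StreamingContext and a socket text source; the StreamingContext constructor, socketTextStream, start, and awaitTermination are assumptions about the surrounding API and are not part of this diff.

    # Sketch only: illustrates the methods whose docstrings this patch updates.
    # The setup below (batch interval, socketTextStream, start, awaitTermination)
    # is assumed and may differ in this in-development branch.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="DStreamDocstringSketch")
    ssc = StreamingContext(sc, 1)                        # assumed: 1-second batches
    lines = ssc.socketTextStream("localhost", 9999)      # assumed source API

    words = lines.flatMap(lambda line: line.split(" "))  # one output element per word
    short = words.filter(lambda w: len(w) <= 5)          # keep elements satisfying the predicate
    lengths = short.map(lambda w: len(w))                # apply a function to each element
    total = lengths.reduce(lambda a, b: a + b)           # commutative, associative operator
    counted = short.count()                              # number of elements per batch

    total.print_()                                       # print_ as defined in this branch
    counted.print_()

    ssc.start()
    ssc.awaitTermination()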