diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6fb30d65c5edd..85c04624da4a6 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         :param environment: A dictionary of environment variables to set on
               worker nodes.
         :param batchSize: The number of Python objects represented as a single
-              Java object. Set 1 to disable batching or -1 to use an
-              unlimited batch size.
+              Java object. Set 1 to disable batching, 0 to automatically choose
+              the batch size based on object sizes, or -1 to use an unlimited
+              batch size.
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 099fa54cf2bd7..3d1a34b281acc 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """
 
-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize
 
@@ -247,7 +247,7 @@ def __eq__(self, other):
                 other.serializer == self.serializer)
 
     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)
 
 
 class CartesianDeserializer(FramedSerializer):
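
Note: the new `batchSize=0` default routes RDD serialization through `AutoBatchedSerializer`, which adapts the number of objects per pickled chunk so each serialized frame lands near `bestSize` (now 64 KiB, i.e. `1 << 16`, down from 1 MiB). A minimal standalone sketch of such a size-feedback heuristic is below; it is illustrative only, not the PySpark source, and the `write_batches` name, the frame format, and the exact grow/shrink thresholds are assumptions for the example.

    import itertools
    import pickle
    import struct

    def write_batches(iterator, stream, best_size=1 << 16):
        # Start with one object per batch; after writing each serialized
        # chunk, grow or shrink the batch so chunk sizes hover near best_size.
        batch = 1
        iterator = iter(iterator)
        while True:
            objs = list(itertools.islice(iterator, batch))
            if not objs:
                break
            data = pickle.dumps(objs)
            stream.write(struct.pack("!i", len(data)))  # length-prefixed frame
            stream.write(data)
            if len(data) < best_size:
                batch *= 2    # chunks too small: batch more objects next time
            elif len(data) > best_size * 10 and batch > 1:
                batch //= 2   # chunks far too large: back off

Usage (illustrative): `write_batches(iter(range(100000)), io.BytesIO())` would emit progressively larger batches of small integers until each pickled frame approaches 64 KiB, which is why a single fixed default like the old `batchSize=1024` is no longer needed.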