use AutoBatchedSerializer by default
davies committed Oct 9, 2014
1 parent 4e9b551 · commit 185f2b9
Showing 2 changed files with 9 additions and 6 deletions.
python/pyspark/context.py (7 additions, 4 deletions)
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         :param environment: A dictionary of environment variables to set on
                worker nodes.
         :param batchSize: The number of Python objects represented as a single
-               Java object. Set 1 to disable batching or -1 to use an
-               unlimited batch size.
+               Java object. Set 1 to disable batching, 0 to automatically choose
+               the batch size based on object sizes, or -1 to use an unlimited
+               batch size.
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
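
For reference, the dispatch in _do_init above maps the three batchSize settings onto serializers as sketched below. pick_serializer is a hypothetical helper written for illustration, not part of the PySpark API; it assumes only the pyspark.serializers classes already imported in this diff.

    from pyspark.serializers import PickleSerializer, BatchedSerializer, \
        AutoBatchedSerializer

    def pick_serializer(unbatched, batch_size):
        # Mirrors the branches added above: 1 disables batching, 0 adapts
        # the batch size automatically, anything else fixes it (-1 = unlimited).
        if batch_size == 1:
            return unbatched
        elif batch_size == 0:
            return AutoBatchedSerializer(unbatched)   # the new default path
        else:
            return BatchedSerializer(unbatched, batch_size)

    assert isinstance(pick_serializer(PickleSerializer(), 0), AutoBatchedSerializer)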
python/pyspark/serializers.py (2 additions, 2 deletions)
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """
 
-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize

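bestSize is the target size in bytes for each serialized chunk; this commit lowers the default from 1 MiB (1 << 20) to 64 KiB (1 << 16). The serializer homes in on that target by growing or shrinking the number of objects per batch as it writes. Below is a minimal standalone sketch of that idea, with pickle standing in for the wrapped serializer and a hypothetical auto_batched_dump in place of the real dump_stream method:

    import io
    import itertools
    import pickle
    import struct

    def auto_batched_dump(iterator, stream, best_size=1 << 16):
        # Sketch only: approximates the adaptive scheme, not the actual
        # AutoBatchedSerializer.dump_stream implementation.
        batch = 1
        iterator = iter(iterator)
        while True:
            objs = list(itertools.islice(iterator, batch))
            if not objs:
                break
            data = pickle.dumps(objs)                   # stand-in serializer
            stream.write(struct.pack("!i", len(data)))  # length-prefixed framing
            stream.write(data)
            if len(data) < best_size:
                batch *= 2        # chunks below target: batch more objects
            elif len(data) > best_size * 10 and batch > 1:
                batch //= 2       # chunks far above target: back off

    buf = io.BytesIO()
    auto_batched_dump(range(100000), buf)  # batch count grows until chunks near 64 KiB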
@@ -247,7 +247,7 @@ def __eq__(self, other):
                 other.serializer == self.serializer)
 
     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)
 
 
 class CartesianDeserializer(FramedSerializer):
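
The __str__ change above is a pure labeling fix: AutoBatchedSerializer previously inherited text that reported it as a plain BatchedSerializer. A quick check, assuming a PySpark build that includes this commit:

    from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

    s = AutoBatchedSerializer(PickleSerializer())
    print(str(s))  # now "AutoBatchedSerializer<...>" rather than "BatchedSerializer<...>"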