[SPARK-3886] [PySpark] use AutoBatchedSerializer by default
Use AutoBatchedSerializer by default, which chooses the batch size based on the size of the serialized objects, so that each serialized batch falls roughly within 64 KB to 640 KB.

In the JVM, the serializer also tracks the objects in a batch in order to detect duplicates, so a larger batch may cause OOM in the JVM.
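
To make the sizing policy concrete, here is a minimal standalone sketch of the idea using plain pickle rather than the actual AutoBatchedSerializer code; the function name and the doubling/halving thresholds are illustrative:

import pickle
from itertools import islice


def adaptive_batches(iterator, best_size=1 << 16):
    """Yield pickled batches whose byte size is steered toward best_size.

    Start with one object per batch; double the batch when the serialized
    payload falls below best_size (64 KB here), and halve it once the
    payload grows past roughly 10x best_size (~640 KB).
    """
    batch = 1
    iterator = iter(iterator)
    while True:
        objs = list(islice(iterator, batch))
        if not objs:
            break
        payload = pickle.dumps(objs)
        yield payload

        if len(payload) < best_size:
            batch *= 2
        elif len(payload) > best_size * 10 and batch > 1:
            batch //= 2


if __name__ == "__main__":
    batches = list(adaptive_batches("x" * 100 for _ in range(10000)))
    print([len(b) for b in batches[:5]], "...", len(batches), "batches")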

Author: Davies Liu <[email protected]>

Closes apache#2740 from davies/batchsize and squashes the following commits:

52cdb88 [Davies Liu] update docs
185f2b9 [Davies Liu] use AutoBatchedSerializer by default
davies authored and JoshRosen committed Oct 10, 2014
1 parent 90f73fc commit 72f36ee
Showing 2 changed files with 9 additions and 6 deletions.
11 changes: 7 additions & 4 deletions python/pyspark/context.py
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         :param environment: A dictionary of environment variables to set on
                worker nodes.
         :param batchSize: The number of Python objects represented as a single
-               Java object. Set 1 to disable batching or -1 to use an
-               unlimited batch size.
+               Java object. Set 1 to disable batching, 0 to automatically choose
+               the batch size based on object sizes, or -1 to use an unlimited
+               batch size
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
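For context, a hedged usage sketch of the new default behavior (assumes a local PySpark installation; the master URL and app name below are illustrative):

from pyspark import SparkContext

# batchSize=0 is now the default: Python objects are pickled through
# AutoBatchedSerializer, which sizes batches by serialized bytes.
sc = SparkContext("local[2]", "auto-batch-demo")

# The older behaviors remain available explicitly:
#   SparkContext(..., batchSize=1)     -> no batching
#   SparkContext(..., batchSize=1024)  -> fixed 1024 objects per batch
#   SparkContext(..., batchSize=-1)    -> one unlimited batch

print(sc.parallelize(range(100)).sum())
sc.stop()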
4 changes: 2 additions & 2 deletions python/pyspark/serializers.py
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """
 
-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize
 
@@ -247,7 +247,7 @@ def __eq__(self, other):
                 other.serializer == self.serializer)
 
     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)
 
 
 class CartesianDeserializer(FramedSerializer):
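The second hunk also corrects the reported serializer name. A small hedged check (assumes PySpark is importable; the exact repr of the wrapped serializer may vary):

from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

ser = AutoBatchedSerializer(PickleSerializer())
# Before this change str(ser) reported "BatchedSerializer<...>";
# after it, the name matches the class: "AutoBatchedSerializer<...>".
print(str(ser))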
