[SPARK-3886] [PySpark] use AutoBatchedSerializer by default #2740

Closed · wants to merge 2 commits
Changes from 1 commit
11 changes: 7 additions & 4 deletions python/pyspark/context.py
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         :param environment: A dictionary of environment variables to set on
                worker nodes.
         :param batchSize: The number of Python objects represented as a single
-               Java object. Set 1 to disable batching or -1 to use an
-               unlimited batch size.
+               Java object. Set 1 to disable batching, or 0 to choose batch size
+               based on size of objects automaticly, or -1 to use an unlimited
Contributor commented on this line:
Spelling: automatically. How about "Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size"?

+               batch size.
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
4 changes: 2 additions & 2 deletions python/pyspark/serializers.py
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """

-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize
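For background, a self-contained sketch of the idea behind size-based auto-batching; the doubling/halving policy and the auto_batches helper are illustrative assumptions, not code from this diff.

```python
import itertools
import pickle

def auto_batches(iterator, best_size=1 << 16):
    # Illustrative only: adapt how many objects go into each pickled
    # chunk so that serialized chunks land near best_size bytes.
    batch, it = 1, iter(iterator)
    while True:
        objs = list(itertools.islice(it, batch))
        if not objs:
            return
        chunk = pickle.dumps(objs, protocol=2)
        yield chunk
        if len(chunk) < best_size:
            batch *= 2        # chunks too small: batch more objects
        elif len(chunk) > best_size * 10 and batch > 1:
            batch //= 2       # chunks far too large: back off
```

Against that backdrop, the change above lowers the target from 1 << 20 (about 1 MB) to 1 << 16 (about 64 KB) of serialized data per batch.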

@@ -247,7 +247,7 @@ def __eq__(self, other):
                 other.serializer == self.serializer)

     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)


 class CartesianDeserializer(FramedSerializer):
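The second hunk fixes __str__ to report the actual class name; a quick check (the exact repr of the wrapped serializer is an assumption):

```python
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

s = AutoBatchedSerializer(PickleSerializer())
# Previously this misreported the class as BatchedSerializer<...>;
# after the fix it names the actual class:
print(s)  # AutoBatchedSerializer<...>
```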