Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-35929][PYTHON] Support to infer nested dict as a struct when creating a DataFrame #33214

Closed
wants to merge 9 commits into from
6 changes: 6 additions & 0 deletions python/pyspark/sql/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ def test_infer_nested_schema(self):
df = self.spark.createDataFrame(nestedRdd2)
self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])

with self.sql_conf({"spark.sql.pyspark.inferNestedStructByMap": False}):
nestedRdd3 = self.sc.parallelize([NestedRow([{"payment": 200.5, "name": "A"}], [1, 2]),
NestedRow([{"payment": 100.5, "name": "B"}], [2, 3])])
df = self.spark.createDataFrame(nestedRdd3)
self.assertEqual(Row(f1=[Row(payment=200.5, name='A')], f2=[1, 2]), df.collect()[0])

from collections import namedtuple
CustomRow = namedtuple('CustomRow', 'field1 field2')
rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
Expand Down
17 changes: 13 additions & 4 deletions python/pyspark/sql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,10 +1020,19 @@ def _infer_type(obj):
return dataType()

if isinstance(obj, dict):
for key, value in obj.items():
if key is not None and value is not None:
return MapType(_infer_type(key), _infer_type(value), True)
return MapType(NullType(), NullType(), True)
from pyspark.sql.session import SparkSession
if (SparkSession._activeSession.conf.get(
"spark.sql.pyspark.inferNestedStructByMap").lower() == "true"):
for key, value in obj.items():
if key is not None and value is not None:
return MapType(_infer_type(key), _infer_type(value), True)
return MapType(NullType(), NullType(), True)
else:
struct = StructType()
for key, value in obj.items():
if key is not None and value is not None:
struct.add(key, _infer_type(value), True)
return struct
elif isinstance(obj, list):
for v in obj:
if v is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3327,6 +3327,13 @@ object SQLConf {
.intConf
.createWithDefault(0)

// Controls how PySpark infers the schema of a Python dict when creating a
// DataFrame: as a MapType (default, true) or as a StructType whose fields are
// the dict's keys (false). Read from python/pyspark/sql/types.py::_infer_type.
// NOTE(review): marked internal — not documented for end users; confirm this is
// intended before relying on it externally.
val INFER_NESTED_STRUCT_BY_MAP = buildConf("spark.sql.pyspark.inferNestedStructByMap")
  .internal()
  .doc("When set to false, a nested dict is inferred as a struct (StructType). " +
    "When set to true (default), a nested dict is inferred as a map (MapType).")
  .version("3.2.0")
  .booleanConf
  .createWithDefault(true)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -4040,6 +4047,8 @@ class SQLConf extends Serializable with Logging {

// Getter for SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS (conf declared elsewhere in this file).
def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)

/**
 * Whether PySpark infers a nested Python dict as a `MapType` (true, the default)
 * rather than a `StructType`. Backed by `SQLConf.INFER_NESTED_STRUCT_BY_MAP`
 * ("spark.sql.pyspark.inferNestedStructByMap").
 */
def inferNestedStructByMap: Boolean = getConf(SQLConf.INFER_NESTED_STRUCT_BY_MAP)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down