[SPARK-35929][PYTHON] Support to infer nested dict as a struct when creating a DataFrame #33214
Conversation
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (resolved review comments)
Kubernetes integration test starting
Kubernetes integration test status success
Test build #140644 has finished for PR 33214 at commit
Test build #140689 has finished for PR 33214 at commit
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status success
Can you also update the PR description?
Kubernetes integration test status success
Test build #140691 has finished for PR 33214 at commit
Test build #140706 has finished for PR 33214 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Looks good otherwise
Thanks! :)
for key, value in obj.items():
    if key is not None and value is not None:
        return MapType(_infer_type(key, infer_dict_as_struct),
                       _infer_type(value, infer_dict_as_struct), True)
return MapType(NullType(), NullType(), True)
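Read in isolation, the quoted branch picks the first entry whose key and value are both non-None to infer the map's key and value types, and falls back to a map of NullType when no such entry exists. A self-contained sketch of that selection logic, using hypothetical stand-in classes (`MapType`, `NullType`, `infer_map_type` here are simplified illustrations, not PySpark's real implementations):

```python
# Minimal stand-ins for PySpark's type classes (hypothetical, for illustration only).
class MapType:
    def __init__(self, key_type, value_type, value_contains_null):
        self.keyType = key_type
        self.valueType = value_type
        self.valueContainsNull = value_contains_null

class NullType:
    pass

def infer_map_type(obj, infer_type):
    # Mirror of the quoted branch: use the first entry where both
    # the key AND the value are non-None.
    for key, value in obj.items():
        if key is not None and value is not None:
            return MapType(infer_type(key), infer_type(value), True)
    # Every entry had a None key or None value: nothing to infer from.
    return MapType(NullType(), NullType(), True)

infer = lambda v: type(v).__name__  # toy "type inference" for this sketch
m = infer_map_type({None: 1, "a": 2}, infer)
print(m.keyType, m.valueType)  # -> str int ("a", 2) is the first fully non-None entry
```

Note that the fallback only fires when no entry qualifies; a single usable entry is enough to fix the map's types for the whole dict.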
Do we need to log a warning if the inferred value types are inconsistent? We can recommend that users use the config.
Thanks for the comment! :)
Actually, PySpark's merge logic only handles the null cases (that's called out here) at
spark/python/pyspark/sql/types.py
Lines 1096 to 1133 in 52a9a70
def _merge_type(a, b, name=None):
    if name is None:
        new_msg = lambda msg: msg
        new_name = lambda n: "field %s" % n
    else:
        new_msg = lambda msg: "%s: %s" % (name, msg)
        new_name = lambda n: "field %s in %s" % (n, name)
    if isinstance(a, NullType):
        return b
    elif isinstance(b, NullType):
        return a
    elif type(a) is not type(b):
        # TODO: type cast (such as int -> long)
        raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
    # same type
    if isinstance(a, StructType):
        nfs = dict((f.name, f.dataType) for f in b.fields)
        fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
                                                  name=new_name(f.name)))
                  for f in a.fields]
        names = set([f.name for f in fields])
        for n in nfs:
            if n not in names:
                fields.append(StructField(n, nfs[n]))
        return StructType(fields)
    elif isinstance(a, ArrayType):
        return ArrayType(_merge_type(a.elementType, b.elementType,
                                     name='element in array %s' % name), True)
    elif isinstance(a, MapType):
        return MapType(_merge_type(a.keyType, b.keyType, name='key of map %s' % name),
                       _merge_type(a.valueType, b.valueType, name='value of map %s' % name),
                       True)
    else:
        return a
It actually fails for different types (unlike JSON or CSV type inference).
I am not sure what's the ideal behavior for the null case pointed out here though.
Let me separate it from this PR in any event, if you're fine with that.
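The behavior being discussed can be demonstrated with a stripped-down sketch of the merge rule. This is a hypothetical simplification, not the real `pyspark.sql.types` code: it keeps only the null-absorbing branches and the same-type check from the quoted `_merge_type`.

```python
# Hypothetical stand-in type classes for illustration only.
class NullType: pass
class LongType: pass
class StringType: pass

def merge_type(a, b):
    # NullType on either side is absorbed by the other type
    # (these are the "null cases" the merge handles today)...
    if isinstance(a, NullType):
        return b
    if isinstance(b, NullType):
        return a
    # ...but two genuinely different types raise, unlike JSON or CSV
    # type inference, which can widen or fall back to a string type.
    if type(a) is not type(b):
        raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
    return a

print(type(merge_type(NullType(), LongType())).__name__)  # -> LongType
```

So merging `NullType` with anything succeeds, while `LongType` vs. `StringType` fails with a `TypeError`, which is the "fails for different types" behavior mentioned above.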
Looks okay to me. Just one minor suggestion.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #140719 has finished for PR 33214 at commit
Test build #140723 has finished for PR 33214 at commit
Merged to master |
What changes were proposed in this pull request?
Currently, inferring nested structs always uses MapType. This behavior causes an issue because it infers the schema with the value type of the first field of the struct, as below:
The "name" became null, but it should've been "Lee". In this case, we need to be able to infer the schema with a StructType instead of a MapType.
Therefore, this PR proposes adding a new configuration spark.sql.pyspark.inferNestedDictAsStruct.enabled to control which type is used for inferring nested structs:
- When spark.sql.pyspark.inferNestedDictAsStruct.enabled is false (by default), nested structs are inferred as MapType.
- When spark.sql.pyspark.inferNestedDictAsStruct.enabled is true, nested structs are inferred as StructType.
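The difference between the two modes can be sketched without PySpark. Below is a toy, hypothetical model of the inference (the real logic lives in pyspark/sql/types.py, and `infer_schema` here is an invented helper): in map mode a dict gets a single value type taken from the first field, which is what makes "name" collapse to null; in struct mode every field keeps its own type, mirroring what the new flag enables.

```python
def infer_schema(value, infer_dict_as_struct=False):
    """Toy schema inference: a dict becomes either a struct (per-field types)
    or a map whose single value type comes from the first field only."""
    if isinstance(value, dict):
        if infer_dict_as_struct:
            # Struct mode: each field keeps its own inferred type.
            return {"struct": {k: infer_schema(v, True) for k, v in value.items()}}
        # Map mode: one value type for all entries, taken from the first one,
        # so fields of a different type ("name": "Lee") can't be represented.
        first_value = next(iter(value.values()))
        return {"map": infer_schema(first_value)}
    return type(value).__name__

row = {"id": 1, "name": "Lee"}
print(infer_schema(row))                             # -> {'map': 'int'}
print(infer_schema(row, infer_dict_as_struct=True))  # -> {'struct': {'id': 'int', 'name': 'str'}}
```

In the map-mode output the string field has no place to live, which is the simplified analogue of "Lee" being read back as null.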
Why are the changes needed?
Because always inferring nested structs as MapType doesn't work properly in some cases.
Does this PR introduce any user-facing change?
A new configuration spark.sql.pyspark.inferNestedDictAsStruct.enabled is added.
How was this patch tested?
Added a unit test.