
[BUG] Fix intersection checking when unioning schemas #3039

Merged 3 commits on Oct 15, 2024
5 changes: 1 addition & 4 deletions daft/logical/schema.py
@@ -146,14 +146,11 @@ def _truncated_table_string(self) -> str:
def apply_hints(self, hints: Schema) -> Schema:
return Schema._from_pyschema(self._schema.apply_hints(hints._schema))

     # Takes the unions between two schemas. Throws an error if the schemas contain overlapping keys.
     def union(self, other: Schema) -> Schema:
         if not isinstance(other, Schema):
             raise ValueError(f"Expected Schema, got other: {type(other)}")

-        intersecting_names = self.to_name_set().intersection(other.to_name_set())
-        if intersecting_names:
-            raise ValueError(f"Cannot union schemas with overlapping names: {intersecting_names}")
-
         return Schema._from_pyschema(self._schema.union(other._schema))
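The name-overlap check deleted from this Python layer can be sketched in isolation; a minimal model with plain sets standing in for `Schema.to_name_set()` (`union_names` is a hypothetical helper, not part of Daft's API):

```python
# Sketch of the check this diff removes from Python: reject a union when the
# two schemas share any column names. The same error now surfaces from Rust.
def union_names(left_names: set, right_names: set) -> set:
    overlapping = left_names & right_names
    if overlapping:
        raise ValueError(f"Cannot union schemas with overlapping names: {overlapping}")
    return left_names | right_names

print(union_names({"a", "b"}, {"c"}))  # disjoint names union cleanly
try:
    union_names({"a", "b"}, {"b"})     # overlapping "b" triggers the error
except ValueError as err:
    print(err)
```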

def __reduce__(self) -> tuple:
8 changes: 4 additions & 4 deletions src/daft-micropartition/src/micropartition.rs
@@ -870,7 +870,7 @@ pub fn read_csv_into_micropartition(
let unioned_schema = tables
.iter()
.map(|tbl| tbl.schema.clone())
-        .try_reduce(|s1, s2| s1.union(s2.as_ref()).map(Arc::new))?
+        .reduce(|s1, s2| Arc::new(s1.non_distinct_union(s2.as_ref())))
Member commented:
When do we have two tables that we are unioning that have common columns? Just want to make sure that this non-distinct union is the correct behavior.

@desmondcheongzx (Contributor, Author) commented on Oct 15, 2024:
This really only gets used in the MicroPartition API for reading multiple parquet files. E.g. daft.table.MicroPartition.read_parquet_bulk(["file1.parquet", "file2.parquet"])

Here both files can have the same columns.

The other MicroPartition APIs for read_parquet, read_csv, and read_json are non-concerns because they only ever take in one uri. But the read_{csv, json, parquet}_into_micropartition functions they call under the hood take in a slice of uris and can run into the same problem that read_parquet_bulk currently does. As of today there are no other users besides read_parquet_bulk that read more than one uri.

FWIW I believe the original authors (@jaychia and @clarkzinzow) intended to use the semantics of a non-distinct union. But I'm not 100% sure why we would bother with the cases where the schemas were mismatched; I imagine this would quickly blow up elsewhere.
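The `read_parquet_bulk` scenario described above can be modeled with plain insertion-ordered dicts standing in for schemas (file names and dtype strings are illustrative, not Daft's API): on a duplicate column name, the right-hand schema's field wins while the column keeps its position from the left-hand schema, mirroring `IndexMap::insert` over the chained field iterators.

```python
# Toy model of non_distinct_union: chain left's fields then right's into one
# ordered map; a duplicate key keeps its first position but takes the value
# inserted last (i.e. the right-hand schema's field).
def non_distinct_union(left: dict, right: dict) -> dict:
    fields = {}
    for name, dtype in list(left.items()) + list(right.items()):
        fields[name] = dtype
    return fields

file1 = {"id": "int64", "x": "float64"}  # schema of file1.parquet (illustrative)
file2 = {"id": "int64", "y": "utf8"}     # schema of file2.parquet, shares "id"
merged = non_distinct_union(file1, file2)
print(merged)  # {'id': 'int64', 'x': 'float64', 'y': 'utf8'}
```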

.unwrap();
let tables = tables
.into_iter()
@@ -919,7 +919,7 @@ pub fn read_json_into_micropartition(
let unioned_schema = tables
.iter()
.map(|tbl| tbl.schema.clone())
-        .try_reduce(|s1, s2| s1.union(s2.as_ref()).map(Arc::new))?
+        .reduce(|s1, s2| Arc::new(s1.non_distinct_union(s2.as_ref())))
.unwrap();
let tables = tables
.into_iter()
@@ -1082,7 +1082,7 @@ fn _read_parquet_into_loaded_micropartition<T: AsRef<str>>(
let unioned_schema = all_tables
.iter()
.map(|t| t.schema.clone())
-        .try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?;
+        .reduce(|l, r| l.non_distinct_union(&r).into());
unioned_schema.expect("we need at least 1 schema")
};

@@ -1231,7 +1231,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
} else {
let unioned_schema = schemas
.into_iter()
-        .try_reduce(|l, r| l.union(&r).map(Arc::new))?;
+        .reduce(|l, r| Arc::new(l.non_distinct_union(&r)));
unioned_schema.expect("we need at least 1 schema")
};
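The `reduce(...)` followed by `.expect("we need at least 1 schema")` pattern in these hunks can be modeled in Python with `functools.reduce` (per-file schemas here are illustrative dicts, not Daft objects):

```python
from functools import reduce

# Fold per-file schemas pairwise, mirroring `.reduce(|l, r| l.non_distinct_union(&r))`.
# `{**l, **r}` keeps the left operand's column order and lets the right
# operand win on duplicate names.
per_file_schemas = [
    {"a": "int64"},
    {"a": "int64", "b": "utf8"},
    {"c": "bool"},
]
unioned = reduce(lambda l, r: {**l, **r}, per_file_schemas)
print(unioned)  # {'a': 'int64', 'b': 'utf8', 'c': 'bool'}

# On an empty list, reduce() with no initializer raises TypeError -- the
# analogue of the None that `.expect("we need at least 1 schema")` rejects.
```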

33 changes: 22 additions & 11 deletions src/daft-schema/src/schema.rs
@@ -100,21 +100,32 @@ impl Schema {
self.fields.is_empty()
}

     /// Takes the disjoint union over the `self` and `other` schemas, throwing an error if the
     /// schemas contain overlapping keys.
     pub fn union(&self, other: &Self) -> DaftResult<Self> {
         let self_keys: HashSet<&String> = HashSet::from_iter(self.fields.keys());
-        let other_keys: HashSet<&String> = HashSet::from_iter(self.fields.keys());
-        match self_keys.difference(&other_keys).count() {
-            0 => {
-                let mut fields = IndexMap::new();
-                for (k, v) in self.fields.iter().chain(other.fields.iter()) {
-                    fields.insert(k.clone(), v.clone());
-                }
-                Ok(Self { fields })
-            }
-            _ => Err(DaftError::ValueError(
-                "Cannot union two schemas with overlapping keys".to_string(),
-            )),
-        }
+        let other_keys: HashSet<&String> = HashSet::from_iter(other.fields.keys());
+        if self_keys.is_disjoint(&other_keys) {
+            let mut fields = IndexMap::new();
+            for (k, v) in self.fields.iter().chain(other.fields.iter()) {
+                fields.insert(k.clone(), v.clone());
+            }
+            Ok(Self { fields })
+        } else {
+            Err(DaftError::ValueError(
+                "Cannot disjoint union two schemas with overlapping keys".to_string(),
+            ))
+        }
     }

+    /// Takes the non-distinct union of two schemas. If there are overlapping keys, then we take the
+    /// corresponding field from one of the two schemas.
+    pub fn non_distinct_union(&self, other: &Self) -> Self {
+        let mut fields = IndexMap::new();
+        for (k, v) in self.fields.iter().chain(other.fields.iter()) {
+            fields.insert(k.clone(), v.clone());
+        }
+        Self { fields }
+    }
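The bug this hunk fixes is visible in the deleted lines: `other_keys` was built from `self.fields.keys()`, so the difference with `self_keys` was always empty and the overlap error was unreachable (and even without that typo, `difference(...).count() == 0` tests containment rather than disjointness). A minimal sketch of old versus new behavior, with plain sets standing in for the key sets (these helpers are hypothetical, not Daft code):

```python
# Old check: other_keys is (incorrectly) derived from self, so the set
# difference is always empty and every union is allowed.
def old_union_allowed(self_keys: set, other_keys: set) -> bool:
    other_keys = set(self_keys)              # the bug: built from self, not other
    return len(self_keys - other_keys) == 0  # always True; overlaps slip through

# New check: compare the two actual key sets for disjointness.
def new_union_allowed(self_keys: set, other_keys: set) -> bool:
    return self_keys.isdisjoint(other_keys)

print(old_union_allowed({"a"}, {"a"}))  # True  (overlap not caught)
print(new_union_allowed({"a"}, {"a"}))  # False (union now errors)
print(new_union_allowed({"a"}, {"b"}))  # True  (disjoint schemas still union)
```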

pub fn apply_hints(&self, hints: &Self) -> DaftResult<Self> {