Skip to content

Commit

Permalink
Merge pull request #52 from paradigmxyz/feature/column_inc
Browse files Browse the repository at this point in the history
Feature/column inc
  • Loading branch information
sslivkoff authored Aug 30, 2023
2 parents be6de0c + 61715ff commit cabe7b0
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 42 deletions.
51 changes: 51 additions & 0 deletions crates/cli/src/parse/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,57 @@ fn parse_schemas(args: &Args) -> Result<HashMap<Datatype, Table>, ParseError> {
})
})
.collect();

// Make sure every explicitly included column ended up in at least one schema.
//
// A lone `"all"` entry is the "include every column" marker (schema construction expands it to
// the full column set), not a column name, so it is exempt from the lookup below; otherwise a
// valid `all` request would be rejected as an unknown column.
if let (Ok(schemas), Some(include_columns)) = (&schemas, &args.include_columns) {
    let is_all_marker = include_columns.len() == 1 && include_columns[0] == "all";
    if !is_all_marker {
        // Requested columns that no schema contains, kept in request order for the message.
        let unknown_columns: Vec<_> = include_columns
            .iter()
            .filter(|&column| !schemas.values().any(|schema| schema.has_column(column)))
            .collect();
        if !unknown_columns.is_empty() {
            return Err(ParseError::ParseError(format!(
                "datatypes do not support these columns: {:?}",
                unknown_columns
            )))
        }
    }
};

// Validate `args.exclude_columns`: every excluded name must be a known column of at least one
// of the requested datatypes; names that match none of them are reported together.
if let (Ok(schemas), Some(exclude_columns)) = (&schemas, &args.exclude_columns) {
    let unknown_columns: Vec<_> = exclude_columns
        .iter()
        .filter(|&name| {
            // Checked against each datatype's full column set, not just the columns that ended
            // up in the built schema.
            !schemas
                .keys()
                .any(|datatype| datatype.dataset().column_types().contains_key(&name.as_str()))
        })
        .collect();
    if !unknown_columns.is_empty() {
        return Err(ParseError::ParseError(format!(
            "datatypes do not support these columns: {:?}",
            unknown_columns
        )))
    }
};

schemas
}

Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub enum CollectError {
#[derive(Error, Debug)]
pub enum ParseError {
/// Error related to parsing
#[error("Parsing error {0:?}")]
#[error("Parsing error: {0}")]
ParseError(String),

/// Error related to provider operations
Expand Down
158 changes: 117 additions & 41 deletions crates/freeze/src/types/schemas.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::collections::HashSet;

use indexmap::IndexMap;
use indexmap::{IndexMap, IndexSet};
use thiserror::Error;

use crate::types::{ColumnEncoding, Datatype};
Expand Down Expand Up @@ -93,9 +91,15 @@ impl Datatype {
sort: Option<Vec<String>>,
) -> Result<Table, SchemaError> {
let column_types = self.dataset().column_types();
let all_columns = column_types.keys().map(|k| k.to_string()).collect();
let default_columns = self.dataset().default_columns();
let used_columns =
compute_used_columns(default_columns, include_columns, exclude_columns, columns, self);
let used_columns = compute_used_columns(
all_columns,
default_columns,
include_columns,
exclude_columns,
columns,
);
let mut columns = IndexMap::new();
for column in used_columns {
let mut ctype = column_types.get(column.as_str()).ok_or(SchemaError::InvalidColumn)?;
Expand All @@ -110,47 +114,119 @@ impl Datatype {
}

fn compute_used_columns(
all_columns: IndexSet<String>,
default_columns: Vec<&str>,
include_columns: &Option<Vec<String>>,
exclude_columns: &Option<Vec<String>>,
columns: &Option<Vec<String>>,
datatype: &Datatype,
) -> Vec<String> {
match (columns, include_columns, exclude_columns) {
(Some(columns), _, _) if ((columns.len() == 1) & columns.contains(&"all".to_string())) => {
datatype.dataset().column_types().keys().map(|k| k.to_string()).collect()
}
(Some(columns), _, _) => columns.iter().map(|x| x.to_string()).collect(),
(_, Some(include), _) if ((include.len() == 1) & include.contains(&"all".to_string())) => {
datatype.dataset().column_types().keys().map(|k| k.to_string()).collect()
}
(_, Some(include), Some(exclude)) => {
let mut result: Vec<String> = default_columns.iter().map(|s| s.to_string()).collect();
let mut result_set: HashSet<String> = result.iter().cloned().collect();
let exclude_set: HashSet<String> = exclude.iter().cloned().collect();
include
.iter()
.filter(|item| !exclude_set.contains(*item) && result_set.insert(item.to_string()))
.for_each(|item| result.push(item.clone()));
result
) -> IndexSet<String> {
if let Some(columns) = columns {
if (columns.len() == 1) & columns.contains(&"all".to_string()) {
return all_columns
}
(_, Some(include), None) => {
let mut result: Vec<String> = default_columns.iter().map(|s| s.to_string()).collect();
let mut result_set: HashSet<String> = result.iter().cloned().collect();
include
.iter()
.filter(|item| result_set.insert(item.to_string()))
.for_each(|item| result.push(item.clone()));
result
}
(_, None, Some(exclude)) => {
let exclude_set: HashSet<_> = exclude.iter().collect();
default_columns
.into_iter()
.filter(|s| !exclude_set.contains(&s.to_string()))
.map(|s| s.to_string())
.collect()
return columns.iter().map(|x| x.to_string()).collect()
}
let mut result_set = IndexSet::from_iter(default_columns.iter().map(|s| s.to_string()));
if let Some(include) = include_columns {
if (include.len() == 1) & include.contains(&"all".to_string()) {
return all_columns
}
(_, None, None) => default_columns.iter().map(|s| s.to_string()).collect(),
// Permissively skip `include` columns that are not in this dataset (they might apply to
// other dataset)
result_set.extend(include.iter().cloned());
result_set = result_set.intersection(&all_columns).cloned().collect()
}
if let Some(exclude) = exclude_columns {
let exclude_set = IndexSet::<String>::from_iter(exclude.iter().cloned());
result_set = result_set.difference(&exclude_set).cloned().collect()
}
result_set
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned, `Some`-wrapped column list from string literals.
    fn names(items: &[&str]) -> Option<Vec<String>> {
        Some(items.iter().map(|s| s.to_string()).collect())
    }

    /// Resolves the hex-encoded `Blocks` schema for the given column selections, passing no
    /// sort columns, and panics if schema construction fails.
    fn blocks_table(
        include: &Option<Vec<String>>,
        exclude: &Option<Vec<String>>,
        columns: &Option<Vec<String>>,
    ) -> Table {
        Datatype::Blocks
            .table_schema(&ColumnEncoding::Hex, include, exclude, columns, None)
            .unwrap()
    }

    #[test]
    fn test_table_schema_explicit_cols() {
        // An explicit column list is used verbatim, in the order given.
        let table = blocks_table(&None, &None, &names(&["number", "hash"]));
        assert_eq!(vec!["number", "hash"], table.columns());

        // "all" marker support
        let table = blocks_table(&None, &None, &names(&["all"]));
        let cols = table.columns();
        assert_eq!(15, cols.len());
        assert!(cols.contains(&"hash"));
        assert!(cols.contains(&"transactions_root"));
    }

    #[test]
    fn test_table_schema_include_cols() {
        // Included columns are appended after the defaults.
        let table = blocks_table(&names(&["chain_id", "receipts_root"]), &None, &None);
        let cols = table.columns();
        assert_eq!(9, cols.len());
        assert_eq!(["chain_id", "receipts_root"], cols[7..9]);

        // Non-existing include is skipped
        let table = blocks_table(&names(&["chain_id", "foo_bar"]), &None, &None);
        let cols = table.columns();
        assert_eq!(Some(&"chain_id"), cols.last());
        assert!(!cols.contains(&"foo_bar"));

        // "all" marker support
        let table = blocks_table(&names(&["all"]), &None, &None);
        let cols = table.columns();
        assert_eq!(15, cols.len());
        assert!(cols.contains(&"hash"));
        assert!(cols.contains(&"transactions_root"));
    }

    #[test]
    fn test_table_schema_exclude_cols() {
        // defaults
        let table = blocks_table(&None, &None, &None);
        let cols = table.columns();
        assert_eq!(7, cols.len());
        assert!(cols.contains(&"author"));
        assert!(cols.contains(&"extra_data"));

        // Excluding two default columns removes exactly those two.
        let table = blocks_table(&None, &names(&["author", "extra_data"]), &None);
        let cols = table.columns();
        assert_eq!(5, cols.len());
        assert!(!cols.contains(&"author"));
        assert!(!cols.contains(&"extra_data"));

        // Non-existing exclude is ignored
        let table = blocks_table(&None, &names(&["timestamp", "foo_bar"]), &None);
        let cols = table.columns();
        assert_eq!(6, cols.len());
        assert!(!cols.contains(&"timestamp"));
        assert!(!cols.contains(&"foo_bar"));
    }

    #[test]
    fn test_table_schema_include_and_exclude_cols() {
        // Includes are added and excludes removed within the same request.
        let table = blocks_table(
            &names(&["chain_id", "receipts_root"]),
            &names(&["author", "extra_data"]),
            &None,
        );
        let cols = table.columns();
        assert!(!cols.contains(&"author"));
        assert!(!cols.contains(&"extra_data"));
        assert_eq!(7, cols.len());
        assert_eq!(["chain_id", "receipts_root"], cols[5..7]);
    }
}

0 comments on commit cabe7b0

Please sign in to comment.