Skip to content

Commit

Permalink
feat(flow): accumlator for aggr func (#3396)
Browse files Browse the repository at this point in the history
* feat: Accumlator trait

* feat: add `OrdValue` accum&use enum_dispatch

* test: more accum test

* feat: eval aggr funcs

* chore: refactor test&fmt clippy

* refactor: less verbose

* test: more tests

* refactor: better err handling&use OrdValue for Count

* refactor: ignore null&more tests for error handle

* refactor: OrdValue accum

* chore: extract null check

* refactor: def&use fn signature

* chore: use extra cond with match guard

* chore: per review
  • Loading branch information
discord9 authored Mar 12, 2024
1 parent 1255c1f commit 7639c22
Show file tree
Hide file tree
Showing 6 changed files with 1,187 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ common-query.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
hydroflow = "0.5.0"
itertools.workspace = true
num-traits = "0.2"
Expand All @@ -27,3 +28,6 @@ session.workspace = true
snafu.workspace = true
tokio.workspace = true
tonic.workspace = true

[dev-dependencies]
serde_json = "1.0"
3 changes: 3 additions & 0 deletions src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ pub enum EvalError {

#[snafu(display("Unsupported temporal filter: {reason}"))]
UnsupportedTemporalFilter { reason: String, location: Location },

#[snafu(display("Overflowed during evaluation"))]
Overflow { location: Location },
}
Loading

0 comments on commit 7639c22

Please sign in to comment.