Skip to content

Commit

Permalink
sort: Allow ordering on a per key basis (#5203)
Browse files Browse the repository at this point in the history
Change the sort operator so that different ordering can be applied to
each key in the sort.

Closes #4945
  • Loading branch information
mattnibs authored Sep 3, 2024
1 parent b6d853f commit 17e513f
Show file tree
Hide file tree
Showing 43 changed files with 2,897 additions and 2,641 deletions.
14 changes: 10 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/pkg/nano"
"github.com/brimdata/zed/zbuf"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -36,10 +37,15 @@ type VersionResponse struct {
}

type PoolPostRequest struct {
Name string `json:"name"`
SortKey order.SortKey `json:"layout"`
SeekStride int `json:"seek_stride"`
Thresh int64 `json:"thresh"`
Name string `json:"name"`
SortKeys SortKeys `json:"layout"`
SeekStride int `json:"seek_stride"`
Thresh int64 `json:"thresh"`
}

type SortKeys struct {
Order order.Which `json:"order" zed:"order"`
Keys field.List `json:"keys" zed:"keys"`
}

type PoolPutRequest struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/zed/create/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
sortKey, err := order.ParseSortKey(c.sortKey)
sortKey, err := order.ParseSortKeys(c.sortKey)
if err != nil {
return err
}
Expand Down
39 changes: 23 additions & 16 deletions compiler/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package ast

import (
astzed "github.com/brimdata/zed/compiler/ast/zed"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
)

Expand Down Expand Up @@ -466,11 +465,11 @@ type (
Rparen int `json:"rparen"`
}
Sort struct {
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Args []Expr `json:"args"`
Order order.Which `json:"order"`
NullsFirst bool `json:"nullsfirst"`
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Reverse bool `json:"reverse"`
NullsFirst bool `json:"nullsfirst"`
Args []SortExpr `json:"args"`
}
Cut struct {
Kind string `json:"kind" unpack:""`
Expand Down Expand Up @@ -638,19 +637,19 @@ type (

type (
File struct {
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Path Pattern `json:"path"`
Format string `json:"format"`
SortKey *SortKey `json:"sort_key"`
EndPos int `json:"end_pos"`
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Path Pattern `json:"path"`
Format string `json:"format"`
SortKeys []SortExpr `json:"sort_keys"`
EndPos int `json:"end_pos"`
}
HTTP struct {
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
URL Pattern `json:"url"`
Format string `json:"format"`
SortKey *SortKey `json:"sort_key"`
SortKeys []SortExpr `json:"sort_keys"`
Method string `json:"method"`
Headers *RecordExpr `json:"headers"`
Body string `json:"body"`
Expand Down Expand Up @@ -690,10 +689,18 @@ func (x *Pool) End() int { return x.EndPos }
func (x *File) End() int { return x.EndPos }
func (x *HTTP) End() int { return x.EndPos }

type SortKey struct {
type SortExpr struct {
Kind string `json:"kind" unpack:""`
Keys []Expr `json:"keys"`
Order string `json:"order"`
Expr Expr `json:"expr"`
Order *ID `json:"order"`
}

func (s SortExpr) Pos() int { return s.Expr.Pos() }
func (s SortExpr) End() int {
if s.Order != nil {
s.Order.End()
}
return s.Expr.End()
}

type Trunk struct {
Expand Down
6 changes: 6 additions & 0 deletions compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dag

import "github.com/brimdata/zed/order"

type (
Expr interface {
ExprDAG()
Expand Down Expand Up @@ -117,6 +119,10 @@ type (
From Expr `json:"from"`
To Expr `json:"to"`
}
SortExpr struct {
Key Expr `json:"key"`
Order order.Which `json:"order"`
}
This struct {
Kind string `json:"kind" unpack:""`
Path []string `json:"path"`
Expand Down
38 changes: 19 additions & 19 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ type (
Kind string `json:"kind" unpack:""`
}
Sort struct {
Kind string `json:"kind" unpack:""`
Args []Expr `json:"args"`
Order order.Which `json:"order"`
NullsFirst bool `json:"nullsfirst"`
Kind string `json:"kind" unpack:""`
Args []SortExpr `json:"args"`
NullsFirst bool `json:"nullsfirst"`
Reverse bool `json:"reverse"`
}
Summarize struct {
Kind string `json:"kind" unpack:""`
Expand Down Expand Up @@ -202,25 +202,25 @@ type (

// DefaultScan scans an input stream provided by the runtime.
DefaultScan struct {
Kind string `json:"kind" unpack:""`
Filter Expr `json:"filter"`
SortKey order.SortKey `json:"sort_key"`
Kind string `json:"kind" unpack:""`
Filter Expr `json:"filter"`
SortKeys order.SortKeys `json:"sort_keys"`
}
FileScan struct {
Kind string `json:"kind" unpack:""`
Path string `json:"path"`
Format string `json:"format"`
SortKey order.SortKey `json:"sort_key"`
Filter Expr `json:"filter"`
Kind string `json:"kind" unpack:""`
Path string `json:"path"`
Format string `json:"format"`
SortKeys order.SortKeys `json:"sort_keys"`
Filter Expr `json:"filter"`
}
HTTPScan struct {
Kind string `json:"kind" unpack:""`
URL string `json:"url"`
Format string `json:"format"`
SortKey order.SortKey `json:"sort_key"`
Method string `json:"method"`
Headers map[string][]string `json:"headers"`
Body string `json:"body"`
Kind string `json:"kind" unpack:""`
URL string `json:"url"`
Format string `json:"format"`
SortKeys order.SortKeys `json:"sort_keys"`
Method string `json:"method"`
Headers map[string][]string `json:"headers"`
Body string `json:"body"`
}
PoolScan struct {
Kind string `json:"kind" unpack:""`
Expand Down
6 changes: 3 additions & 3 deletions compiler/data/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func (s *Source) CommitObject(ctx context.Context, id ksuid.KSUID, name string)
return ksuid.Nil, nil
}

func (s *Source) SortKey(ctx context.Context, src dag.Op) order.SortKey {
func (s *Source) SortKeys(ctx context.Context, src dag.Op) order.SortKeys {
if s.lake != nil {
return s.lake.SortKey(ctx, src)
return s.lake.SortKeys(ctx, src)
}
return order.Nil
return nil
}

func (s *Source) Open(ctx context.Context, zctx *zed.Context, path, format string, pushdown zbuf.Filter, demandOut demand.Demand) (zbuf.Puller, error) {
Expand Down
10 changes: 2 additions & 8 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (*Path) Source() {}
type Channel struct {
Name string `json:"name"`
AggregationKeys field.List `json:"aggregation_keys"`
Sort *order.SortKey `json:"sort"`
Sort order.SortKeys `json:"sort"`
}

func Analyze(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
Expand Down Expand Up @@ -85,13 +85,7 @@ func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lake
aggKeys := describeAggs(entry, []field.List{nil})
outputs := collectOutputs(entry)
m := make(map[string]int)
for i := range sortKeys {
// Convert SortKey to a pointer so a nil sort is encoded as null for
// JSON/ZSON.
var s *order.SortKey
if !sortKeys[i].IsNil() {
s = &sortKeys[i]
}
for i, s := range sortKeys {
name := outputs[i].Name
if k, ok := m[name]; ok {
// If output already exists, this means the outputs will be
Expand Down
15 changes: 9 additions & 6 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/sam/expr"
Expand Down Expand Up @@ -187,11 +186,15 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return op.NewApplier(b.rctx, parent, dropper, expr.Resetters{}), nil
case *dag.Sort:
b.resetResetters()
fields, err := b.compileExprs(v.Args)
if err != nil {
return nil, err
var sortExprs []expr.SortEvaluator
for _, s := range v.Args {
k, err := b.compileExpr(s.Key)
if err != nil {
return nil, err
}
sortExprs = append(sortExprs, expr.NewSortEvaluator(k, s.Order))
}
sort, err := sort.New(b.rctx, parent, fields, v.Order, v.NullsFirst, b.resetters)
sort, err := sort.New(b.rctx, parent, sortExprs, v.NullsFirst, v.Reverse, b.resetters)
if err != nil {
return nil, fmt.Errorf("compiling sort: %w", err)
}
Expand Down Expand Up @@ -672,7 +675,7 @@ func (b *Builder) compile(o dag.Op, parents []zbuf.Puller) ([]zbuf.Puller, error
if err != nil {
return nil, err
}
cmp := expr.NewComparator(true, o.Order == order.Desc, e).WithMissingAsNull()
cmp := expr.NewComparator(true, expr.NewSortEvaluator(e, o.Order)).WithMissingAsNull()
return []zbuf.Puller{merge.New(b.rctx, parents, cmp.Compare, b.resetters)}, nil
case *dag.Combine:
return []zbuf.Puller{combine.New(b.rctx, parents)}, nil
Expand Down
Loading

0 comments on commit 17e513f

Please sign in to comment.