-
Notifications
You must be signed in to change notification settings - Fork 123
/
squash.go
188 lines (157 loc) · 4.04 KB
/
squash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package gitbase
import (
"fmt"
"strings"
"github.com/src-d/go-mysql-server/sql"
)
// SquashedTable is a table that combines the output of some tables as the
// inputs of others with chaining so it's less expensive to compute.
type SquashedTable struct {
partitioned
iter ChainableIter
tables []string
schemaMappings []int
filters []sql.Expression
indexedTables []string
schema sql.Schema
}
// NewSquashedTable creates a new SquashedTable.
func NewSquashedTable(
iter ChainableIter,
mapping []int,
filters []sql.Expression,
indexedTables []string,
tables ...string,
) *SquashedTable {
return &SquashedTable{
iter: iter,
tables: tables,
schemaMappings: mapping,
filters: filters,
indexedTables: indexedTables,
}
}
var _ sql.Table = (*SquashedTable)(nil)
var _ sql.PartitionCounter = (*SquashedTable)(nil)
// Name implements the sql.Table interface.
func (t *SquashedTable) Name() string {
return fmt.Sprintf("SquashedTable(%s)", strings.Join(t.tables, ", "))
}
// Schema implements the sql.Table interface.
func (t *SquashedTable) Schema() sql.Schema {
if len(t.schemaMappings) == 0 {
return t.iter.Schema()
}
if t.schema == nil {
schema := t.iter.Schema()
t.schema = make(sql.Schema, len(schema))
for i, j := range t.schemaMappings {
t.schema[i] = schema[j]
}
}
return t.schema
}
// PartitionRows implements the sql.Table interface.
func (t *SquashedTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
span, ctx := ctx.Span("gitbase.SquashedTable")
session, err := getSession(ctx)
if err != nil {
return nil, err
}
repo, err := getPartitionRepo(ctx, p)
if err != nil {
span.Finish()
if session.SkipGitErrors {
return noRows, nil
}
return nil, err
}
iter, err := t.iter.New(ctx, repo)
if err != nil {
span.Finish()
return nil, errorWithRepo(repo, err)
}
if len(t.schemaMappings) == 0 {
return sql.NewSpanIter(
span,
newRepoRowIter(repo, NewChainableRowIter(iter)),
), nil
}
return sql.NewSpanIter(
span,
newRepoRowIter(
repo,
NewSchemaMapperIter(NewChainableRowIter(iter), t.schemaMappings),
),
), nil
}
func (t *SquashedTable) String() string {
s := t.Schema()
cp := sql.NewTreePrinter()
_ = cp.WriteNode("Columns")
var schema = make([]string, len(s))
for i, col := range s {
schema[i] = fmt.Sprintf(
"Column(%s, %s, nullable=%v)",
col.Name,
col.Type.Type().String(),
col.Nullable,
)
}
_ = cp.WriteChildren(schema...)
fp := sql.NewTreePrinter()
_ = fp.WriteNode("Filters")
var filters = make([]string, len(t.filters))
for i, f := range t.filters {
filters[i] = f.String()
}
_ = fp.WriteChildren(filters...)
children := []string{cp.String(), fp.String()}
if len(t.indexedTables) > 0 {
ip := sql.NewTreePrinter()
_ = ip.WriteNode("IndexedTables")
_ = ip.WriteChildren(t.indexedTables...)
children = append(children, ip.String())
}
p := sql.NewTreePrinter()
_ = p.WriteNode("SquashedTable(%s)", strings.Join(t.tables, ", "))
_ = p.WriteChildren(children...)
return p.String()
}
type chainableRowIter struct {
ChainableIter
}
// NewChainableRowIter converts a ChainableIter into a sql.RowIter.
func NewChainableRowIter(iter ChainableIter) sql.RowIter {
return &chainableRowIter{iter}
}
func (i *chainableRowIter) Next() (sql.Row, error) {
if err := i.Advance(); err != nil {
return nil, err
}
return i.Row(), nil
}
type schemaMapperIter struct {
iter sql.RowIter
mappings []int
}
// NewSchemaMapperIter reorders the rows in the given row iter according to the
// given column mappings.
func NewSchemaMapperIter(iter sql.RowIter, mappings []int) sql.RowIter {
return &schemaMapperIter{iter, mappings}
}
func (i schemaMapperIter) Next() (sql.Row, error) {
childRow, err := i.iter.Next()
if err != nil {
return nil, err
}
if len(i.mappings) == 0 {
return childRow, nil
}
var row = make(sql.Row, len(i.mappings))
for i, j := range i.mappings {
row[i] = childRow[j]
}
return row, nil
}
func (i schemaMapperIter) Close() error { return i.iter.Close() }