-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
parquet.go
134 lines (113 loc) · 4.15 KB
/
parquet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package changefeedccl
import (
	"fmt"
	"io"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
)
// parquetWriter pairs a parquet.Writer with a datum buffer that is reused for
// every row handed to addData.
type parquetWriter struct {
	// inner is the underlying parquet writer that rows are forwarded to.
	inner *parquet.Writer
	// datumAlloc is a scratch buffer, allocated once with one slot per schema
	// column (including the trailing event-type column) and overwritten on
	// each addData call.
	datumAlloc []tree.Datum
}
// newParquetSchemaDefintion returns a parquet schema definition derived from
// the columns of row, together with the number of columns in that schema.
//
// The schema contains, in order, every column visited by row.ForEachColumn()
// followed by a string-typed column named parquetCrdbEventTypeColName. The
// returned count includes that trailing event-type column, so it is the number
// of datums needed to describe one row.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
	// len(row.ResultColumns()) is used only as a capacity hint (+1 for the
	// event-type column). The schema itself is built from the columns the
	// iterator actually yields, so the names/types slices can neither be
	// overrun (index panic) nor left with empty trailing entries if the
	// iterator visits a different number of columns than ResultColumns reports.
	sizeHint := len(row.ResultColumns()) + 1
	columnNames := make([]string, 0, sizeHint)
	columnTypes := make([]*types.T, 0, sizeHint)
	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
		columnNames = append(columnNames, col.Name)
		columnTypes = append(columnTypes, col.Typ)
		return nil
	}); err != nil {
		return nil, 0, err
	}

	// The event type is always the final column.
	columnNames = append(columnNames, parquetCrdbEventTypeColName)
	columnTypes = append(columnTypes, types.String)

	schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
	if err != nil {
		return nil, 0, err
	}
	return schemaDef, len(columnNames), nil
}
// newParquetWriterFromRow builds a parquetWriter that emits to sink, taking
// its parquet schema from the columns of the supplied row. Any opts are passed
// through to the underlying parquet writer.
//
// When includeParquetTestMetadata is set, test metadata options are added via
// addCDCTestMetadata and the writer is created with
// parquet.NewWriterWithReaderMeta instead of parquet.NewWriter.
func newParquetWriterFromRow(
	row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
) (*parquetWriter, error) {
	schemaDef, numCols, err := newParquetSchemaDefintion(row)
	if err != nil {
		return nil, err
	}

	construct := parquet.NewWriter
	if includeParquetTestMetadata {
		// To use parquet test utils for reading datums, the writer needs to be
		// configured with additional metadata.
		construct = parquet.NewWriterWithReaderMeta
		opts, err = addCDCTestMetadata(row, opts)
		if err != nil {
			return nil, err
		}
	}

	inner, err := construct(schemaDef, sink, opts...)
	if err != nil {
		return nil, err
	}
	// The datum buffer is allocated once here and reused by every addData call.
	w := &parquetWriter{
		inner:      inner,
		datumAlloc: make([]tree.Datum, numCols),
	}
	return w, nil
}
// addData fills the writer's reusable datum buffer from updatedRow (with the
// event type derived from updatedRow and prevRow) and hands the buffer to the
// underlying parquet writer. The row is not guaranteed to have been flushed to
// the sink when this returns.
//
// NOTE(review): the same buffer is overwritten on the next call, which assumes
// parquet.Writer.AddRow does not retain the slice — confirm.
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
	err := populateDatums(updatedRow, prevRow, w.datumAlloc)
	if err != nil {
		return err
	}
	return w.inner.AddRow(w.datumAlloc)
}
// close closes the underlying parquet writer and returns any error it reports.
// Closing is expected to flush any buffered data to the sink (NOTE(review):
// relies on parquet.Writer.Close flushing, which is not visible here — confirm).
func (w *parquetWriter) close() error {
	return w.inner.Close()
}
// populateDatums writes, in place, the datums of updatedRow's columns (in the
// order yielded by updatedRow.ForEachColumn()) into datumAlloc, followed by the
// event-type datum computed from updatedRow and prevRow in the final slot.
//
// datumAlloc is written by index and its length is never changed, so it must
// have exactly one slot per row column plus one for the event type. If the row
// yields a different number of columns, an error is returned rather than
// leaving the buffer with a partial, stale, or misaligned row.
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
	// The last slot is reserved for the event-type column.
	numRowCols := len(datumAlloc) - 1
	if numRowCols < 0 {
		return fmt.Errorf("parquet datum buffer has no slot for the event type column")
	}

	idx := 0
	if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
		if idx >= numRowCols {
			return fmt.Errorf("row has more than the %d columns expected by the parquet schema", numRowCols)
		}
		datumAlloc[idx] = d
		idx++
		return nil
	}); err != nil {
		return err
	}
	if idx != numRowCols {
		return fmt.Errorf("row has %d columns but the parquet schema expects %d", idx, numRowCols)
	}

	datumAlloc[numRowCols] = getEventTypeDatum(updatedRow, prevRow).DString()
	return nil
}
// addCDCTestMetadata returns opts extended with two parquet.WithMetadata
// options used by the cdc test feed factories:
//   - "keyCols": comma-separated names of the row's key columns;
//   - "allCols": comma-separated names of every column the row yields,
//     followed by parquetCrdbEventTypeColName.
//
// The returned slice never shares spare capacity with opts. opts is typically
// a variadic argument that may alias a slice owned by a caller further up, and
// appending into its unused capacity would write into storage the caller
// still owns. On error the original opts is returned unchanged.
func addCDCTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
	var keyCols []string
	if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error {
		keyCols = append(keyCols, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}

	var allCols []string
	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
		allCols = append(allCols, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}
	allCols = append(allCols, parquetCrdbEventTypeColName)

	// Clip opts to its length (full slice expression) so the append below
	// allocates a fresh backing array instead of writing past len(opts).
	out := append(opts[:len(opts):len(opts)],
		parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")}),
		parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")}),
	)
	return out, nil
}