Skip to content

Commit

Permalink
util/parquet: add compression options
Browse files Browse the repository at this point in the history
This change updates the parquet writer to be able to use
GZIP, ZSTD, SNAPPY, and BROTLI compression codecs. By
default, no compression is used. LZO and LZ4 are unsupported
by the library.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Informs: cockroachdb#99028
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent 1439e45 commit d3144f6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/util/timeofday",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/compress",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_errors//:errors",
Expand Down
47 changes: 46 additions & 1 deletion pkg/util/parquet/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"

"github.com/apache/arrow/go/v11/parquet"
"github.com/apache/arrow/go/v11/parquet/compress"
"github.com/apache/arrow/go/v11/parquet/file"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
Expand All @@ -22,6 +23,7 @@ import (
// config holds the writer settings accumulated by applying Options in
// NewWriter.
type config struct {
	// maxRowGroupLength is the maximum number of rows written to a single
	// row group.
	maxRowGroupLength int64
	// version is the parquet format version to write.
	version parquet.Version
	// compression is the codec used to compress columns when writing.
	compression compress.Compression
}

// An Option is a configurable setting for the Writer.
Expand Down Expand Up @@ -57,12 +59,54 @@ func WithVersion(v string) Option {
}
}

// WithCompressionCodec specifies the compression codec to use when writing
// columns. Passing a value outside the supported set (see CompressionCodec)
// results in an assertion error.
func WithCompressionCodec(compression CompressionCodec) Option {
	return func(c *config) error {
		// Single lookup instead of a membership test followed by a second
		// index into the map.
		codec, ok := compressionCodecToParquet[compression]
		if !ok {
			// Include the offending value so the failure is diagnosable.
			return errors.AssertionFailedf("invalid compression codec %d", compression)
		}
		c.compression = codec
		return nil
	}
}

// allowedVersions maps user-facing version strings to the parquet format
// versions supported by the writer.
var allowedVersions = map[string]parquet.Version{
	"v1.0": parquet.V1_0,
	// BUG FIX: "v2.4" previously mapped to parquet.V1_0, silently writing
	// v1.0 files for callers requesting v2.4.
	"v2.4": parquet.V2_4,
	"v2.6": parquet.V2_6,
}

// compressionCodecToParquet is a mapping between CompressionCodec values and
// compress.Codecs. LZO and LZ4 are intentionally absent because they are
// unsupported by the library (see the comments on the CompressionCodec
// constants). Membership in this map is what WithCompressionCodec uses to
// validate its argument.
var compressionCodecToParquet = map[CompressionCodec]compress.Compression{
	CompressionNone:   compress.Codecs.Uncompressed,
	CompressionGZIP:   compress.Codecs.Gzip,
	CompressionZSTD:   compress.Codecs.Zstd,
	CompressionSnappy: compress.Codecs.Snappy,
	CompressionBrotli: compress.Codecs.Brotli,
}

// A CompressionCodec is the codec used to compress columns when writing
// parquet files.
type CompressionCodec int64

const (
	// CompressionNone represents no compression.
	//
	// Values start at iota + 1 so the zero value of CompressionCodec is not
	// a valid codec: it is absent from compressionCodecToParquet and is
	// therefore rejected by WithCompressionCodec.
	CompressionNone CompressionCodec = iota + 1
	// CompressionGZIP is the GZIP compression codec.
	CompressionGZIP
	// CompressionZSTD is the ZSTD compression codec.
	CompressionZSTD
	// CompressionSnappy is the Snappy compression codec.
	CompressionSnappy
	// CompressionBrotli is the Brotli compression codec.
	CompressionBrotli
	// LZO and LZ4 are unsupported. See comments on compress.Codecs.Lzo
	// and compress.Codecs.Lz4.
)

// A Writer writes datums into an io.Writer sink. The Writer should be Close()ed
// before attempting to read from the output sink so all data is flushed and
// parquet metadata is written.
Expand All @@ -86,6 +130,7 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
cfg := config{
maxRowGroupLength: parquet.DefaultMaxRowGroupLen,
version: parquet.V2_6,
compression: compress.Codecs.Uncompressed,
}
for _, opt := range opts {
err := opt.apply(&cfg)
Expand All @@ -95,7 +140,7 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
}

parquetOpts := []parquet.WriterProperty{parquet.WithCreatedBy("cockroachdb"),
parquet.WithVersion(cfg.version)}
parquet.WithVersion(cfg.version), parquet.WithCompression(cfg.compression)}
props := parquet.NewWriterProperties(parquetOpts...)
writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props))

Expand Down
34 changes: 34 additions & 0 deletions pkg/util/parquet/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,37 @@ func TestVersions(t *testing.T) {
_, err = NewWriter(schemaDef, &buf, WithVersion("invalid"))
require.Error(t, err)
}

// TestCompressionCodecs verifies that the writer can produce a readable file
// with every supported compression codec, and that the chosen codec is
// recorded in the file's column chunk metadata.
func TestCompressionCodecs(t *testing.T) {
	schemaDef, err := NewSchema([]string{"a"}, []*types.T{types.Int})
	require.NoError(t, err)

	for compression := range compressionCodecToParquet {
		fileName := "TestCompressionCodecs.parquet"
		f, err := os.CreateTemp("", fileName)
		require.NoError(t, err)

		writer, err := NewWriter(schemaDef, f, WithCompressionCodec(compression))
		require.NoError(t, err)

		err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
		require.NoError(t, err)

		err = writer.Close()
		require.NoError(t, err)

		f, err = os.Open(f.Name())
		require.NoError(t, err)

		reader, err := file.NewParquetReader(f)
		require.NoError(t, err)

		colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
		require.NoError(t, err)

		// require.Equal takes (expected, actual) — expected first.
		require.Equal(t, compressionCodecToParquet[compression], colChunk.Compression())

		err = reader.Close()
		require.NoError(t, err)

		// BUG FIX: the temp file was previously leaked on every iteration.
		// Remove it now that the reader is closed.
		require.NoError(t, os.Remove(f.Name()))
	}
}

0 comments on commit d3144f6

Please sign in to comment.