Skip to content

Commit

Permalink
lightning: Compress Reader/Writer supports reading/writing Snappy/Zst…
Browse files Browse the repository at this point in the history
…d type compressed files (#38603)

ref #38514
  • Loading branch information
lyzx2001 authored Oct 31, 2022
1 parent bc0b419 commit 312155a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
}

type compressReader struct {
io.ReadCloser
io.Reader
io.Closer
}

// nolint:interfacer
Expand All @@ -94,14 +95,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
return nil, errors.Trace(err)
}
return &compressReader{
ReadCloser: r,
Reader: r,
Closer: fileReader,
}, nil
}

func (*compressReader) Seek(_ int64, _ int) (int64, error) {
return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
}

func (c *compressReader) Close() error {
err := c.Closer.Close()
return err
}

type flushStorageWriter struct {
writer io.Writer
flusher flusher
Expand Down
35 changes: 34 additions & 1 deletion br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
"context"
"io"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

// CompressType represents the type of compression.
Expand All @@ -17,6 +21,10 @@ const (
NoCompression CompressType = iota
// Gzip will compress given bytes in gzip format.
Gzip
// Snappy will compress given bytes in snappy format.
Snappy
// Zstd will compress given bytes in zstd format.
Zstd
)

type flusher interface {
Expand All @@ -39,6 +47,19 @@ type interceptBuffer interface {
Compressed() bool
}

func createSuffixString(compressType CompressType) string {
if compressType == Gzip {
return ".txt.gz"
}
if compressType == Snappy {
return ".txt.snappy"
}
if compressType == Zstd {
return ".txt.zst"
}
return ""
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
if compressType == NoCompression {
return newNoCompressionBuffer(chunkSize)
Expand All @@ -50,15 +71,27 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri
switch compressType {
case Gzip:
return gzip.NewWriter(w)
case Snappy:
return snappy.NewBufferedWriter(w)
case Zstd:
newWriter, err := zstd.NewWriter(w)
if err != nil {
log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err))
}
return newWriter
default:
return nil
}
}

func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) {
func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) {
switch compressType {
case Gzip:
return gzip.NewReader(r)
case Snappy:
return snappy.NewReader(r), nil
case Zstd:
return zstd.NewReader(r)
default:
return nil, nil
}
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func TestCompressReaderWriter(t *testing.T) {
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage = WithCompression(storage, Gzip)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz"
storage = WithCompression(storage, test.compressType)
suffix := createSuffixString(test.compressType)
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
require.NoError(t, err)
for _, str := range test.content {
Expand All @@ -124,7 +125,6 @@ func TestCompressReaderWriter(t *testing.T) {
_, err = bf.ReadFrom(r)
require.NoError(t, err)
require.Equal(t, strings.Join(test.content, ""), bf.String())
require.Nil(t, r.Close())

// test withCompression Open
r, err = storage.Open(ctx, fileName)
Expand All @@ -135,7 +135,8 @@ func TestCompressReaderWriter(t *testing.T) {

require.Nil(t, file.Close())
}
compressTypeArr := []CompressType{Gzip}
compressTypeArr := []CompressType{Gzip, Snappy, Zstd}

tests := []testcase{
{
name: "long text medium chunks",
Expand Down

0 comments on commit 312155a

Please sign in to comment.