Skip to content

Commit

Permalink
Merge pull request #257 from tsukanov-as/tags-uploading-optimization3
Browse files Browse the repository at this point in the history
Tags uploading optimization
  • Loading branch information
lomik authored Dec 26, 2023
2 parents d7e7ba7 + da889a4 commit ec51e46
Show file tree
Hide file tree
Showing 94 changed files with 27,268 additions and 57 deletions.
20 changes: 14 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lomik/zapwriter"

"github.com/lomik/graphite-clickhouse/cache"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/limiter"
Expand Down Expand Up @@ -260,11 +261,15 @@ func clickhouseURLValidate(chURL string) (*url.URL, error) {

// Tags holds the tags-related settings of the configuration.
//
// Each field is bound to the TOML/JSON key named in its struct tag. For the
// fields that carry one, the `comment` tag is the human-readable description
// of the option. New() presets Threads to 1 and Compression to "gzip"; the
// remaining fields start at their zero values.
type Tags struct {
Rules string `toml:"rules" json:"rules"`
Date string `toml:"date" json:"date"`
ExtraWhere string `toml:"extra-where" json:"extra-where"`
InputFile string `toml:"input-file" json:"input-file"`
OutputFile string `toml:"output-file" json:"output-file"`
Rules string `toml:"rules" json:"rules"`
Date string `toml:"date" json:"date"`
ExtraWhere string `toml:"extra-where" json:"extra-where"`
InputFile string `toml:"input-file" json:"input-file"`
OutputFile string `toml:"output-file" json:"output-file"`
Threads int `toml:"threads" json:"threads" comment:"number of threads for uploading tags to clickhouse (1 by default)"`
Compression clickhouse.ContentEncoding `toml:"compression" json:"compression" comment:"compression method for tags before sending them to clickhouse (i.e. content encoding): gzip (default), none, zstd"`
Version uint32 `toml:"version" json:"version" comment:"fixed tags version for testing purposes (by default the current timestamp is used for each upload)"`
SelectChunksCount int `toml:"select-chunks-count" json:"select-chunks-count" comment:"number of chunks for selecting metrics from clickhouse (10 by default)"`
}

// Carbonlink configuration
Expand Down Expand Up @@ -385,7 +390,10 @@ func New() *Config {
FindLimiter: limiter.NoopLimiter{},
TagsLimiter: limiter.NoopLimiter{},
},
Tags: Tags{},
Tags: Tags{
Threads: 1,
Compression: "gzip",
},
Carbonlink: Carbonlink{
Threads: 10,
Retries: 2,
Expand Down
18 changes: 15 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15
[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -382,7 +386,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down Expand Up @@ -504,6 +508,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15
[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -682,7 +690,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down Expand Up @@ -814,6 +822,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15
[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -997,7 +1009,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down
8 changes: 8 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
# extra-where = ""
# input-file = ""
# output-file = ""
# number of threads for uploading tags to clickhouse (1 by default)
# threads = 1
# compression method for tags before sending them to clickhouse (i.e. content encoding): gzip (default), none, zstd
# compression = "gzip"
# fixed tags version for testing purposes (by default the current timestamp is used for each upload)
# version = 0
# number of chunks for selecting metrics from clickhouse (10 by default)
# select-chunks-count = 0

[carbonlink]
server = ""
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
38 changes: 31 additions & 7 deletions helper/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -30,6 +31,14 @@ type ErrWithDescr struct {
data string
}

// ContentEncoding names the encoding of a request body, as announced to the
// server through the Content-Encoding HTTP header.
//
// The zero value (empty string) is not a valid encoding: reader rejects any
// value other than the constants below with an "unknown encoding" error.
type ContentEncoding string

// Encodings accepted by reader.
const (
// ContentEncodingNone sends the request without a Content-Encoding header.
ContentEncodingNone ContentEncoding = "none"
// ContentEncodingGzip adds the header "Content-Encoding: gzip".
ContentEncodingGzip ContentEncoding = "gzip"
// ContentEncodingZstd adds the header "Content-Encoding: zstd".
ContentEncodingZstd ContentEncoding = "zstd"
)

// NewErrWithDescr builds an *ErrWithDescr from the message err and the
// accompanying data, and returns it as an error.
func NewErrWithDescr(err string, data string) error {
return &ErrWithDescr{err, data}
}
Expand Down Expand Up @@ -183,18 +192,23 @@ func Query(ctx context.Context, dsn string, query string, opts Options, extData
}

// Post sends query to dsn with postBody as the request body and no content
// encoding (ContentEncodingNone), and returns the results of do unchanged.
// NOTE(review): the meaning of the two int64 results is not visible in this
// view — confirm against do.
func Post(ctx context.Context, dsn string, query string, postBody io.Reader, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, false, opts, extData)
return do(ctx, dsn, query, postBody, ContentEncodingNone, opts, extData)
}

// PostGzip sends query to dsn with postBody as the request body, marked with
// the header "Content-Encoding: gzip" (ContentEncodingGzip).
// NOTE(review): no compression of postBody is visible here; presumably the
// caller must supply an already gzip-compressed body — confirm.
//
// Deprecated: use PostWithEncoding instead.
func PostGzip(ctx context.Context, dsn string, query string, postBody io.Reader, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, true, opts, extData)
return do(ctx, dsn, query, postBody, ContentEncodingGzip, opts, extData)
}

// PostWithEncoding sends query to dsn with postBody as the request body and
// passes encoding through to do, which selects the Content-Encoding header.
// An encoding other than ContentEncodingNone, ContentEncodingGzip or
// ContentEncodingZstd results in an "unknown encoding" error.
// NOTE(review): only the header is set in the visible code; presumably
// postBody must already be encoded accordingly — confirm.
func PostWithEncoding(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, encoding, opts, extData)
}

// Reader runs query against dsn without a request body (postBody is nil) and
// without content encoding (ContentEncodingNone), and returns the
// *LoggedReader produced by reader.
func Reader(ctx context.Context, dsn string, query string, opts Options, extData *ExternalData) (*LoggedReader, error) {
return reader(ctx, dsn, query, nil, false, opts, extData)
return reader(ctx, dsn, query, nil, ContentEncodingNone, opts, extData)
}

func reader(ctx context.Context, dsn string, query string, postBody io.Reader, gzip bool, opts Options, extData *ExternalData) (bodyReader *LoggedReader, err error) {
func reader(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) (bodyReader *LoggedReader, err error) {
if postBody != nil && extData != nil {
err = fmt.Errorf("postBody and extData could not be passed in one request")
return
Expand Down Expand Up @@ -265,8 +279,15 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
req.Header.Add("Content-Type", contentHeader)
}

if gzip {
switch encoding {
case ContentEncodingNone:
// no encoding
case ContentEncodingGzip:
req.Header.Add("Content-Encoding", "gzip")
case ContentEncodingZstd:
req.Header.Add("Content-Encoding", "zstd")
default:
return nil, fmt.Errorf("unknown encoding: %s", encoding)
}

client := &http.Client{
Expand Down Expand Up @@ -305,6 +326,9 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
read_bytes, _ = strconv.ParseInt(v, 10, 64)
}
}
sort.Slice(fields, func(i int, j int) bool {
return fields[i].Key < fields[j].Key
})
logger = logger.With(fields...)
} else {
logger.Warn("query", zap.Error(err), zap.String("clickhouse-summary", summaryHeader))
Expand Down Expand Up @@ -337,8 +361,8 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
return
}

func do(ctx context.Context, dsn string, query string, postBody io.Reader, gzip bool, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
bodyReader, err := reader(ctx, dsn, query, postBody, gzip, opts, extData)
func do(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
bodyReader, err := reader(ctx, dsn, query, postBody, encoding, opts, extData)
if err != nil {
return nil, 0, 0, err
}
Expand Down
Loading

0 comments on commit ec51e46

Please sign in to comment.