Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce index audit to lokitool #13008

Merged
merged 17 commits into from
Jun 3, 2024
Merged
4 changes: 3 additions & 1 deletion cmd/lokitool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
)

var (
ruleCommand commands.RuleCommand
ruleCommand commands.RuleCommand
auditCommand commands.AuditCommand
)

func main() {
app := kingpin.New("lokitool", "A command-line tool to manage Loki.")
ruleCommand.Register(app)
auditCommand.Register(app)

app.Command("version", "Get the version of the lokitool CLI").Action(func(k *kingpin.ParseContext) error {
fmt.Println(version.Print("loki"))
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.14.2
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1
github.com/thanos-io/objstore v0.0.0-20230829152104-1b257a36f9a3
github.com/willf/bloom v2.0.3+incompatible
Expand All @@ -153,6 +154,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand Down Expand Up @@ -227,7 +229,7 @@ require (
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-kit/kit v0.12.0
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.22.2 // indirect
Expand Down Expand Up @@ -286,7 +288,7 @@ require (
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kardianos/service v1.0.0/go.mod h1:8CzDhVuCuugtsHyZoTvsOBuvonN/UDBvl0kH+BUxvbo=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
Expand Down Expand Up @@ -1355,8 +1356,8 @@ github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOA
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
Expand Down Expand Up @@ -1625,6 +1626,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qq
github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand All @@ -1650,6 +1653,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.25 h1:/8rfZAdFfafRXOgz+ZpMZZWZ5pYggCY9t7e/BvjaBHM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.25/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
github.com/sean-/conswriter v0.0.0-20180208195008-f5ae3917a627/go.mod h1:7zjs06qF79/FKAJpBvFx3P8Ww4UTIMAe+lpNXDHziac=
github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6qbIiqJ6/Bqeq25bCLbL7YFmpaFfJDuM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
Expand Down Expand Up @@ -2257,6 +2262,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand All @@ -2266,6 +2272,7 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
37 changes: 37 additions & 0 deletions pkg/tool/audit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Loki Index Auditing

## Usage

To audit your index data:
1. Make sure you're authenticated to the cloud provider where your bucket lives.
In this example I'll be using GCP.
2. Create a new YAML configuration file that defines your storage configuration.
`lokitool` will use it to communicate with your data.
Only TSDB is supported. Make sure you give all three fields: `schema_config`, `storage_config` and `tenant`. In this example I'm naming my file `configfile.yaml`:
```yaml
schema_config:
configs:
- from: "2023-08-21"
index:
period: 24h
prefix: loki_env_tsdb_index_
object_store: gcs
schema: v13
store: tsdb

storage_config:
gcs:
bucket_name: loki-bucket

tenant: 12345
```
3. Build a new `lokitool` binary:
```bash
go build ./cmd/lokitool
```
4. Finally, invoke the `audit index` command the following way:
```bash
./lokitool audit index --config.file=configfile.yaml --index.file=index/loki_env_tsdb_index_19856/12345/1715707992714992001-compactor-1715199977885-1815707796275-g8003361.tsdb.gz
```
The `--config.file` is the YAML configuration described in the second step.
The `--index.file` is the path to the index file you want to audit. Take a look at your bucket to find its exact path and substitute it accordingly.
133 changes: 133 additions & 0 deletions pkg/tool/audit/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package audit

import (
"context"
"fmt"
"io"
"path"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/loki/v3/pkg/compactor"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/storage"
loki_storage "github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
indexshipper_storage "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
shipperutil "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
progressbar "github.com/schollz/progressbar/v3"
"golang.org/x/sync/errgroup"

util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
// TsFormat is the timestamp layout time.RFC3339Nano (RFC 3339 with
// nanosecond precision).
TsFormat = time.RFC3339Nano
)

// Run audits one index file end to end. It builds an object client from cfg,
// downloads cloudIndexPath, opens the downloaded file as a compacted index for
// table, and then checks every chunk referenced by that index.
//
// It returns the number of chunks found, the number of chunks missing, and any
// error produced by one of those steps. The compacted index is cleaned up
// before Run returns.
func Run(ctx context.Context, cloudIndexPath, table string, cfg Config, logger log.Logger) (int, int, error) {
	level.Info(logger).Log(
		"msg", "auditing index",
		"index", cloudIndexPath,
		"table", table,
		"tenant", cfg.Tenant,
		"working_dir", cfg.WorkingDir,
	)

	store, err := GetObjectClient(cfg, logger)
	if err != nil {
		return 0, 0, err
	}

	downloaded, err := DownloadIndexFile(ctx, cfg, cloudIndexPath, store, logger)
	if err != nil {
		return 0, 0, err
	}

	idx, err := ParseCompactexIndex(ctx, downloaded, table, cfg, logger)
	if err != nil {
		return 0, 0, err
	}
	defer idx.Cleanup()

	found, missing, err := AuditCompactedIndex(ctx, store, idx, cfg.Concurrency, logger)
	return found, missing, err
}

func GetObjectClient(cfg Config, logger log.Logger) (client.ObjectClient, error) {

Check warning on line 54 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'logger' seems to be unused, consider removing or renaming it as _ (revive)
periodCfg := cfg.SchemaConfig.Configs[len(cfg.SchemaConfig.Configs)-1] // only check the last period.

objClient, err := loki_storage.NewObjectClient(periodCfg.ObjectType, cfg.StorageConfig, storage.NewClientMetrics())
if err != nil {
return nil, fmt.Errorf("couldn't create object client: %w", err)
}

return objClient, nil
}

// DownloadIndexFile fetches the object at cloudIndexPath through objClient and
// stores it under cfg.WorkingDir, decompressing it when the path is recognised
// as a compressed file.
//
// The local file is named after the last "/"-separated element of
// cloudIndexPath, with its final extension removed when the file is
// decompressed. On success the returned string is that file name.
//
// NOTE(review): the value returned is the bare file name, not the path joined
// with cfg.WorkingDir that the file was written to — confirm callers expect
// that when WorkingDir is not the current directory.
func DownloadIndexFile(ctx context.Context, cfg Config, cloudIndexPath string, objClient client.ObjectClient, logger log.Logger) (string, error) {
	// Everything after the final "/" is the object's file name.
	localFileName := cloudIndexPath[strings.LastIndex(cloudIndexPath, "/")+1:]

	decompress := indexshipper_storage.IsCompressedFile(cloudIndexPath)
	if decompress {
		// get rid of the last extension, which is .gz
		localFileName = strings.TrimSuffix(localFileName, path.Ext(localFileName))
	}

	fetch := func() (io.ReadCloser, error) {
		r, _, err := objClient.GetObject(ctx, cloudIndexPath)
		return r, err
	}

	localFilePath := path.Join(cfg.WorkingDir, localFileName)
	err := shipperutil.DownloadFileFromStorage(localFilePath, decompress, false, logger, fetch)
	if err != nil {
		return "", fmt.Errorf("couldn't download file %q from storage: %w", cloudIndexPath, err)
	}

	level.Info(logger).Log("msg", "file successfully downloaded from storage", "path", cloudIndexPath)
	return localFileName, nil
}

func ParseCompactexIndex(ctx context.Context, localFilePath, table string, cfg Config, logger log.Logger) (compactor.CompactedIndex, error) {

Check warning on line 85 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'logger' seems to be unused, consider removing or renaming it as _ (revive)
periodCfg := cfg.SchemaConfig.Configs[len(cfg.SchemaConfig.Configs)-1] // only check the last period.
idxCompactor := tsdb.NewIndexCompactor()
compactedIdx, err := idxCompactor.OpenCompactedIndexFile(ctx, localFilePath, table, cfg.Tenant, cfg.WorkingDir, periodCfg, util_log.Logger)
if err != nil {
return nil, fmt.Errorf("couldn't open compacted index file %q: %w", localFilePath, err)
}
return compactedIdx, nil
}

func AuditCompactedIndex(ctx context.Context, objClient client.ObjectClient, compactedIdx compactor.CompactedIndex, parallelism int, logger log.Logger) (int, int, error) {

Check warning on line 95 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: func name will be used as audit.AuditCompactedIndex by other packages, and that stutters; consider calling this CompactedIndex (revive)
var missingChunks, foundChunks atomic.Int32
foundChunks.Store(0)
missingChunks.Store(0)
bar := progressbar.NewOptions(-1,
progressbar.OptionShowCount(),
progressbar.OptionSetDescription("Chunks validated"),
)

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(parallelism)
compactedIdx.ForEachChunk(ctx, func(ce retention.ChunkEntry) (deleteChunk bool, err error) {

Check failure on line 106 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

Error return value of `compactedIdx.ForEachChunk` is not checked (errcheck)
bar.Add(1)

Check failure on line 107 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

Error return value of `bar.Add` is not checked (errcheck)
g.Go(func() error {
exists, err := CheckChunkExistance(string(ce.ChunkID), objClient)
if err != nil || !exists {
missingChunks.Add(1)
logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(ce.ChunkID))
return nil
}
foundChunks.Add(1)
return nil
})

return false, nil
})
g.Wait()

Check failure on line 121 in pkg/tool/audit/audit.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

Error return value of `g.Wait` is not checked (errcheck)

return int(foundChunks.Load()), int(missingChunks.Load()), nil
}

// CheckChunkExistance asks objClient whether an object named key exists. The
// lookup runs with a background context. When the lookup fails it returns
// false together with the error.
func CheckChunkExistance(key string, objClient client.ObjectClient) (bool, error) {
	found, lookupErr := objClient.ObjectExists(context.Background(), key)
	if lookupErr == nil {
		return found, nil
	}
	return false, lookupErr
}
80 changes: 80 additions & 0 deletions pkg/tool/audit/audit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package audit

import (
"context"
"os"
"reflect"
"strings"
"testing"

"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/compactor"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/stretchr/testify/require"
)

func TestGetObjectClient(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
objClient, err := GetObjectClient(Config{
StorageConfig: storage.Config{},
SchemaConfig: config.SchemaConfig{
Configs: []config.PeriodConfig{
{
ObjectType: "gcs",
},
},
},
}, logger)

require.NoError(t, err)
require.Equal(t, reflect.TypeOf(&gcp.GCSObjectClient{}), reflect.TypeOf(objClient))
}

type testObjClient struct {
client.ObjectClient
}

func (t testObjClient) ObjectExists(ctx context.Context, object string) (bool, error) {

Check warning on line 41 in pkg/tool/audit/audit_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
if strings.Contains(object, "missing") {
return false, nil
}
return true, nil
}

type testCompactedIdx struct {
compactor.CompactedIndex

chunks []retention.ChunkEntry
}

func (t testCompactedIdx) ForEachChunk(ctx context.Context, f retention.ChunkEntryCallback) error {

Check warning on line 54 in pkg/tool/audit/audit_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
for _, chunk := range t.chunks {
if _, err := f(chunk); err != nil {
return err
}
}
return nil
}

func TestAuditIndex(t *testing.T) {
ctx := context.Background()
objClient := testObjClient{}
compactedIdx := testCompactedIdx{
chunks: []retention.ChunkEntry{
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-1")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-2")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-3")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-4")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("missing-1")}},
},
}
logger := log.NewNopLogger()
found, missing, err := AuditCompactedIndex(ctx, objClient, compactedIdx, 1, logger)
require.NoError(t, err)
require.Equal(t, 4, found)
require.Equal(t, 1, missing)
}
Loading
Loading