Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: jojohappy <[email protected]>
  • Loading branch information
jojohappy committed Oct 9, 2019
1 parent de9a47f commit ecf1456
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 10 deletions.
10 changes: 4 additions & 6 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand All @@ -138,7 +138,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
labels.FromStrings("a", "2", "c", "2"),
}
extLset := labels.FromStrings("ext1", "value1")
relabelConfig := make([]*relabel.Config, 0)

blocks, minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt,
series, extLset)
Expand Down Expand Up @@ -393,7 +392,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig)
defer s.Close()

t.Log("Test with no index cache")
Expand Down Expand Up @@ -442,7 +441,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig)
defer s.Close()

indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
Expand Down Expand Up @@ -476,7 +475,6 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
labels.FromStrings("a", "1", "b", "2"),
}
extLset := labels.FromStrings("ext1", "value1")
relabelConfig := make([]*relabel.Config, 0)

_, minTime, _ := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)

Expand All @@ -487,7 +485,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
&FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
}, relabelConfig)
}, emptyRelabelConfig)
testutil.Ok(t, err)

err = store.SyncBlocks(ctx)
Expand Down
66 changes: 62 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"gopkg.in/yaml.v2"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)

func TestBucketBlock_Property(t *testing.T) {
parameters := gopter.DefaultTestParameters()
parameters.Rng.Seed(2000)
Expand Down Expand Up @@ -428,8 +430,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "bucketstore-test")
testutil.Ok(t, err)

relabelConfig := make([]*relabel.Config, 0)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, emptyRelabelConfig)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
Expand All @@ -447,7 +448,6 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {

series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
extLset := labels.FromStrings("ext1", "value1")
relabelConfig := make([]*relabel.Config, 0)

// Create a block in range [-2w, -1w].
id1, err := testutil.CreateBlock(ctx, dir, series, 10,
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
&FilterConfig{
MinTime: minTimeDuration,
MaxTime: hourBefore,
}, relabelConfig)
}, emptyRelabelConfig)
testutil.Ok(t, err)

inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
Expand Down Expand Up @@ -579,3 +579,61 @@ func TestBucketStore_selectorBlocks(t *testing.T) {
testutil.Equals(t, sc.exceptedIds, ids)
}
}

func TestBucketStore_InfoWithLabels(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "bucketstore-test")
testutil.Ok(t, err)

bkt := inmem.NewBucket()
series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}

logger := log.NewNopLogger()
id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename)))

id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename)))

id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename)))

relabelContentYaml := `
- action: drop
regex: "A"
source_labels:
- cluster
`
var relabelConfig []*relabel.Config
err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig)
testutil.Ok(t, err)
bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig)
testutil.Ok(t, err)

err = bucketStore.SyncBlocks(ctx)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
testutil.Ok(t, err)

testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType)
testutil.Equals(t, int64(0), resp.MinTime)
testutil.Equals(t, int64(1000), resp.MaxTime)
testutil.Equals(t, []storepb.LabelSet{
storepb.LabelSet{
Labels: []storepb.Label{
{
Name: "cluster",
Value: "B",
},
},
},
}, resp.LabelSets)
}
27 changes: 27 additions & 0 deletions test/e2e/spinup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,33 @@ func storeGateway(http, grpc address, bucketConfig []byte) *serverScheduler {
}
}

func storeGatewayWithRelabel(http, grpc address, bucketConfig []byte, relabelConfig []byte) *serverScheduler {
return &serverScheduler{
HTTP: http,
GRPC: grpc,
schedule: func(workDir string) (Exec, error) {
dbDir := path.Join(workDir, "data", "store-gateway", http.Port)

if err := os.MkdirAll(dbDir, 0777); err != nil {
return nil, errors.Wrap(err, "creating store gateway dir failed")
}

return newCmdExec(exec.Command("thanos",
"store",
"--debug.name", fmt.Sprintf("store-gw-%s", http.Port),
"--data-dir", dbDir,
"--grpc-address", grpc.HostPort(),
"--http-address", http.HostPort(),
"--log.level", "debug",
"--objstore.config", string(bucketConfig),
// Accelerated sync time for quicker test (3m by default).
"--sync-block-duration", "5s",
"--selector.relabel-config", string(relabelConfig),
)), nil
},
}
}

func alertManager(http address) *serverScheduler {
return &serverScheduler{
HTTP: http,
Expand Down
110 changes: 110 additions & 0 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,113 @@ func TestStoreGateway(t *testing.T) {
"ext1": "value1",
}, res[0].Metric)
}

func TestStoreGateway_SelectorBlock(t *testing.T) {
a := newLocalAddresser()
minioAddr := a.New()

s3Config := s3.Config{
Bucket: "test-storegateway-selectorblock",
AccessKey: "abc",
SecretKey: "mightysecret",
Endpoint: minioAddr.HostPort(),
Insecure: true,
}

bucketConfig := client.BucketConfig{
Type: client.S3,
Config: s3Config,
}

config, err := yaml.Marshal(bucketConfig)
testutil.Ok(t, err)

relabelContentYaml := `
- action: drop
regex: "A"
source_labels:
- cluster
`
s := storeGatewayWithRelabel(a.New(), a.New(), config, []byte(relabelContentYaml))
q := querier(a.New(), a.New(), []address{s.GRPC}, nil)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

exit, err := e2eSpinupWithS3ObjStorage(t, ctx, minioAddr, &s3Config, s, q)
if err != nil {
t.Errorf("spinup failed: %v", err)
cancel()
return
}

defer func() {
cancel()
<-exit
}()

dir, err := ioutil.TempDir("", "test_store_gateway_selectorblock_local")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

series := []labels.Labels{
labels.FromStrings("a", "1", "b", "2"),
}
extLset := labels.FromStrings("cluster", "A", "replica", "1")
extLset2 := labels.FromStrings("cluster", "B", "replica", "2")
extLset3 := labels.FromStrings("cluster", "A", "replica", "3")

now := time.Now()
id1, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0)
testutil.Ok(t, err)

id2, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset2, 0)
testutil.Ok(t, err)

id3, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset3, 0)
testutil.Ok(t, err)

l := log.NewLogfmtLogger(os.Stdout)

bkt, err := s3.NewBucketWithConfig(l, s3Config, "test-feed-1")
testutil.Ok(t, err)

testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String()))

var res model.Vector

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
select {
case <-exit:
cancel()
return nil
default:
}

var (
err error
warnings []string
)
res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, q.HTTP.URL()), "{a=\"1\"}", time.Now(), promclient.QueryOptions{
Deduplicate: true,
})
if err != nil {
return err
}

if len(warnings) > 0 {
return errors.Errorf("unexpected warnings %s", warnings)
}

if len(res) != 2 {
return errors.Errorf("unexpected result size %d", len(res))
}
return nil
}))

testutil.Equals(t, model.Metric{
"a": "1",
"b": "2",
"cluster": "B",
}, res[0].Metric)
}

0 comments on commit ecf1456

Please sign in to comment.