Skip to content

Commit

Permalink
Add Purge method for ES/OS (#5407)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- part of #5345

## Description of the changes
- Added Purge method for ES/OS
- optimized integration test for es/os storage

## How was this change tested?
- `STORAGE=elasticsearch make storage-integration-test`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Pushkar Mishra <[email protected]>
  • Loading branch information
Pushkarm029 authored May 2, 2024
1 parent 55e991a commit 14aa43d
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 18 deletions.
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/integration/storagecleaner/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ func (c *storageCleaner) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot find storage factory '%s': %w", c.config.TraceStorage, err)
}

purgeStorage := func() error {
purgeStorage := func(httpContext context.Context) error {
purger, ok := storageFactory.(storage.Purger)
if !ok {
return fmt.Errorf("storage %s does not implement Purger interface", c.config.TraceStorage)
}
if err := purger.Purge(); err != nil {
if err := purger.Purge(httpContext); err != nil {
return fmt.Errorf("error purging storage: %w", err)
}
return nil
}

purgeHandler := func(w http.ResponseWriter, r *http.Request) {
if err := purgeStorage(); err != nil {
if err := purgeStorage(r.Context()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PurgerFactory struct {
err error
}

func (f *PurgerFactory) Purge() error {
func (f *PurgerFactory) Purge(_ context.Context) error {
return f.err
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Client interface {
Index() IndexService
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
DeleteIndex(index string) IndicesDeleteService
io.Closer
GetVersion() uint
}
Expand All @@ -45,6 +46,11 @@ type IndicesCreateService interface {
Do(ctx context.Context) (*elastic.IndicesCreateResult, error)
}

// IndicesDeleteService is an abstraction for elastic.IndicesDeleteService
type IndicesDeleteService interface {
Do(ctx context.Context) (*elastic.IndicesDeleteResponse, error)
}

// TemplateCreateService is an abstraction for creating a mapping
type TemplateCreateService interface {
Body(mapping string) TemplateCreateService
Expand Down
1 change: 1 addition & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Configuration struct {
IndexRolloverFrequencySpans string `mapstructure:"-"`
IndexRolloverFrequencyServices string `mapstructure:"-"`
IndexRolloverFrequencySampling string `mapstructure:"-"`
ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"`
AdaptiveSamplingLookback time.Duration `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
Expand Down
20 changes: 20 additions & 0 deletions pkg/es/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions pkg/es/mocks/IndicesDeleteService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (c ClientWrapper) CreateIndex(index string) es.IndicesCreateService {
return WrapESIndicesCreateService(c.client.CreateIndex(index))
}

// DeleteIndex calls this function to internal client.
func (c ClientWrapper) DeleteIndex(index string) es.IndicesDeleteService {
return WrapESIndicesDeleteService(c.client.DeleteIndex(index))
}

// CreateTemplate calls this function to internal client.
func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService {
if c.esVersion >= 8 {
Expand Down Expand Up @@ -147,6 +152,21 @@ type TemplateCreateServiceWrapper struct {
mappingCreateService *elastic.IndicesPutTemplateService
}

// IndicesDeleteServiceWrapper is a wrapper around elastic.IndicesDeleteService
type IndicesDeleteServiceWrapper struct {
indicesDeleteService *elastic.IndicesDeleteService
}

// WrapESIndicesDeleteService creates an ESIndicesDeleteService out of *elastic.IndicesDeleteService.
func WrapESIndicesDeleteService(indicesDeleteService *elastic.IndicesDeleteService) IndicesDeleteServiceWrapper {
return IndicesDeleteServiceWrapper{indicesDeleteService: indicesDeleteService}
}

// Do calls this function to internal service.
func (e IndicesDeleteServiceWrapper) Do(ctx context.Context) (*elastic.IndicesDeleteResponse, error) {
return e.indicesDeleteService.Do(ctx)
}

// WrapESTemplateCreateService creates an TemplateCreateService out of *elastic.IndicesPutTemplateService.
func WrapESTemplateCreateService(mappingCreateService *elastic.IndicesPutTemplateService) TemplateCreateServiceWrapper {
return TemplateCreateServiceWrapper{mappingCreateService: mappingCreateService}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package badger

import (
"context"
"errors"
"expvar"
"flag"
Expand Down Expand Up @@ -308,7 +309,7 @@ func (f *Factory) registerBadgerExpvarMetrics(metricsFactory metrics.Factory) {
// Purge removes all data from the Factory's underlying Badger store.
// This function is intended for testing purposes only and should not be used in production environments.
// Calling Purge in production will result in permanent data loss.
func (f *Factory) Purge() error {
func (f *Factory) Purge(_ context.Context) error {
return f.store.Update(func(txn *badger.Txn) error {
return f.store.DropAll()
})
Expand Down
8 changes: 8 additions & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var ( // interface comformance checks
_ storage.ArchiveFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
)

// Factory implements storage.Factory for Elasticsearch backend.
Expand Down Expand Up @@ -279,6 +280,7 @@ func createSpanWriter(
UseReadWriteAliases: cfg.UseReadWriteAliases,
Logger: logger,
MetricsFactory: mFactory,
ServiceCacheTTL: cfg.ServiceCacheTTL,
})

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
Expand Down Expand Up @@ -403,6 +405,12 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom
}
}

func (f *Factory) Purge(ctx context.Context) error {
esClient := f.getPrimaryClient()
_, err := esClient.DeleteIndex("*").Do(ctx)
return err
}

func loadTokenFromFile(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
suffixIndexRolloverFrequencySpans = ".index-rollover-frequency-spans"
suffixIndexRolloverFrequencyServices = ".index-rollover-frequency-services"
suffixIndexRolloverFrequencySampling = ".index-rollover-frequency-adaptive-sampling"
suffixServiceCacheTTL = ".service-cache-ttl"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
Expand Down Expand Up @@ -197,6 +198,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixNumShards,
nsConfig.NumShards,
"The number of shards per index in Elasticsearch")
flagSet.Duration(
nsConfig.namespace+suffixServiceCacheTTL,
nsConfig.ServiceCacheTTL,
"The TTL for the cache of known service names",
)
flagSet.Int64(
nsConfig.namespace+suffixNumReplicas,
nsConfig.NumReplicas,
Expand Down Expand Up @@ -352,6 +358,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.ServiceCacheTTL = v.GetDuration(cfg.namespace + suffixServiceCacheTTL)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude)
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package integration

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
}

func (s *BadgerIntegrationStorage) cleanUp(t *testing.T) {
s.factory.Purge()
require.NoError(t, s.factory.Purge(context.Background()))
}

func TestBadgerStorage(t *testing.T) {
Expand Down
21 changes: 10 additions & 11 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type ESStorageIntegration struct {
client *elastic.Client
v8Client *elasticsearch8.Client
logger *zap.Logger
factory *es.Factory
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
Expand Down Expand Up @@ -99,19 +100,17 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool)
s.initSpanstore(t, allTagsAsFields)

s.CleanUp = func(t *testing.T) {
s.esCleanUp(t, allTagsAsFields)
s.esCleanUp(t)
}
s.esCleanUp(t, allTagsAsFields)
s.esCleanUp(t)
s.SkipArchiveTest = false
// TODO: remove this flag after ES supports returning spanKind
// Issue https://github.com/jaegertracing/jaeger/issues/1923
s.GetOperationsMissingSpanKind = true
}

func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
_, err := s.client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
s.initSpanstore(t, allTagsAsFields)
func (s *ESStorageIntegration) esCleanUp(t *testing.T) {
require.NoError(t, s.factory.Purge(context.Background()))
}

func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
Expand All @@ -123,6 +122,7 @@ func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields
fmt.Sprintf("--es.num-replicas=%v", 1),
fmt.Sprintf("--es.index-prefix=%v", indexPrefix),
fmt.Sprintf("--es.use-ilm=%v", false),
fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second),
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.bulk.actions=%v", 1),
fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond),
Expand All @@ -134,16 +134,15 @@ func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields
f.InitFromViper(v, s.logger)
require.NoError(t, f.Initialize(metrics.NullFactory, s.logger))

// TODO ideally we need to close the factory once the test is finished
// but because esCleanup calls initialize() we get a panic later
// t.Cleanup(func() {
// require.NoError(t, f.Close())
// })
t.Cleanup(func() {
require.NoError(t, f.Close())
})
return f
}

func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) {
f := s.initializeESFactory(t, allTagsAsFields)
s.factory = f
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package storage

import (
"context"
"errors"

"go.uber.org/zap"
Expand Down Expand Up @@ -53,7 +54,7 @@ type Factory interface {
// Only meant to be used from integration tests.
type Purger interface {
// Purge removes all data from the storage.
Purge() error
Purge(context.Context) error
}

// SamplingStoreFactory defines an interface that is capable of returning the necessary backends for
Expand Down

0 comments on commit 14aa43d

Please sign in to comment.