Skip to content

Commit

Permalink
Move Archive test into shared integration test suite (#5207)
Browse files Browse the repository at this point in the history
**Which problem is this PR solving?**

This PR addresses the issue [#5203
](#5203)

**Description of the changes**
This PR addresses the issue
[#5203](#5203) by
integrating the testArchiveTrace function into the common
StorageIntegration type. This modification ensures compatibility with
multiple storage backends, allowing the archiving test to be executed
across different environments.

**How was this change tested?**

The changes were tested by running the following command:

```bash
make test
```

**Checklist**

- [x] I have read
[CONTRIBUTING_GUIDELINES.md](https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md)
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - `for jaeger: make lint test`
  - `for jaeger-ui: yarn lint` and `yarn test`

---------

Signed-off-by: Wise-Wizard <[email protected]>
Signed-off-by: Saransh Shankar <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
Wise-Wizard and yurishkuro authored Mar 29, 2024
1 parent 2285143 commit f934846
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 57 deletions.
1 change: 1 addition & 0 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *BadgerIntegrationStorage) initialize() error {

// TODO: remove this badger supports returning spanKind from GetOperations
s.GetOperationsMissingSpanKind = true
s.SkipArchiveTest = true
return nil
}

Expand Down
1 change: 1 addition & 0 deletions plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func newCassandraStorageIntegration() *CassandraStorageIntegration {
s := &CassandraStorageIntegration{
StorageIntegration: StorageIntegration{
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,

Refresh: func() error { return nil },
SkipList: []string{
Expand Down
91 changes: 41 additions & 50 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
estemplate "github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -101,7 +100,7 @@ func (s *ESStorageIntegration) getVersion() (uint, error) {
return uint(esVersion), nil
}

func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields, archive bool) error {
func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
Expand All @@ -115,24 +114,25 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields, archi
})
require.NoError(t, err)

s.initSpanstore(t, allTagsAsFields, archive)
s.initSpanstore(t, allTagsAsFields)
s.initSamplingStore(t)

s.CleanUp = func() error {
s.esCleanUp(t, allTagsAsFields, archive)
s.esCleanUp(t, allTagsAsFields)
return nil
}
s.Refresh = s.esRefresh
s.esCleanUp(t, allTagsAsFields, archive)
s.esCleanUp(t, allTagsAsFields)
// TODO: remove this flag after ES support returning spanKind when get operations
s.GetOperationsMissingSpanKind = true
s.SkipArchiveTest = false
return nil
}

func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields, archive bool) {
func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
_, err := s.client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
s.initSpanstore(t, allTagsAsFields, archive)
s.initSpanstore(t, allTagsAsFields)
}

func (s *ESStorageIntegration) initSamplingStore(t *testing.T) {
Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper
return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client)
}

func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields, archive bool) error {
func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) error {
client := s.getEsClient(t)
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: estemplate.TextTemplateBuilder{},
Expand All @@ -184,6 +184,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields, arch
require.NoError(t, err)
clientFn := func() estemplate.Client { return client }

// Initializing Span Reader and Writer
w := spanstore.NewSpanWriter(
spanstore.SpanWriterParams{
Client: clientFn,
Expand All @@ -192,7 +193,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields, arch
IndexPrefix: indexPrefix,
AllTagsAsFields: allTagsAsFields,
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
Archive: false,
})
err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix)
require.NoError(t, err)
Expand All @@ -206,10 +207,34 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields, arch
IndexPrefix: indexPrefix,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
MaxDocCount: defaultMaxDocCount,
Tracer: tracer.Tracer("test"),
Archive: false,
})

// Initializing Archive Span Reader and Writer
s.ArchiveSpanWriter = spanstore.NewSpanWriter(
spanstore.SpanWriterParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
AllTagsAsFields: allTagsAsFields,
TagDotReplacement: tagKeyDeDotChar,
Archive: true,
})
s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
MaxDocCount: defaultMaxDocCount,
Tracer: tracer.Tracer("test"),
Archive: true,
})

dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{
Client: clientFn,
Logger: s.logger,
Expand Down Expand Up @@ -246,35 +271,27 @@ func healthCheck() error {
return errors.New("elastic search is not ready")
}

func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) {
func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) {
if os.Getenv("STORAGE") != "elasticsearch" && os.Getenv("STORAGE") != "opensearch" {
t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this")
}
if err := healthCheck(); err != nil {
t.Fatal(err)
}
s := &ESStorageIntegration{}
s.initializeES(t, allTagsAsFields, archive)
s.initializeES(t, allTagsAsFields)

s.Fixtures = LoadAndParseQueryTestCases(t, "fixtures/queries_es.json")

if archive {
t.Run("ArchiveTrace", s.testArchiveTrace)
} else {
s.IntegrationTestAll(t)
}
s.IntegrationTestAll(t)
}

func TestElasticsearchStorage(t *testing.T) {
testElasticsearchStorage(t, false, false)
testElasticsearchStorage(t, false)
}

func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) {
testElasticsearchStorage(t, true, false)
}

func TestElasticsearchStorage_Archive(t *testing.T) {
testElasticsearchStorage(t, false, true)
testElasticsearchStorage(t, true)
}

func TestElasticsearchStorage_IndexTemplates(t *testing.T) {
Expand All @@ -285,7 +302,7 @@ func TestElasticsearchStorage_IndexTemplates(t *testing.T) {
t.Fatal(err)
}
s := &ESStorageIntegration{}
s.initializeES(t, true, false)
s.initializeES(t, true)
esVersion, err := s.getVersion()
require.NoError(t, err)
// TODO abstract this into pkg/es/client.IndexManagementLifecycleAPI
Expand All @@ -307,32 +324,6 @@ func TestElasticsearchStorage_IndexTemplates(t *testing.T) {
s.cleanESIndexTemplates(t, indexPrefix)
}

func (s *ESStorageIntegration) testArchiveTrace(t *testing.T) {
defer s.cleanUp(t)
tID := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-maxSpanAge * 5),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.SpanWriter.WriteSpan(context.Background(), expected))
s.refresh(t)

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), tID)
return err == nil && len(actual.Spans) == 1
})
if !assert.True(t, found) {
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}
}

func (s *ESStorageIntegration) cleanESIndexTemplates(t *testing.T, prefix string) error {
version, err := s.getVersion()
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error {
if s.SpanReader, err = f.CreateSpanReader(); err != nil {
return err
}

if s.ArchiveSpanReader, err = f.CreateArchiveSpanReader(); err != nil {
return err
}
if s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter(); err != nil {
return err
}
// TODO DependencyWriter is not implemented in grpc store

s.Refresh = s.refresh
Expand Down
48 changes: 42 additions & 6 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,25 @@ var fixtures embed.FS

// StorageIntegration holds components for storage integration test
type StorageIntegration struct {
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
DependencyWriter dependencystore.Writer
DependencyReader dependencystore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
DependencyReader dependencystore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures

// TODO: remove this after all storage backends return spanKind from GetOperations
GetOperationsMissingSpanKind bool

// TODO: remove this after all storage backends return Source column from GetDependencies

GetDependenciesReturnsSource bool

// Skip Archive Test if not supported by the storage backend
SkipArchiveTest bool

// List of tests which has to be skipped, it can be regex too.
SkipList []string

Expand Down Expand Up @@ -144,6 +150,35 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
}
}

func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
s.skipIfNeeded(t)
if s.SkipArchiveTest {
t.Skip("Skipping ArchiveTrace test because archive reader or writer is nil")
}
defer s.cleanUp(t)
tID := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-time.Hour * 72 * 5).Truncate(time.Microsecond),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.ArchiveSpanWriter.WriteSpan(context.Background(), expected))
s.refresh(t)

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), tID)
return err == nil && len(actual.Spans) == 1
})
require.True(t, found)
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)
Expand Down Expand Up @@ -481,6 +516,7 @@ func (s *StorageIntegration) insertThroughput(t *testing.T) {
// IntegrationTestAll runs all integration tests
func (s *StorageIntegration) IntegrationTestAll(t *testing.T) {
t.Run("GetServices", s.testGetServices)
t.Run("ArchiveTrace", s.testArchiveTrace)
t.Run("GetOperations", s.testGetOperations)
t.Run("GetTrace", s.testGetTrace)
t.Run("GetLargeSpans", s.testGetLargeSpan)
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
s.SpanReader = &ingester{traceStore}
s.Refresh = func() error { return nil }
s.CleanUp = func() error { return nil }
s.SkipArchiveTest = true
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ func (s *MemStorageIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()

store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.SpanReader = store
s.SpanWriter = store
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore

// TODO DependencyWriter is not implemented in memory store

Expand Down

0 comments on commit f934846

Please sign in to comment.