Merge pull request #83 from xataio/handle-schema-delete-synchronously
Delete opensearch schema synchronously
eminano authored Oct 24, 2024
2 parents 525af67 + b19bab9 commit 6c2d7cf
Showing 12 changed files with 137 additions and 336 deletions.
29 changes: 12 additions & 17 deletions README.md
@@ -176,23 +176,18 @@ One of exponential/constant backoff policies can be provided for the Kafka commi
<details>
<summary>Search Batch Indexer</summary>

- | Environment Variable | Default | Required | Description |
- | -------------------- | ------- | -------- | ----------- |
- | PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
- | PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
- | PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
- | PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
- | PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
- | PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL | 0 | No | Initial interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
- | PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL | 0 | No | Max interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
- | PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search indexer cleanup retries. |
- | PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search indexer cleanup retries. |
- | PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search indexer cleanup retries. |
- | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
- | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
- | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
- | PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
- | PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |
+ | Environment Variable | Default | Required | Description |
+ | -------------------- | ------- | -------- | ----------- |
+ | PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
+ | PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
+ | PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
+ | PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
+ | PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
+ | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
+ | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
+ | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
+ | PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
+ | PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |

One of exponential/constant backoff policies can be provided for the search store retry strategy. If none is provided, no retries apply.

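The batching contract documented by PGSTREAM_SEARCH_INDEXER_BATCH_SIZE and PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT is "flush on whichever limit is hit first". A minimal, illustrative sketch of that semantics — not pgstream's actual indexer loop, and all names here are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

// flushLoop sends a batch either when batchSize messages have accumulated or
// when batchTimeout elapses, whichever happens first.
func flushLoop(msgs <-chan string, batchSize int, batchTimeout time.Duration, send func([]string)) {
	batch := make([]string, 0, batchSize)
	timer := time.NewTimer(batchTimeout)
	defer timer.Stop()
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				if len(batch) > 0 {
					send(batch) // flush whatever is left on shutdown
				}
				return
			}
			batch = append(batch, m)
			if len(batch) >= batchSize {
				send(batch) // size limit reached first
				batch = batch[:0]
				timer.Reset(batchTimeout)
			}
		case <-timer.C:
			if len(batch) > 0 {
				send(batch) // timeout reached first
				batch = batch[:0]
			}
			timer.Reset(batchTimeout)
		}
	}
}

func main() {
	msgs := make(chan string, 10)
	for i := 0; i < 5; i++ {
		msgs <- fmt.Sprintf("doc-%d", i)
	}
	close(msgs)
	flushLoop(msgs, 3, time.Second, func(b []string) { fmt.Println("flushed", len(b), "docs") })
}
```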
7 changes: 3 additions & 4 deletions cmd/config.go
@@ -156,10 +156,9 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {

return &stream.SearchProcessorConfig{
Indexer: search.IndexerConfig{
-			BatchSize:      viper.GetInt("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE"),
-			BatchTime:      viper.GetDuration("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT"),
-			MaxQueueBytes:  viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"),
-			CleanupBackoff: parseBackoffConfig("PGSTREAM_SEARCH_INDEXER_CLEANUP"),
+			BatchSize:     viper.GetInt("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE"),
+			BatchTime:     viper.GetDuration("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT"),
+			MaxQueueBytes: viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"),
},
Store: store.Config{
OpenSearchURL: opensearchStore,
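The parseBackoffConfig helper removed from this call site builds a backoff config from a prefix; its body and the real backoff.Config fields are not shown in this diff. A hypothetical stand-in, using only the environment variable names documented in the README (field and function names here are assumptions):

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

// expBackoff is an illustrative struct, not pgstream's backoff.Config.
type expBackoff struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	MaxRetries      uint
}

// parseExpBackoff derives the documented key names from a prefix, e.g.
// "PGSTREAM_SEARCH_STORE" + "_EXP_BACKOFF_INITIAL_INTERVAL".
func parseExpBackoff(prefix string) expBackoff {
	return expBackoff{
		InitialInterval: viper.GetDuration(prefix + "_EXP_BACKOFF_INITIAL_INTERVAL"),
		MaxInterval:     viper.GetDuration(prefix + "_EXP_BACKOFF_MAX_INTERVAL"),
		MaxRetries:      viper.GetUint(prefix + "_EXP_BACKOFF_MAX_RETRIES"),
	}
}

func main() {
	viper.AutomaticEnv() // read keys from the environment
	fmt.Printf("%+v\n", parseExpBackoff("PGSTREAM_SEARCH_STORE"))
}
```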
3 changes: 0 additions & 3 deletions kafka2os.env
@@ -10,9 +10,6 @@ PGSTREAM_KAFKA_COMMIT_BACKOFF_MAX_RETRIES=60
# Processor config
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=100
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
-PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INITIAL_INTERVAL=1s
-PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_INTERVAL=1m
-PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES=60
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_STORE_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_STORE_BACKOFF_MAX_INTERVAL=1m
3 changes: 0 additions & 3 deletions pg2os.env
@@ -5,9 +5,6 @@ PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=d
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=100
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
-PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL=1s
-PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL=1m
-PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES=60
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL=1m
5 changes: 0 additions & 5 deletions pkg/wal/processor/search/config.go
@@ -4,8 +4,6 @@ package search

import (
"time"

"github.com/xataio/pgstream/pkg/backoff"
)

type IndexerConfig struct {
@@ -18,9 +16,6 @@ type IndexerConfig struct {
// MaxQueueBytes is the max memory used by the batch indexer for inflight
// batches. Defaults to 100MiB
MaxQueueBytes int64
-	// CleanupBackoff is the retry policy to follow for the async index
-	// deletion. If no config is provided, no retry policy is applied.
-	CleanupBackoff backoff.Config
}

const (
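After this change, IndexerConfig is purely about batching. A minimal construction using the defaults documented in the README (the import path is assumed from the repository layout):

```go
package main

import (
	"fmt"
	"time"

	"github.com/xataio/pgstream/pkg/wal/processor/search"
)

func main() {
	cfg := search.IndexerConfig{
		BatchSize:     100,               // documented default: 100 messages per batch
		BatchTime:     time.Second,       // documented default: 1s flush interval
		MaxQueueBytes: 100 * 1024 * 1024, // documented default: 100MiB of inflight batches
	}
	fmt.Printf("indexer config: %+v\n", cfg)
}
```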
24 changes: 4 additions & 20 deletions pkg/wal/processor/search/helper_test.go
@@ -24,10 +24,11 @@ func (m *mockAdapter) walEventToMsg(e *wal.Event) (*msg, error) {
type mockStore struct {
getMapperFn func() Mapper
applySchemaChangeFn func(ctx context.Context, le *schemalog.LogEntry) error
-	deleteSchemaFn         func(ctx context.Context, schemaName string) error
+	deleteSchemaFn         func(ctx context.Context, i uint, schemaName string) error
deleteTableDocumentsFn func(ctx context.Context, schemaName string, tableIDs []string) error
sendDocumentsFn func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error)
sendDocumentsCalls uint
+	deleteSchemaCalls      uint
}

func (m *mockStore) GetMapper() Mapper {
@@ -39,7 +40,8 @@ func (m *mockStore) ApplySchemaChange(ctx context.Context, le *schemalog.LogEntr
}

func (m *mockStore) DeleteSchema(ctx context.Context, schemaName string) error {
-	return m.deleteSchemaFn(ctx, schemaName)
+	m.deleteSchemaCalls++
+	return m.deleteSchemaFn(ctx, m.deleteSchemaCalls, schemaName)
}

func (m *mockStore) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
@@ -51,24 +53,6 @@ func (m *mockStore) SendDocuments(ctx context.Context, docs []Document) ([]Docum
return m.sendDocumentsFn(ctx, m.sendDocumentsCalls, docs)
}

-type mockCleaner struct {
-	deleteSchemaFn func(context.Context, string) error
-	startFn        func(context.Context)
-	stopFn         func()
-}
-
-func (m *mockCleaner) deleteSchema(ctx context.Context, schema string) error {
-	return m.deleteSchemaFn(ctx, schema)
-}
-
-func (m *mockCleaner) start(ctx context.Context) {
-	m.startFn(ctx)
-}
-
-func (m *mockCleaner) stop() {
-	m.stopFn()
-}

const (
testSchemaName = "test_schema"
testTableName = "test_table"
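The invocation counter now threaded through deleteSchemaFn lets a test script a different outcome per call, which is how retry behaviour can be asserted against the store. A hypothetical fragment using the helpers above (mockStore, errTest, and testSchemaName come from this package; this exact test does not appear in the diff):

```go
func TestDeleteSchemaFailsOnce(t *testing.T) {
	store := &mockStore{
		deleteSchemaFn: func(_ context.Context, i uint, schemaName string) error {
			if i == 1 {
				return errTest // first invocation fails...
			}
			return nil // ...later invocations succeed
		},
	}
	// DeleteSchema increments the call counter before delegating to the stub.
	require.Error(t, store.DeleteSchema(context.Background(), testSchemaName))
	require.NoError(t, store.DeleteSchema(context.Background(), testSchemaName))
}
```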
11 changes: 2 additions & 9 deletions pkg/wal/processor/search/search_batch_indexer.go
@@ -38,8 +38,6 @@ type BatchIndexer struct {

// checkpoint callback to mark what was safely stored
	checkpoint checkpointer.Checkpoint
-
-	cleaner cleaner
}

type Option func(*BatchIndexer)
@@ -66,10 +64,6 @@ func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsn
opt(indexer)
}

-	indexer.cleaner = newSchemaCleaner(&config.CleanupBackoff, store, indexer.logger)
-	// start a goroutine for processing schema deletes asynchronously.
-	// routine ends when the internal channel is closed.
-	go indexer.cleaner.start(ctx)
return indexer
}

@@ -191,7 +185,6 @@ func (i *BatchIndexer) Name() string {

func (i *BatchIndexer) Close() error {
close(i.msgChan)
-	i.cleaner.stop()
return nil
}

@@ -270,8 +263,8 @@ func (i *BatchIndexer) applySchemaChange(ctx context.Context, new *schemalog.Log
}

if new.Schema.Dropped {
-		if err := i.cleaner.deleteSchema(ctx, new.SchemaName); err != nil {
-			return fmt.Errorf("register schema for delete: %w", err)
+		if err := i.store.DeleteSchema(ctx, new.SchemaName); err != nil {
+			return fmt.Errorf("deleting schema: %w", err)
}
return nil
}
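The replacement above is the heart of this PR: a dropped schema is now deleted inline, so a failure surfaces from applySchemaChange (and ultimately from sendBatch) instead of being queued for a background cleaner, and retrying becomes the search store's concern via the PGSTREAM_SEARCH_STORE_* backoff settings. A generic, illustrative sketch of that retry responsibility — not pgstream's actual store or backoff code, and every name below is hypothetical:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errUnavailable = errors.New("search store unavailable")

// deleteWithBackoff retries a synchronous delete with exponential backoff,
// doubling the wait up to maxInterval, for at most maxRetries retries.
func deleteWithBackoff(ctx context.Context, del func(context.Context, string) error,
	schema string, initial, maxInterval time.Duration, maxRetries int) error {
	interval := initial
	for attempt := 0; ; attempt++ {
		err := del(ctx, schema)
		if err == nil {
			return nil
		}
		if attempt >= maxRetries {
			return fmt.Errorf("deleting schema %q: %w", schema, err)
		}
		select {
		case <-time.After(interval): // wait before the next attempt
		case <-ctx.Done():
			return ctx.Err()
		}
		if interval *= 2; interval > maxInterval {
			interval = maxInterval
		}
	}
}

func main() {
	calls := 0
	flaky := func(context.Context, string) error {
		if calls++; calls < 3 {
			return errUnavailable // fail twice, then succeed
		}
		return nil
	}
	err := deleteWithBackoff(context.Background(), flaky, "test_schema",
		100*time.Millisecond, time.Second, 5)
	fmt.Printf("done after %d calls, err=%v\n", calls, err)
}
```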
15 changes: 4 additions & 11 deletions pkg/wal/processor/search/search_batch_indexer_test.go
@@ -401,7 +401,6 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
checkpoint checkpointer.Checkpoint
batch *msgBatch
skipSchema func(string) bool
-		cleaner    cleaner

wantErr error
}{
@@ -517,9 +516,8 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
},
positions: []wal.CommitPosition{testCommitPos},
},
store: &mockStore{},
cleaner: &mockCleaner{
deleteSchemaFn: func(ctx context.Context, s string) error {
store: &mockStore{
deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
require.Equal(t, testSchemaName, s)
return nil
},
@@ -557,9 +555,8 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
},
positions: []wal.CommitPosition{testCommitPos},
},
store: &mockStore{},
cleaner: &mockCleaner{
deleteSchemaFn: func(ctx context.Context, s string) error {
store: &mockStore{
deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
return errTest
},
},
@@ -680,10 +677,6 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
indexer.skipSchema = tc.skipSchema
}

-			if tc.cleaner != nil {
-				indexer.cleaner = tc.cleaner
-			}

err := indexer.sendBatch(context.Background(), tc.batch)
require.ErrorIs(t, err, tc.wantErr)
})
103 changes: 0 additions & 103 deletions pkg/wal/processor/search/search_schema_cleaner.go

This file was deleted.
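The 103 deleted lines are not rendered here. From the call sites removed above (newSchemaCleaner, start, stop, deleteSchema) and the removed mockCleaner interface, the deleted component was, in outline, a channel-fed goroutine that deleted dropped schemas asynchronously under the CleanupBackoff policy. A rough, inferred sketch — every name beyond deleteSchema/start/stop is a guess, and the real file's contents may have differed:

```go
package main

import "context"

// schemaCleaner is an inferred outline of the deleted component, not its
// actual code: deleteSchema registered a dropped schema on a channel, start
// drained that channel in a goroutine (retrying per the CleanupBackoff
// policy), and stop closed the channel to end the goroutine.
type schemaCleaner struct {
	queue    chan string
	deleteFn func(context.Context, string) error // e.g. store.DeleteSchema
}

func (c *schemaCleaner) deleteSchema(_ context.Context, schema string) error {
	c.queue <- schema // register the schema for asynchronous deletion
	return nil
}

func (c *schemaCleaner) start(ctx context.Context) {
	for schema := range c.queue { // ends when stop closes the channel
		_ = c.deleteFn(ctx, schema) // retries under the backoff policy omitted here
	}
}

func (c *schemaCleaner) stop() { close(c.queue) }

func main() {} // compile placeholder; the type above is illustrative only
```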

