Skip to content

Commit

Permalink
Merge pull request #78 from xataio/update-translator-version-finder
Browse files Browse the repository at this point in the history
Allow to define a version finder that uses LSN
  • Loading branch information
eminano authored Oct 1, 2024
2 parents 3b95c15 + 35e08cb commit 6faa86d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
29 changes: 20 additions & 9 deletions pkg/wal/processor/translator/wal_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Translator struct {
skipSchemaEvent schemaEventFilter
schemaLogStore schemalog.Store
idFinder columnFinder
versionFinder columnFinder
versionFinder columnFinderWithErr
}

type walToLogEntryAdapter func(*wal.Data) (*schemalog.LogEntry, error)
Expand All @@ -41,13 +41,16 @@ type Config struct {
// configurable filters that allow the user of this library to have flexibility
// when processing and translating the wal event data
type (
dataEventFilter func(*wal.Data) bool
schemaEventFilter func(*schemalog.LogEntry) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
dataEventFilter func(*wal.Data) bool
schemaEventFilter func(*schemalog.LogEntry) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
columnFinderWithErr func(*schemalog.Column, *schemalog.Table) (bool, error)
)

type Option func(t *Translator)

var ErrUseLSN = errors.New("use LSN as event version")

// New will return a translator processor wrapper that will inject pgstream
// metadata into the wal data events before passing them over to the processor
// on input. By default, all schemas are processed and the pgstream identity
Expand Down Expand Up @@ -86,7 +89,7 @@ func WithIDFinder(idFinder columnFinder) Option {
}
}

func WithVersionFinder(versionFinder columnFinder) Option {
func WithVersionFinder(versionFinder columnFinderWithErr) Option {
return func(t *Translator) {
t.versionFinder = versionFinder
}
Expand Down Expand Up @@ -231,10 +234,18 @@ func (t *Translator) fillEventMetadata(event *wal.Data, log *schemalog.LogEntry,
continue
}

if t.versionFinder != nil && t.versionFinder(col, tbl) && !foundVersion {
foundVersion = true
event.Metadata.InternalColVersion = col.PgstreamID
continue
if t.versionFinder != nil && !foundVersion {
isVersionCol, err := t.versionFinder(col, tbl)
if err != nil && errors.Is(err, ErrUseLSN) {
foundVersion = true
event.Metadata.InternalColVersion = ""
continue
}
if isVersionCol {
foundVersion = true
event.Metadata.InternalColVersion = col.PgstreamID
continue
}
}
}

Expand Down
33 changes: 26 additions & 7 deletions pkg/wal/processor/translator/wal_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestTranslator_ProcessWALEvent(t *testing.T) {
skipDataEvent: func(d *wal.Data) bool { return false },
skipSchemaEvent: func(*schemalog.LogEntry) bool { return false },
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil },
walToLogEntryAdapter: func(d *wal.Data) (*schemalog.LogEntry, error) { return testLogEntry, nil },
}

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestTranslator_translate(t *testing.T) {
data *wal.Data
store schemalog.Store
idFinder columnFinder
versionFinder columnFinder
versionFinder columnFinderWithErr

wantData *wal.Data
wantErr error
Expand All @@ -249,7 +249,7 @@ func TestTranslator_translate(t *testing.T) {
},
data: newTestDataEvent("I").Data,
idFinder: primaryKeyFinder,
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil },

wantData: newTestDataEventWithMetadata("I").Data,
wantErr: nil,
Expand All @@ -264,7 +264,7 @@ func TestTranslator_translate(t *testing.T) {
},
data: newTestDataEvent("I").Data,
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil },

wantData: newTestDataEventWithMetadata("I").Data,
wantErr: nil,
Expand All @@ -287,6 +287,25 @@ func TestTranslator_translate(t *testing.T) {
}(),
wantErr: nil,
},
{
name: "ok - version finder provided with use LSN error",
store: &schemalogmocks.Store{
FetchFn: func(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error) {
require.Equal(t, testSchemaName, schemaName)
return newTestLogEntry(), nil
},
},
data: newTestDataEvent("I").Data,
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, ErrUseLSN },

wantData: func() *wal.Data {
d := newTestDataEventWithMetadata("I").Data
d.Metadata.InternalColVersion = ""
return d
}(),
wantErr: nil,
},
{
name: "error - fetching schema log entry",
store: &schemalogmocks.Store{
Expand Down Expand Up @@ -342,7 +361,7 @@ func TestTranslator_translate(t *testing.T) {
},
data: newTestDataEvent("I").Data,
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, nil },

wantData: func() *wal.Data {
d := newTestDataEvent("I").Data
Expand All @@ -364,7 +383,7 @@ func TestTranslator_translate(t *testing.T) {
},
data: newTestDataEvent("I").Data,
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, nil },

wantData: func() *wal.Data {
d := newTestDataEvent("I").Data
Expand Down Expand Up @@ -393,7 +412,7 @@ func TestTranslator_translate(t *testing.T) {
return d
}(),
idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" },
versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil },

wantData: func() *wal.Data {
d := newTestDataEventWithMetadata("I").Data
Expand Down

0 comments on commit 6faa86d

Please sign in to comment.