Merge pull request #44 from xataio/add-comments
Add comments
eminano authored Jun 20, 2024
2 parents 72e3912 + 21cc4cf commit 792f6ab
Showing 21 changed files with 79 additions and 17 deletions.
4 changes: 4 additions & 0 deletions pkg/stream/stream_init.go
@@ -21,6 +21,8 @@ const (
pgstreamSchema = "pgstream"
)

// Init initialises the pgstream state in the postgres database provided and
// creates the relevant replication slot.
func Init(ctx context.Context, pgURL string) error {
conn, err := newPGConn(ctx, pgURL)
if err != nil {
@@ -55,6 +57,8 @@ func Init(ctx context.Context, pgURL string) error {
return nil
}

// TearDown removes the pgstream state from the postgres database provided,
// as well as the replication slot.
func TearDown(ctx context.Context, pgURL string) error {
conn, err := newPGConn(ctx, pgURL)
if err != nil {
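A minimal usage sketch of the two calls documented above, assuming the pkg/stream import path; the connection URL is a placeholder:

```go
package main

import (
	"context"
	"log"

	"github.com/xataio/pgstream/pkg/stream" // import path assumed from the file location
)

func main() {
	ctx := context.Background()
	pgURL := "postgres://postgres:postgres@localhost:5432/postgres" // placeholder

	// Init creates the pgstream state and the replication slot.
	if err := stream.Init(ctx, pgURL); err != nil {
		log.Fatalf("init pgstream: %v", err)
	}

	// ... run the stream ...

	// TearDown removes the pgstream state and the replication slot.
	if err := stream.TearDown(ctx, pgURL); err != nil {
		log.Fatalf("tear down pgstream: %v", err)
	}
}
```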
1 change: 1 addition & 0 deletions pkg/stream/stream_start.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/sync/errgroup"
)

// Start will start the configured pgstream processes. This call is blocking.
func Start(ctx context.Context, logger loglib.Logger, config *Config, meter metric.Meter) error {
if err := config.IsValid(); err != nil {
return fmt.Errorf("incompatible configuration: %w", err)
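Because Start blocks, a caller typically hands it a context that is cancelled on shutdown signals. A sketch; whether nil logger and meter are accepted is an assumption, and the Config contents are elided:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/xataio/pgstream/pkg/stream" // import path assumed
)

func main() {
	// Cancelling the context is what unblocks Start on shutdown.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	cfg := &stream.Config{} // listener/processor configuration goes here

	// nil logger/meter is an assumption; the real call may need concrete values.
	if err := stream.Start(ctx, nil, cfg, nil); err != nil {
		log.Printf("pgstream stopped: %v", err)
	}
}
```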
4 changes: 4 additions & 0 deletions pkg/wal/checkpointer/kafka/wal_kafka_checkpointer.go
@@ -13,6 +13,8 @@ import (
"github.com/xataio/pgstream/pkg/wal"
)

// Checkpointer is a kafka implementation of the wal checkpointer. It commits
// the message offsets to kafka.
type Checkpointer struct {
committer msgCommitter
backoffProvider backoff.Provider
@@ -32,6 +34,8 @@ type msgCommitter interface {

type Option func(c *Checkpointer)

// New returns a kafka checkpointer that commits the message offsets to kafka by
// partition/topic on demand.
func New(ctx context.Context, cfg Config, opts ...Option) (*Checkpointer, error) {
c := &Checkpointer{
logger: loglib.NewNoopLogger(),
3 changes: 3 additions & 0 deletions pkg/wal/checkpointer/postgres/wal_pg_checkpointer.go
@@ -10,6 +10,8 @@ import (
pgreplication "github.com/xataio/pgstream/pkg/wal/replication/postgres"
)

// Checkpointer is a postgres implementation of a wal checkpointer. It syncs the
// LSN to postgres.
type Checkpointer struct {
syncer lsnSyncer
parser replication.LSNParser
@@ -24,6 +26,7 @@ type lsnSyncer interface {
Close() error
}

// New returns a postgres checkpointer that syncs the LSN to postgres on demand.
func New(syncer lsnSyncer) *Checkpointer {
return &Checkpointer{
syncer: syncer,
3 changes: 3 additions & 0 deletions pkg/wal/listener/kafka/wal_kafka_reader.go
@@ -14,6 +14,7 @@ import (
"github.com/xataio/pgstream/pkg/wal"
)

// Reader is a kafka reader that listens to wal events.
type Reader struct {
reader kafkaReader
unmarshaler func([]byte, any) error
@@ -37,6 +38,8 @@ type payloadProcessor func(context.Context, *wal.Event) error

type Option func(*Reader)

// NewReader returns a kafka reader that listens to wal events and calls the
// processor on input.
func NewReader(config ReaderConfig, processRecord payloadProcessor, opts ...Option) (*Reader, error) {
r := &Reader{
logger: loglib.NewNoopLogger(),
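The payloadProcessor type above is the reader's extension point. A sketch of wiring in a callback; the import path, package alias, ReaderConfig contents, and the Listen/Close methods (implied by the Listener interface in the next file) are all assumptions:

```go
package main

import (
	"context"
	"log"

	"github.com/xataio/pgstream/pkg/wal"
	kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka" // path and alias assumed
)

func main() {
	// Matches payloadProcessor: func(context.Context, *wal.Event) error.
	process := func(ctx context.Context, ev *wal.Event) error {
		log.Printf("wal event: %+v", ev)
		return nil
	}

	// ReaderConfig fields are not shown in this diff; the zero value is a stand-in.
	r, err := kafkalistener.NewReader(kafkalistener.ReaderConfig{}, process)
	if err != nil {
		log.Fatalf("create reader: %v", err)
	}
	defer r.Close() // Close assumed from the Listener interface

	if err := r.Listen(context.Background()); err != nil { // Listen assumed likewise
		log.Printf("listen: %v", err)
	}
}
```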
1 change: 1 addition & 0 deletions pkg/wal/listener/wal_listener.go
@@ -4,6 +4,7 @@ package listener

import "context"

// Listener represents a process that listens to WAL events.
type Listener interface {
Listen(ctx context.Context) error
Close() error
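For illustration, a no-op implementation of this contract that blocks until cancellation; a sketch, not code from the repository:

```go
package main

import (
	"context"
	"fmt"
)

// Listener mirrors the interface from pkg/wal/listener so the sketch is
// self-contained.
type Listener interface {
	Listen(ctx context.Context) error
	Close() error
}

// noopListener blocks in Listen until its context ends, as a long-running
// listener would while waiting for wal events.
type noopListener struct{}

func (l *noopListener) Listen(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func (l *noopListener) Close() error { return nil }

func main() {
	var l Listener = &noopListener{}
	defer l.Close()

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately so the demo terminates
	fmt.Println(l.Listen(ctx)) // context.Canceled
}
```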
2 changes: 2 additions & 0 deletions pkg/wal/processor/kafka/wal_kafka_batch_writer.go
@@ -20,6 +20,8 @@ import (
"go.opentelemetry.io/otel/metric"
)

// BatchWriter is a kafka writer that uses batches to send the data to the
// configured kafka topic.
type BatchWriter struct {
writer kafka.MessageWriter
logger loglib.Logger
7 changes: 1 addition & 6 deletions pkg/wal/processor/search/opensearch/opensearch_adapter.go
@@ -12,6 +12,7 @@ import (
"github.com/xataio/pgstream/pkg/wal/processor/search"
)

// Adapter converts between search types and opensearch types
type Adapter interface {
SchemaNameToIndex(schemaName string) IndexName
IndexToSchemaName(index string) string
@@ -25,12 +26,6 @@ type adapter struct {
unmarshaler func([]byte, any) error
}

const (
// OpenSearch has a limit of 512 bytes for the ID field. see here:
// https://www.elastic.co/guide/en/elasticsearch/reference/7.10/mapping-id-field.html
osIDFieldLengthLimit = 512
)

func newDefaultAdapter() *adapter {
return &adapter{
marshaler: json.Marshal,
@@ -6,6 +6,7 @@ import (
"fmt"
)

// IndexName represents an opensearch index name constructed from a schema name.
type IndexName interface {
Name() string
Version() int
5 changes: 4 additions & 1 deletion pkg/wal/processor/search/opensearch/opensearch_pg_mapper.go
@@ -57,12 +57,15 @@ const (
dateFormat = "2006-01-02"
)

// NewPostgresMapper returns a mapper that maps between postgres and opensearch
// types
func NewPostgresMapper() *Mapper {
return &Mapper{
pgTypeMap: pgtype.NewMap(),
}
}

// ColumnToSearchMapping maps the column on input into the equivalent search mapping
func (m *Mapper) ColumnToSearchMapping(column schemalog.Column) (map[string]any, error) {
searchField, err := m.columnToSearchField(column)
if err != nil {
@@ -114,7 +117,7 @@ func (m *Mapper) ColumnToSearchMapping(column schemalog.Column) (map[string]any,
}
}

// mapColumnValue maps a value emitted from PG into a value that OS can handle.
// MapColumnValue maps a value emitted from PG into a value that OS can handle.
// If the column is a timestamp, we need to parse it.
// If the column is an array of any type except json, we need to map it to a Go slice.
// If the column type is unknown, we return nil. This avoids dropping the whole record if one field type is unknown.
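A usage sketch of the mapper API shown above; the opensearch package alias is assumed, and the zero-value column is only a placeholder for a real schemalog.Column carrying postgres type information:

```go
package main

import (
	"fmt"

	"github.com/xataio/pgstream/pkg/schemalog"
	opensearch "github.com/xataio/pgstream/pkg/wal/processor/search/opensearch" // alias assumed
)

func main() {
	m := opensearch.NewPostgresMapper()

	// A real column carries the postgres type the mapper inspects; the zero
	// value will most likely yield an error or an empty mapping.
	mapping, err := m.ColumnToSearchMapping(schemalog.Column{})
	fmt.Println(mapping, err)
}
```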
4 changes: 4 additions & 0 deletions pkg/wal/processor/search/opensearch/opensearch_store.go
@@ -35,6 +35,10 @@ const (
openSearchDefaultEFConstruction = 256
openSearchDefaultEFSearch = 100

// OpenSearch has a limit of 512 bytes for the ID field. see here:
// https://www.elastic.co/guide/en/elasticsearch/reference/7.10/mapping-id-field.html
osIDFieldLengthLimit = 512

schemalogIndexName = "pgstream"
)

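The 512-byte limit applies to the encoded byte length of the _id field, so the natural guard in Go checks len on the string, which counts bytes rather than runes. A small sketch:

```go
package search

// osIDFieldLengthLimit mirrors the constant above: OpenSearch inherits
// Elasticsearch's 512-byte cap on the _id field.
const osIDFieldLengthLimit = 512

// validDocID reports whether id fits within the limit. len on a Go string
// counts bytes, matching how the limit is defined.
func validDocID(id string) bool {
	return len(id) <= osIDFieldLengthLimit
}
```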
1 change: 1 addition & 0 deletions pkg/wal/processor/search/search_adapter.go
@@ -14,6 +14,7 @@ import (
"github.com/xataio/pgstream/pkg/wal/replication"
)

// walAdapter converts wal events to search messages
type walAdapter interface {
walEventToMsg(*wal.Event) (*msg, error)
}
2 changes: 2 additions & 0 deletions pkg/wal/processor/search/search_schema_cleaner.go
@@ -22,6 +22,8 @@ type store interface {
DeleteSchema(ctx context.Context, schemaName string) error
}

// schemaCleaner takes care of deleting schemas from the search store
// asynchronously
type schemaCleaner struct {
logger loglib.Logger
deleteSchemaQueue chan string
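A minimal sketch of the queue-and-drain pattern the comment describes; names and structure are illustrative, not the package's actual implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleaner queues schema names and deletes them asynchronously.
type cleaner struct {
	queue        chan string
	deleteSchema func(context.Context, string) error
}

// start drains the queue in a background goroutine until the context ends.
func (c *cleaner) start(ctx context.Context) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case name := <-c.queue:
				if err := c.deleteSchema(ctx, name); err != nil {
					fmt.Printf("delete schema %q: %v\n", name, err)
				}
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &cleaner{
		queue: make(chan string, 10),
		deleteSchema: func(_ context.Context, name string) error {
			fmt.Println("deleted", name)
			return nil
		},
	}
	c.start(ctx)
	c.queue <- "my_schema"
	time.Sleep(100 * time.Millisecond) // let the goroutine drain the queue
}
```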
3 changes: 3 additions & 0 deletions pkg/wal/processor/search/search_store_retrier.go
@@ -14,6 +14,7 @@ import (
"github.com/xataio/pgstream/pkg/schemalog"
)

// StoreRetrier applies a retry strategy to failed search store operations.
type StoreRetrier struct {
inner Store
logger loglib.Logger
@@ -72,6 +73,8 @@ func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName stri
return s.inner.DeleteTableDocuments(ctx, schemaName, tableIDs)
}

// SendDocuments will go over failed documents, identifying any with retriable
// errors and retrying them with the configured backoff policy.
func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) {
docsToSend := docs
failedDocs := []DocumentError{}
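The shape SendDocuments describes: send, keep only the retriable failures, retry with backoff. A simplified sketch with stand-in types and a fixed sleep where pgstream uses its configured backoff policy:

```go
package main

import (
	"fmt"
	"time"
)

type Document struct{ ID string }

type DocumentError struct {
	Doc       Document
	Retriable bool
	Err       error
}

// sendWithRetry retries only the failures flagged as retriable.
func sendWithRetry(send func([]Document) []DocumentError, docs []Document, maxAttempts int) []DocumentError {
	var failed []DocumentError
	for attempt := 0; attempt < maxAttempts && len(docs) > 0; attempt++ {
		failed = send(docs)
		retry := make([]Document, 0, len(failed))
		for _, f := range failed {
			if f.Retriable {
				retry = append(retry, f.Doc)
			}
		}
		docs = retry
		if len(docs) > 0 {
			time.Sleep(100 * time.Millisecond) // stand-in for the configured backoff
		}
	}
	return failed
}

func main() {
	attempts := 0
	send := func(docs []Document) []DocumentError {
		attempts++
		if attempts == 1 {
			return []DocumentError{{Doc: docs[0], Retriable: true, Err: fmt.Errorf("transient")}}
		}
		return nil
	}
	fmt.Println(sendWithRetry(send, []Document{{ID: "1"}}, 3))
}
```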
13 changes: 8 additions & 5 deletions pkg/wal/processor/translator/wal_translator.go
@@ -15,9 +15,9 @@ import (
"github.com/xataio/pgstream/pkg/wal/processor"
)

// Translator is a decorator around a processor that populates the wal data with
// the schemalog entry for the relevant schema (such as pgstream ids). This
// allows following processors to have more information for processing the event
// Translator is a decorator around a wal processor that populates the wal
// metadata with the schemalog entry for the relevant schema. This allows
// following processors to have more information for processing the event
// effectively.
type Translator struct {
logger loglib.Logger
@@ -45,8 +45,9 @@ type (
type Option func(t *Translator)

// New will return a translator processor wrapper that will inject pgstream
// metadata into the wal data events before passing them over the processor on
// input. By default, all schemas are processed.
// metadata into the wal data events before passing them over to the processor
// on input. By default, all schemas are processed and the pgstream identity
// will be the primary key/not null unique column if present.
func New(cfg *Config, p processor.Processor, opts ...Option) (*Translator, error) {
var schemaLogStore schemalog.Store
var err error
@@ -98,6 +99,8 @@ func WithLogger(l loglib.Logger) Option {
}
}

// ProcessWALEvent populates the metadata of the wal event on input, before
// passing it over to the configured wal processor.
func (t *Translator) ProcessWALEvent(ctx context.Context, event *wal.Event) error {
if event.Data == nil {
return t.processor.ProcessWALEvent(ctx, event)
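Translator is a decorator over the Processor interface: enrich the event, then delegate. A compressed sketch of the pattern with stand-in types (the real Translator derives the metadata from the schemalog store):

```go
package main

import (
	"context"
	"fmt"
)

type Event struct{ Metadata string }

// Processor mirrors the interface in pkg/wal/processor.
type Processor interface {
	ProcessWALEvent(ctx context.Context, e *Event) error
	Name() string
}

// decorator wraps another Processor and enriches events before delegating.
type decorator struct{ inner Processor }

func (d *decorator) ProcessWALEvent(ctx context.Context, e *Event) error {
	e.Metadata = "populated" // the real Translator fills in schemalog-derived metadata
	return d.inner.ProcessWALEvent(ctx, e)
}

func (d *decorator) Name() string { return d.inner.Name() }

type printer struct{}

func (printer) ProcessWALEvent(_ context.Context, e *Event) error {
	fmt.Println("processing event with metadata:", e.Metadata)
	return nil
}
func (printer) Name() string { return "printer" }

func main() {
	var p Processor = &decorator{inner: printer{}}
	_ = p.ProcessWALEvent(context.Background(), &Event{})
}
```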
6 changes: 6 additions & 0 deletions pkg/wal/processor/wal_processor.go
@@ -12,6 +12,7 @@ import (
"github.com/xataio/pgstream/pkg/wal"
)

// Processor is a general interface to receive and process a wal event
type Processor interface {
ProcessWALEvent(ctx context.Context, walEvent *wal.Event) error
Name() string
@@ -22,10 +23,15 @@ var (
ErrIncompatibleWalData = errors.New("wal data event is not a schema log entry")
)

// IsSchemaLogEvent will return true if the wal event data originates from the
// schema_log table in the pgstream schema.
func IsSchemaLogEvent(d *wal.Data) bool {
return d.Schema == schemalog.SchemaName && d.Table == schemalog.TableName
}

// WalDataToLogEntry will convert the wal event data on input into the
// equivalent schemalog entry. It will return an error if the wal event data is
// not from the schema log table.
func WalDataToLogEntry(d *wal.Data) (*schemalog.LogEntry, error) {
if !IsSchemaLogEvent(d) {
return nil, ErrIncompatibleWalData
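Usage follows directly from the two helpers above; a sketch using the wal and processor packages:

```go
package main

import (
	"fmt"

	"github.com/xataio/pgstream/pkg/wal"
	"github.com/xataio/pgstream/pkg/wal/processor"
)

// handleSchemaChange converts a wal data event into a schemalog entry when it
// comes from the schema_log table, and ignores it otherwise.
func handleSchemaChange(d *wal.Data) error {
	if !processor.IsSchemaLogEvent(d) {
		return nil // not a schema change; nothing to do
	}
	entry, err := processor.WalDataToLogEntry(d)
	if err != nil {
		return fmt.Errorf("convert wal data to schemalog entry: %w", err)
	}
	fmt.Printf("schema change: %+v\n", entry)
	return nil
}

func main() {
	// A non-schemalog event is ignored.
	_ = handleSchemaChange(&wal.Data{Schema: "public", Table: "users"})
}
```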
1 change: 1 addition & 0 deletions pkg/wal/replication/postgres/pg_lsn_parser.go
@@ -8,6 +8,7 @@ import (
"github.com/xataio/pgstream/pkg/wal/replication"
)

// LSNParser is the postgres implementation of the replication.LSNParser
type LSNParser struct{}

func NewLSNParser() *LSNParser {
15 changes: 12 additions & 3 deletions pkg/wal/replication/postgres/pg_replication_handler.go
@@ -15,6 +15,7 @@ import (
"github.com/xataio/pgstream/pkg/wal/replication"
)

// Handler handles the postgres replication slot operations
type Handler struct {
logger loglib.Logger

logSystemID = "system_id"
)

// NewHandler returns a new postgres replication handler for the database on input.
func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, error) {
pgCfg, err := pgx.ParseConfig(cfg.PostgresURL)
if err != nil {
@@ -78,6 +80,10 @@ func WithLogger(l loglib.Logger) Option {
}
}

// StartReplication will start the replication process on the configured
// replication slot. It will check for the last synced LSN
// (confirmed_flush_lsn), and if there isn't one, it will start replication from
// the restart_lsn position.
func (h *Handler) StartReplication(ctx context.Context) error {
sysID, err := pglogrepl.IdentifySystem(ctx, h.pgReplicationConn)
if err != nil {
@@ -112,9 +118,6 @@ func (h *Handler) StartReplication(ctx context.Context) error {
})

if startPos == 0 {
// todo(deverts): If we don't have a position. Read from as early as possible.
// this _could_ be too old. In the future, it would be good to calculate if we're
// too far behind, so we can fix it.
startPos, err = h.getRestartLSN(ctx, conn, h.pgReplicationSlotName)
if err != nil {
return fmt.Errorf("get restart LSN: %w", err)
@@ -147,6 +150,8 @@ func (h *Handler) StartReplication(ctx context.Context) error {
return h.SyncLSN(ctx, startPos)
}

// ReceiveMessage will listen for messages from the WAL. It returns an error if
// an unexpected message is received.
func (h *Handler) ReceiveMessage(ctx context.Context) (replication.Message, error) {
msg, err := h.pgReplicationConn.ReceiveMessage(ctx)
if err != nil {
@@ -198,6 +203,9 @@ func (h *Handler) SyncLSN(ctx context.Context, lsn replication.LSN) error {
return nil
}

// GetReplicationLag will return the consumer's current replication lag. This
// value is different from the postgres replication lag, which takes into
// consideration all consumers and ongoing transactions.
func (h *Handler) GetReplicationLag(ctx context.Context) (int64, error) {
conn, err := h.pgConnBuilder()
if err != nil {
@@ -214,6 +222,7 @@ func (h *Handler) GetReplicationLag(ctx context.Context) (int64, error) {
return lag, nil
}

// GetLSNParser returns a postgres implementation of the LSN parser.
func (h *Handler) GetLSNParser() replication.LSNParser {
return h.lsnParser
}
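The start-position selection StartReplication describes (confirmed_flush_lsn, else restart_lsn) maps onto two columns of pg_replication_slots. A sketch with pgx v5 assumed as the driver and all helper names hypothetical:

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5" // pgx version assumed
)

// slotLSNs fetches both candidate start positions for a replication slot.
func slotLSNs(ctx context.Context, conn *pgx.Conn, slot string) (confirmedFlush, restart string, err error) {
	err = conn.QueryRow(ctx,
		`SELECT coalesce(confirmed_flush_lsn::text, ''), coalesce(restart_lsn::text, '')
		   FROM pg_replication_slots WHERE slot_name = $1`, slot).
		Scan(&confirmedFlush, &restart)
	return confirmedFlush, restart, err
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/postgres") // placeholder URL
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	confirmed, restart, err := slotLSNs(ctx, conn, "pgstream_slot") // slot name assumed
	if err != nil {
		panic(err)
	}

	// Prefer the last synced LSN; fall back to restart_lsn when there isn't one.
	startPos := confirmed
	if startPos == "" {
		startPos = restart
	}
	fmt.Println("starting replication from", startPos)
}
```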
4 changes: 4 additions & 0 deletions pkg/wal/replication/postgres/pg_replication_messages.go
@@ -8,6 +8,8 @@ import (
"github.com/xataio/pgstream/pkg/wal/replication"
)

// PrimaryKeepAliveMessage contains no wal data, only the message metadata (lsn
// and server time) along with a flag to indicate if a response is requested.
type PrimaryKeepAliveMessage pglogrepl.PrimaryKeepaliveMessage

func (pka *PrimaryKeepAliveMessage) GetData() *replication.MessageData {
@@ -18,6 +20,8 @@ func (pka *PrimaryKeepAliveMessage) GetData() *replication.MessageData {
}
}

// XLogDataMessage contains the wal data along with the message metadata (lsn
// and server time)
type XLogDataMessage pglogrepl.XLogData

func (xld *XLogDataMessage) GetData() *replication.MessageData {
3 changes: 3 additions & 0 deletions pkg/wal/replication/replication_handler.go
@@ -9,6 +9,7 @@ import (
"time"
)

// Handler manages the replication operations
type Handler interface {
StartReplication(ctx context.Context) error
ReceiveMessage(ctx context.Context) (Message, error)
@@ -22,13 +23,15 @@ type Message interface {
GetData() *MessageData
}

// MessageData is the common data for all replication messages
type MessageData struct {
LSN LSN
Data []byte
ServerTime time.Time
ReplyRequested bool
}

// LSNParser handles the LSN type conversion
type LSNParser interface {
ToString(LSN) string
FromString(string) (LSN, error)
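Put together, the interface above supports a simple receive loop: start replication, then branch on whether each MessageData carries a wal payload or is a keepalive. A sketch against the interface as shown:

```go
package main

import (
	"context"
	"fmt"

	"github.com/xataio/pgstream/pkg/wal/replication"
)

// consume drives a replication.Handler: keepalives carry no Data and may
// request a reply; data messages carry the wal payload and its LSN.
func consume(ctx context.Context, h replication.Handler) error {
	if err := h.StartReplication(ctx); err != nil {
		return fmt.Errorf("start replication: %w", err)
	}
	for {
		msg, err := h.ReceiveMessage(ctx)
		if err != nil {
			return fmt.Errorf("receive message: %w", err)
		}
		data := msg.GetData()
		if len(data.Data) == 0 {
			// Keepalive: nothing to process. data.ReplyRequested tells the
			// consumer whether the server expects a status update.
			continue
		}
		fmt.Printf("wal payload at LSN %v (%d bytes)\n", data.LSN, len(data.Data))
	}
}

func main() {
	// Constructing a concrete Handler (e.g. the postgres one) is out of scope
	// here; consume works against any implementation.
}
```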