From 319b64dc08d5116742c4b67dc76aef25d16ccbdb Mon Sep 17 00:00:00 2001 From: Juan Hernandez Date: Mon, 29 May 2023 12:15:43 +0200 Subject: [PATCH] WIP: Database change queue This patch adds an object that configures a database to record changes in a table, and process them in order. Signed-off-by: Juan Hernandez --- database/change_queue.go | 831 ++++++++++++++++++++++++++++++++++ database/change_queue_test.go | 388 ++++++++++++++++ database/main_test.go | 32 +- examples/change_queue.go | 120 +++++ leadership/main_test.go | 2 +- testing/database.go | 11 +- 6 files changed, 1378 insertions(+), 6 deletions(-) create mode 100644 database/change_queue.go create mode 100644 database/change_queue_test.go create mode 100644 examples/change_queue.go diff --git a/database/change_queue.go b/database/change_queue.go new file mode 100644 index 00000000..6aeec7d7 --- /dev/null +++ b/database/change_queue.go @@ -0,0 +1,831 @@ +/* +Copyright (c) 2021 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package database + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync/atomic" + "text/template" + "time" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + + "github.com/openshift-online/ocm-sdk-go/logging" +) + +// ChangeQueueBuilder contains the data and logic needed to build a database change queue. +type ChangeQueueBuilder struct { + logger logging.Logger + url string + name string + tables []string + install bool + context func() context.Context + callback func(context.Context, *ChangeQueueItem) + interval time.Duration + timeout time.Duration +} + +// ChangeQueue is a mechanism to asynchronously process changes made to database tables. It uses +// triggers that write to a changes table the details of the changes made to other tables. It then +// waits for data written to that changes table and processes it using the callbaks given in the +// configuration. +type ChangeQueue struct { + // Basic fields: + logger logging.Logger + url string + name string + context func() context.Context + callback func(context.Context, *ChangeQueueItem) + waitIterval time.Duration + retryInterval time.Duration + timeout time.Duration + + // We use the flag to ask the loop to stop as soon as possible, and then the loop uses the + // channel to tell the close method that it actually finished. + closeFlag int32 + closeChan chan struct{} + + // Database connection used to receive notifications. + waitConn *pgx.Conn + + // Function to cancel the context used to run the current wait. + waitCancel func() + + // Database connection used to fetch changes. + fetchConn *pgx.Conn + + // Precalculated SQL for frequently used statements: + fetchSQL string + listenSQL string +} + +// ChangeQueueItem contains the data of one single change. +type ChangeQueueItem struct { + Serial int + Timestamp time.Time + Source string + Operation string + Old []byte + New []byte +} + +// changeQueueRow is used to read rows from the database. +type changeQueueRow struct { + Serial int + Timestamp time.Time + Source string + Operation string + Old []byte + New []byte +} + +// NewChangeQueue creates a builder that can then be used to configure and create a change queue. +// Note that the required database objects (table, functions and triggers) aren't automatically +// created by default. See the documentation of the Install method of the builder for details. +func NewChangeQueue() *ChangeQueueBuilder { + return &ChangeQueueBuilder{ + name: defaultChangeQueueName, + interval: defaultChangeQueueInterval, + timeout: defaultChangeQueueTimeout, + install: false, + context: context.Background, + } +} + +// Logger sets the logger that the queue will use to write to the log. This is mandatory. +func (b *ChangeQueueBuilder) Logger(value logging.Logger) *ChangeQueueBuilder { + b.logger = value + return b +} + +// URL sets the database URL that the queue will use connect to the database. This is mandatory. +func (b *ChangeQueueBuilder) URL(value string) *ChangeQueueBuilder { + b.url = value + return b +} + +// Name sets the name of the queue. This can be used to have different queues for different uses, or +// for different sets of tables, or for different environments that happen to share the database. +// The changes table, function and triggers created will have this name. The default is `changes`. +func (b *ChangeQueueBuilder) Name(value string) *ChangeQueueBuilder { + b.name = value + return b +} + +// Install enables or disables the creation of the database objects (tables, functions and triggers) +// needed by the queue. If set to true the objects will be automatically created if they don't +// exist. +// +// If set to false then you will need to manually create the following objects: +// +// 1. The table where the changes are stored. For example, if the name of the queue is `my_queue`: +// +// create table if not exists my_queue ( +// serial serial not null primary key, +// timestamp timestamp with time zone not null default now(), +// source text, +// operation text, +// old jsonb, +// new jsonb +// ); +// +// 2. The trigger function that will be called by the triggers to write changes to the table: +// +// create or replace function my_queue_save() returns trigger as $$ +// begin +// insert into my_queue ( +// source, +// operation, +// old, +// new +// ) values ( +// tg_table_name, +// lower(tg_op), +// row_to_json(old.*), +// row_to_json(new.*) +// ); +// return null; +// end; +// $$ language plpgsql; +// +// 3. The trigger function that will be called by triggers to send notifications: +// +// create or replace function my_queue_notify() returns trigger as $$ +// begin +// notify my_queue; +// return null; +// end; +// $$ language plpgsql; +// +// 4. For each data table the trigger that calls the save function. For example, if the name of the +// data table is `my_data`: +// +// drop trigger if exists my_data_my_queue_save on my_data; +// create trigger my_data_my_queue_save +// after insert or update or delete on my_data +// for each row execute function my_queue_save(); +// +// 5. For each data table the trigger that calls the notify function: +// +// create trigger my_data_my_queue_notify +// after insert or update or delete on my_data +// execute function my_queue_notify(); +// +// The default value is false. +func (b *ChangeQueueBuilder) Install(value bool) *ChangeQueueBuilder { + b.install = value + return b +} + +// Table adds a table that will be configured so that changes are written to the change queue. +func (b *ChangeQueueBuilder) Table(value string) *ChangeQueueBuilder { + b.tables = append(b.tables, value) + return b +} + +// Table adds a collection of tables that will be configured so that changes are written to the +// changes queue. +func (b *ChangeQueueBuilder) Tables(values ...string) *ChangeQueueBuilder { + b.tables = append(b.tables, values...) + return b +} + +// Context sets a function that the queue will use to create contexts. The default is to create +// contexts using the context.Background function. +func (b *ChangeQueueBuilder) Context(value func() context.Context) *ChangeQueueBuilder { + b.context = value + return b +} + +// Callback sets the function that will be called to process changes. This function will be called +// in the same goroutine that reads the item from the database, so processing of later changes will +// not proceed till this returns. +func (b *ChangeQueueBuilder) Callback( + value func(context.Context, *ChangeQueueItem)) *ChangeQueueBuilder { + b.callback = value + return b +} + +// Interval sets the iterval for periodic checks. Usually changes from the queue will be processed +// inmediately because the database sends notifications when new changes are available. If that +// fails, for whatever the raeson, changes will be processed after this interval. Default value is +// 30 seconds. +func (b *ChangeQueueBuilder) Interval(value time.Duration) *ChangeQueueBuilder { + b.interval = value + return b +} + +// Timeout sets the timeout for database operations. The default is one second. +func (b *ChangeQueueBuilder) Timeout(value time.Duration) *ChangeQueueBuilder { + b.timeout = value + return b +} + +// Build uses the data stored in the builder to configure and create a new change queue. +func (b *ChangeQueueBuilder) Build(ctx context.Context) (result *ChangeQueue, err error) { + // Check parameters: + if b.logger == nil { + err = errors.New("logger is mandatory") + return + } + if b.url == "" { + err = errors.New("database URL is mandatory") + return + } + if b.name == "" { + err = errors.New("name is mandatory") + return + } + if b.context == nil { + err = errors.New("context function is mandatory") + return + } + if b.callback == nil { + err = errors.New("callback function is mandatory") + return + } + if b.interval <= 0 { + err = fmt.Errorf( + "check interval %s isn't valid, should be greater or equal than zero", + b.interval, + ) + return + } + if b.timeout <= 0 { + err = fmt.Errorf( + "timeout %s isn't valid, should be greater or equal than zero", + b.timeout, + ) + return + } + + // Create the database objects if needed. Note that if the install flag is false this will + // only write the SQL code to the log and will not try to create the objects. + err = b.createObjects(ctx) + if err != nil { + return + } + + // Calculate specific intervals from the general interval given in the configuration: + waitInterval := b.interval + retryInterval := b.interval / 10 + + // Calculate the SQL for frequently used statements: + fetchSQL, err := evaluateTemplate( + changeQueueFetchTemplate, + "Name", b.name, + ) + if err != nil { + return + } + listenSQL, err := evaluateTemplate( + changeQueueListenTemplate, + "Name", b.name, + ) + if err != nil { + return + } + + // Create and populate the object: + result = &ChangeQueue{ + logger: b.logger, + url: b.url, + name: b.name, + context: b.context, + callback: b.callback, + waitIterval: waitInterval, + retryInterval: retryInterval, + timeout: b.timeout, + fetchSQL: fetchSQL, + listenSQL: listenSQL, + closeFlag: 0, + closeChan: make(chan struct{}), + } + + // Start the loop: + go result.loop() + + return +} + +// createObjects creates the database objects needed by the queue: tables, triggers and functions. +func (b *ChangeQueueBuilder) createObjects(ctx context.Context) error { + var err error + + // Create the database connection to install database objects: + var conn *pgx.Conn + if b.install { + conn, err = pgx.Connect(ctx, b.url) + if err != nil { + return err + } + defer func() { + err := conn.Close(ctx) + if err != nil { + b.logger.Error(ctx, "Can't close connection: %v", err) + } + }() + } + + // Create the tables: + err = b.createTables(ctx, conn) + if err != nil { + return err + } + + // Create the functions: + err = b.createFunctions(ctx, conn) + if err != nil { + return err + } + + // Configure the tables: + for _, table := range b.tables { + err = b.configureTable(ctx, conn, table) + if err != nil { + return err + } + } + return nil +} + +// createTables creates the changes table if it doesn't already exist. +func (b *ChangeQueueBuilder) createTables(ctx context.Context, conn *pgx.Conn) error { + var err error + createTableSQL, err := evaluateTemplate( + changeQueueTableTemplate, + "Name", b.name, + ) + if err != nil { + return err + } + if b.install { + _, err = conn.Exec(ctx, createTableSQL) + if err != nil { + return err + } + } else { + b.logger.Info( + ctx, + "To create the changes table for queue '%s' run the "+ + "following SQL: %s", + b.name, createTableSQL, + ) + } + return nil +} + +// createFunctions creates the trigger functions if they don't already exist. +func (b *ChangeQueueBuilder) createFunctions(ctx context.Context, conn *pgx.Conn) error { + var err error + // Create the save function: + createSaveFunctionSQL, err := evaluateTemplate( + changeQueueSaveFunctionTemplate, + "Name", b.name, + ) + if err != nil { + return err + } + if b.install { + _, err = conn.Exec(ctx, createSaveFunctionSQL) + if err != nil { + return err + } + } else { + b.logger.Info( + ctx, + "To create the save function for queue '%s' run the "+ + "following SQL: %s", + b.name, createSaveFunctionSQL, + ) + } + + // Create the notify function: + createNotifyFunctionSQL, err := evaluateTemplate( + changeQueueNotifyFunctionTemplate, + "Name", b.name, + ) + if err != nil { + return err + } + if b.install { + _, err = conn.Exec(ctx, createNotifyFunctionSQL) + if err != nil { + return err + } + } else { + b.logger.Info( + ctx, + "To create the notify function for queue '%s' run the "+ + "following SQL: %s", + b.name, createNotifyFunctionSQL, + ) + } + + return nil +} + +// configureTable configures the given table with triggers that call the functions that save the +// changes to the changes table and send the notifications. +func (b *ChangeQueueBuilder) configureTable(ctx context.Context, conn *pgx.Conn, + table string) error { + // Create the save trigger: + createSaveTriggerSQL, err := evaluateTemplate( + changeQueueSaveTriggerTemplate, + "Name", b.name, + "Table", table, + ) + if err != nil { + return err + } + if b.install { + _, err = conn.Exec(ctx, createSaveTriggerSQL) + if err != nil { + return err + } + } else { + b.logger.Info( + ctx, + "To create the save trigger for queue '%s' and table '%s' run the "+ + "following SQL: %s", + b.name, table, createSaveTriggerSQL, + ) + } + + // Create the notify trigger: + createNotifyTriggerSQL, err := evaluateTemplate( + changeQueueNotifyTriggerTemplate, + "Name", b.name, + "Table", table, + ) + if err != nil { + return err + } + if b.install { + _, err = conn.Exec(ctx, createNotifyTriggerSQL) + if err != nil { + return err + } + } else { + b.logger.Info( + ctx, + "To create the notify trigger for queue '%s' and table '%s' run the "+ + "following SQL: %s", + b.name, table, createNotifyTriggerSQL, + ) + } + + return nil +} + +// loop runs the loop that waits for notifications from the database and processes the pending +// changes. +func (q *ChangeQueue) loop() { + // Create a context: + ctx := q.context() + + for !q.closing() { + // Check for pending changes: + q.check(ctx) + + // Wait for notifications. It is normal if this finishes with a timeout. Any other + // error isn't normal and we should wait a bit before trying again to avoid too many + // attempts when the error isn't resolved quickly. + err := q.wait(ctx) + if err != nil { + q.logger.Info( + ctx, + "Wait failed, will wait a bit before trying again: %v", + err, + ) + time.Sleep(q.retryInterval) + } + } + + // Let the close method know that we finished: + close(q.closeChan) +} + +// wait waits for the next notification from the database is received or the operation times out. +func (q *ChangeQueue) wait(ctx context.Context) error { + var err error + + // Start listening if needed: + if q.waitConn == nil { + q.waitConn, err = pgx.Connect(ctx, q.url) + if err != nil { + return err + } + _, err = q.waitConn.Exec(ctx, q.listenSQL) + if err != nil { + return err + } + } + + // Wait for a new notification. We set a timeout so that we will process pending changes + // periodically even if the notification mechanism fails. If the wait results in an error + // other than a timeout then we will close and discard the connection, so that we recover + // from database restarts and other errors that may make the connection unusable. + var waitCtx context.Context + waitCtx, q.waitCancel = context.WithTimeout(ctx, q.waitIterval) + defer q.waitCancel() + _, err = q.waitConn.WaitForNotification(waitCtx) + if pgconn.Timeout(err) { + err = nil + } + if err != nil { + q.logger.Debug(ctx, "Wait failed, will close the connection: %v", err) + closeErr := q.waitConn.Close(ctx) + if closeErr != nil { + q.logger.Info(ctx, "Can't close connection: %v", closeErr) + } + q.waitConn = nil + return err + } + + return nil +} + +// check checks the contents of the changes table and process them. +func (q *ChangeQueue) check(ctx context.Context) { + // Fetch and process all the available changes. + for !q.closing() { + // Fetch the next available row: + found, row, err := q.fetch(ctx) + if err != nil { + q.logger.Error(ctx, "Can't fetch change: %v", err) + return + } + if !found { + break + } + + // Process the change: + q.logger.Debug(ctx, "Processing change %d", row.Serial) + change := &ChangeQueueItem{ + Serial: row.Serial, + Timestamp: row.Timestamp, + Source: row.Source, + Operation: row.Operation, + Old: row.Old, + New: row.New, + } + q.callback(ctx, change) + } +} + +// fetch tries to read the next row from the changes table. It returns a boolean flag indicating if +// there was such a row and the row itself. +func (q *ChangeQueue) fetch(ctx context.Context) (found bool, result *changeQueueRow, + err error) { + // Create the connection if needed: + if q.fetchConn == nil { + q.fetchConn, err = pgx.Connect(ctx, q.url) + if err != nil { + return + } + } + + // Run the query: + queryCtx, queryCancel := context.WithTimeout(ctx, q.timeout) + defer queryCancel() + row := q.fetchConn.QueryRow(queryCtx, q.fetchSQL) + var tmp changeQueueRow + err = row.Scan( + &tmp.Serial, + &tmp.Timestamp, + &tmp.Source, + &tmp.Operation, + &tmp.Old, + &tmp.New, + ) + if errors.Is(err, pgx.ErrNoRows) { + err = nil + return + } + if pgconn.Timeout(err) { + err = nil + return + } + if err != nil { + q.logger.Debug(ctx, "Fetch failed, will close the connection: %v", err) + closeErr := q.fetchConn.Close(ctx) + if closeErr != nil { + q.logger.Info(ctx, "Can't close connection: %v", closeErr) + } + q.fetchConn = nil + err = nil + } + found = true + result = &tmp + return +} + +func evaluateTemplate(source string, args ...interface{}) (result string, + err error) { + // Check that there is an even number of args, and that the first of each pair is a string: + count := len(args) + if count%2 != 0 { + err = fmt.Errorf( + "template '%s' should have an even number of arguments, but it has %d", + source, count, + ) + return + } + for i := 0; i < count; i = i + 2 { + name := args[i] + _, ok := name.(string) + if !ok { + err = fmt.Errorf( + "argument %d of template '%s' is a key, so it should be a string, "+ + "but its type is %T", + i, source, name, + ) + return + } + } + + // Put the variables in the map that will be passed as the data object for the execution of + // the template: + data := make(map[string]interface{}) + for i := 0; i < count; i = i + 2 { + name := args[i].(string) + value := args[i+1] + data[name] = value + } + + // Parse the template: + tmpl, err := template.New("").Parse(source) + if err != nil { + err = fmt.Errorf("can't parse template '%s': %v", source, err) + return + } + + // Execute the template: + var buffer bytes.Buffer + err = tmpl.Execute(&buffer, data) + if err != nil { + err = fmt.Errorf("can't execute template '%s': %v", source, err) + return + } + + // Return the result: + result = buffer.String() + return +} + +// Close releases all the resources used by the queue. Note that the changes will continue to be +// written to the changes table even when the queue is closed, and that may continue to be processed +// by other queue objects running in a this or other processes. +func (q *ChangeQueue) Close() error { + // Create a context: + ctx := q.context() + + // Raise the closing flag so that the loop will finish as soon as it checks it: + q.close() + + // Cancel waiting and close the connection: + if q.waitCancel != nil { + q.waitCancel() + } + + // Wait for the loop to finish: + <-q.closeChan + + // Close the connection used for fetching changes: + if q.fetchConn != nil { + err := q.fetchConn.Close(ctx) + if err != nil { + q.logger.Info(ctx, "Can't close connection: %v", err) + } + } + + // Close the connection used for listening for notifications: + if q.waitConn != nil { + err := q.waitConn.Close(ctx) + if err != nil { + q.logger.Info(ctx, "Can't close connection: %v", err) + } + } + + return nil +} + +// close asks the loop to stop as soon as possible. +func (q *ChangeQueue) close() { + atomic.StoreInt32(&q.closeFlag, 1) +} + +// closing returns true if we are in the process of closing. +func (q *ChangeQueue) closing() bool { + return atomic.LoadInt32(&q.closeFlag) == 1 +} + +// template used to create the table. +const changeQueueTableTemplate = ` +create table if not exists {{ .Name }} ( + serial serial not null primary key, + timestamp timestamp with time zone not null default now(), + source text, + operation text, + old jsonb, + new jsonb +); +` + +// template used to create the save function. +const changeQueueSaveFunctionTemplate = ` +create or replace function {{ .Name }}_save() returns trigger as $$ +begin + insert into {{ .Name }} ( + source, + operation, + old, + new + ) values ( + tg_table_name, + lower(tg_op), + row_to_json(old.*), + row_to_json(new.*) + ); + return null; +end; +$$ language plpgsql; +` + +// template used to create the notify function. +const changeQueueNotifyFunctionTemplate = ` +create or replace function {{ .Name }}_notify() returns trigger as $$ +begin + notify {{ .Name }}; + return null; +end; +$$ language plpgsql; +` + +// template used to create the triggers that save changes to the changes table. +const changeQueueSaveTriggerTemplate = ` +drop trigger if exists {{ .Table }}_{{ .Name}}_save on {{ .Table }}; +create trigger {{ .Table }}_{{ .Name }}_save + after insert or update or delete on {{ .Table }} + for each row execute function {{ .Name }}_save(); +` + +// template used to create the triggers that send notifications. +const changeQueueNotifyTriggerTemplate = ` +drop trigger if exists {{ .Table }}_{{ .Name }}_notify on {{ .Table }}; +create trigger {{ .Table }}_{{ .Name }}_notify + after insert or update or delete on {{ .Table }} + execute function {{ .Name }}_notify(); +` + +// template used to fetch the next change. +const changeQueueFetchTemplate = ` +delete from {{ .Name }} where serial = ( + select serial + from {{ .Name }} + order by serial + for update + skip locked + limit 1 +) +returning + serial, + timestamp, + source, + operation, + old, + new +; +` + +// template used to listen for notifications. +const changeQueueListenTemplate = ` +listen {{ .Name }}; +` + +// Defaults for configuration settings: +const ( + defaultChangeQueueName = "changes" + defaultChangeQueueInterval = 30 * time.Second + defaultChangeQueueTimeout = 1 * time.Second +) diff --git a/database/change_queue_test.go b/database/change_queue_test.go new file mode 100644 index 00000000..7260d43e --- /dev/null +++ b/database/change_queue_test.go @@ -0,0 +1,388 @@ +/* +Copyright (c) 2021 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package database + +import ( + "context" + "database/sql" + "time" + + . "github.com/onsi/ginkgo/v2/dsl/core" // nolint + . "github.com/onsi/gomega" // nolint + . "github.com/openshift-online/ocm-sdk-go/testing" // nolint +) + +var _ = Describe("Change log behaviour", func() { + var ctx context.Context + var dbObject *Database + var dbURL string + var dbHandle *sql.DB + + BeforeEach(func() { + // Create a context: + ctx = context.Background() + + // Create a database: + dbObject = dbServer.MakeDatabase() + dbURL = dbObject.MakeURL() + dbHandle = dbObject.MakeHandle() + + // Create the data table: + _, err := dbHandle.Exec(` + create table my_data ( + id integer not null primary key, + name text not null + ) + `) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + // Close the database handle: + err := dbHandle.Close() + Expect(err).ToNot(HaveOccurred()) + + // Close the database server: + dbObject.Close() + }) + + // NopCallback is a callback that doesn nothing with the change. + var NopCallback = func(ctx context.Context, change *ChangeQueueItem) { + // Do nothing, as the name says. + } + + It("Can't be created without a logger", func() { + _, err := NewChangeQueue(). + URL(dbURL). + Install(true). + Callback(NopCallback). + Build(ctx) + Expect(err).To(HaveOccurred()) + message := err.Error() + Expect(message).To(ContainSubstring("logger")) + Expect(message).To(ContainSubstring("mandatory")) + }) + + It("Can't be created without a database URL", func() { + _, err := NewChangeQueue(). + Logger(logger). + Install(true). + Callback(NopCallback). + Build(ctx) + Expect(err).To(HaveOccurred()) + message := err.Error() + Expect(message).To(ContainSubstring("database")) + Expect(message).To(ContainSubstring("URL")) + Expect(message).To(ContainSubstring("mandatory")) + }) + + It("Can't be created with zero interval", func() { + _, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Install(true). + Callback(NopCallback). + Interval(0). + Build(ctx) + Expect(err).To(HaveOccurred()) + message := err.Error() + Expect(message).To(ContainSubstring("interval")) + Expect(message).To(ContainSubstring("greater or equal than zero")) + }) + + It("Can't be created with zero timeout", func() { + _, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Install(true). + Callback(NopCallback). + Timeout(0). + Build(ctx) + Expect(err).To(HaveOccurred()) + message := err.Error() + Expect(message).To(ContainSubstring("timeout")) + Expect(message).To(ContainSubstring("greater or equal than zero")) + }) + + It("Creates the changes table if it doesn't exist", func() { + // Create the queue: + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Install(true). + Callback(NopCallback). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + + // Check that the table exists: + rows, err := dbHandle.Query(` + select + serial, + timestamp, + source, + operation, + old, + new + from + my_changes + `) + Expect(err).ToNot(HaveOccurred()) + err = rows.Close() + Expect(err).ToNot(HaveOccurred()) + }) + + It("Can be created if the changes table already exists", func() { + // Create the changes table: + _, err := dbHandle.Exec(` + create table my_changes ( + serial serial primary key, + timestamp timestamp with time zone not null default now(), + source text, + operation text, + old jsonb, + new jsonb + ) + `) + Expect(err).ToNot(HaveOccurred()) + + // Create the queue: + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Install(true). + Callback(NopCallback). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + }) + + It("Processes insert, update and delete", func() { + // Create a callback function that stores the changes in an array: + var changes []*ChangeQueueItem + callback := func(ctx context.Context, item *ChangeQueueItem) { + changes = append(changes, item) + } + + // Create the queue: + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Table("my_data"). + Install(true). + Callback(callback). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + + // Insert: + _, err = dbHandle.Exec(`insert into my_data (id, name) values (123, 'my_name')`) + Expect(err).ToNot(HaveOccurred()) + + // Update: + _, err = dbHandle.Exec(`update my_data set name = 'your_name' where id = 123`) + Expect(err).ToNot(HaveOccurred()) + + // Delete: + _, err = dbHandle.Exec(`delete from my_data where id = 123`) + Expect(err).ToNot(HaveOccurred()) + + // Wait a bit so that the changes can be processed: + time.Sleep(150 * time.Millisecond) + + // Check the number of changes: + Expect(changes).To(HaveLen(3)) + + // Check the insert: + insertChange := changes[0] + Expect(insertChange.Serial).To(Equal(1)) + Expect(insertChange.Source).To(Equal("my_data")) + Expect(insertChange.Operation).To(Equal("insert")) + Expect(insertChange.Old).To(BeEmpty()) + Expect(insertChange.New).To(MatchJSON(`{ + "id": 123, + "name": "my_name" + }`)) + + // Check the update: + updateChange := changes[1] + Expect(updateChange.Serial).To(Equal(2)) + Expect(updateChange.Source).To(Equal("my_data")) + Expect(updateChange.Operation).To(Equal("update")) + Expect(updateChange.Old).To(MatchJSON(`{ + "id": 123, + "name": "my_name" + }`)) + Expect(updateChange.New).To(MatchJSON(`{ + "id": 123, + "name": "your_name" + }`)) + + // Check the delete: + deleteChange := changes[2] + Expect(deleteChange.Serial).To(Equal(3)) + Expect(deleteChange.Source).To(Equal("my_data")) + Expect(deleteChange.Operation).To(Equal("delete")) + Expect(deleteChange.Old).To(MatchJSON(`{ + "id": 123, + "name": "your_name" + }`)) + Expect(deleteChange.New).To(BeEmpty()) + }) + + It("Processes changes made before creation", func() { + // Create the queue and close it inmediately so that it will only install the + // database objects but not process any changes: + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Table("my_data"). + Install(true). + Callback(NopCallback). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + + // Make a change: + _, err = dbHandle.Exec(`insert into my_data (id, name) values (123, 'my_name')`) + Expect(err).ToNot(HaveOccurred()) + + // Create the queue again, this time letting it run till the end of the test: + var change *ChangeQueueItem + callback := func(ctx context.Context, item *ChangeQueueItem) { + change = item + } + queue, err = NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Table("my_data"). + Install(true). + Callback(callback). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + + // Wait a bit so that the changes can be processed: + time.Sleep(150 * time.Millisecond) + + // Check the change: + Expect(change).ToNot(BeNil()) + Expect(change.Serial).To(Equal(1)) + Expect(change.Source).To(Equal("my_data")) + Expect(change.Operation).To(Equal("insert")) + Expect(change.Old).To(BeEmpty()) + Expect(change.New).To(MatchJSON(`{ + "id": 123, + "name": "my_name" + }`)) + }) + + It("Process changes made after creation", func() { + // Create the queue: + var change *ChangeQueueItem + callback := func(ctx context.Context, item *ChangeQueueItem) { + change = item + } + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Table("my_data"). + Install(true). + Callback(callback). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + + // Do a change: + _, err = dbHandle.Exec(`insert into my_data (id, name) values (123, 'my_name')`) + Expect(err).ToNot(HaveOccurred()) + + // Wait a bit so that the changes can be processed: + time.Sleep(150 * time.Millisecond) + + // Check the change: + Expect(change).ToNot(BeNil()) + Expect(change.Serial).To(Equal(1)) + Expect(change.Source).To(Equal("my_data")) + Expect(change.Operation).To(Equal("insert")) + Expect(change.Old).To(BeEmpty()) + Expect(change.New).To(MatchJSON(`{ + "id": 123, + "name": "my_name" + }`)) + }) + + It("Passes custom context to callback", func() { + // Create the queue: + queue, err := NewChangeQueue(). + Logger(logger). + URL(dbURL). + Name("my_changes"). + Table("my_data"). + Install(true). + Context(func() context.Context { + // nolint + return context.WithValue(context.Background(), "my_key", "my_value") + }). + Callback(func(ctx context.Context, item *ChangeQueueItem) { + defer GinkgoRecover() + key := ctx.Value("my_key") + Expect(key).To(Equal("my_value")) + }). + Interval(100 * time.Millisecond). + Build(ctx) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = queue.Close() + Expect(err).ToNot(HaveOccurred()) + }() + + // Do a change: + _, err = dbHandle.Exec(`insert into my_data (id, name) values (123, 'my_name')`) + Expect(err).ToNot(HaveOccurred()) + + // Wait a bit so that the changes can be processed: + time.Sleep(150 * time.Millisecond) + }) +}) diff --git a/database/main_test.go b/database/main_test.go index be9703ab..a0833c1e 100644 --- a/database/main_test.go +++ b/database/main_test.go @@ -19,11 +19,39 @@ package database import ( "testing" - . "github.com/onsi/ginkgo/v2/dsl/core" // nolint - . "github.com/onsi/gomega" // nolint + . "github.com/onsi/ginkgo/v2/dsl/core" // nolint + . "github.com/onsi/gomega" // nolint + . "github.com/openshift-online/ocm-sdk-go/testing" // nolint + + "github.com/openshift-online/ocm-sdk-go/logging" ) func TestDatabase(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Database") } + +// logger is the logger that will be used by the tests. +var logger logging.Logger + +// dbServer is the database dbServer that will be used to create the databases used by the tests. +var dbServer *DatabaseServer + +var _ = BeforeSuite(func() { + var err error + + // Create a logger that writes to the Ginkgo stream: + logger, err = logging.NewStdLoggerBuilder(). + Streams(GinkgoWriter, GinkgoWriter). + Debug(true). + Build() + Expect(err).ToNot(HaveOccurred()) + + // Start the database server: + dbServer = MakeDatabaseServer() +}) + +var _ = AfterSuite(func() { + // Stop the database server: + dbServer.Close() +}) diff --git a/examples/change_queue.go b/examples/change_queue.go new file mode 100644 index 00000000..7ffe62ed --- /dev/null +++ b/examples/change_queue.go @@ -0,0 +1,120 @@ +/* +Copyright (c) 2021 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/openshift-online/ocm-sdk-go/database" + "github.com/openshift-online/ocm-sdk-go/logging" +) + +// This example shows how to create a queue that will call a function for each change that is done +// in a database table. To use adjust the `URL` parameter of the constructor of the queue and create +// a `my_data` table, for example: +// +// create table my_data ( +// id integer not null primary key, +// name text not null +// ); +// +// Then run this example in one window and in another window do some operations with that table, for +// example insert a new record: +// +// insert into my_data (id, name) values (123, 'my_name'); +// +// If everything works correctly you will see something like this in the output: +// +// Serial: 57 +// Timestamp: 2021-09-02T19:14:23+02:00 +// Source: my_data +// Operation: insert +// Old: +// New: { +// "id": 124, +// "name": "my_name" +// } +// +// Try some other operations on the table to see more results. + +func main() { + // Create a context: + ctx := context.Background() + + // Create a logger that has the debug level enabled: + logger, err := logging.NewGoLoggerBuilder(). + Debug(true). + Build() + if err != nil { + fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err) + os.Exit(1) + } + + // Create the queue: + queue, err := database.NewChangeQueue(). + Logger(logger). + URL("postgres://service:service123@localhost/service?sslmode=disable"). + Name("my_queue"). + Table("my_data"). + Install(true). + Callback(print). + Build(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "Can't create queue: %v\n", err) + os.Exit(1) + } + + // Wait a while and then close the queue: + time.Sleep(5 * time.Minute) + err = queue.Close() + if err != nil { + fmt.Fprintf(os.Stderr, "Can't close queue: %v\n", err) + os.Exit(1) + } +} + +func print(ctx context.Context, item *database.ChangeQueueItem) { + fmt.Printf("Serial: %d\n", item.Serial) + fmt.Printf("Timestamp: %s\n", item.Timestamp.Format(time.RFC3339)) + fmt.Printf("Source: %s\n", item.Source) + fmt.Printf("Operation: %s\n", item.Operation) + fmt.Printf("Old: %s\n", render(item.Old)) + fmt.Printf("New: %s\n", render(item.New)) + fmt.Printf("\n") +} + +func render(data []byte) string { + var object map[string]interface{} + err := json.Unmarshal(data, &object) + if err != nil { + return string(data) + } + var buffer bytes.Buffer + encoder := json.NewEncoder(&buffer) + encoder.SetIndent("", " ") + err = encoder.Encode(object) + if err != nil { + return string(data) + } + return strings.TrimSpace(buffer.String()) +} diff --git a/leadership/main_test.go b/leadership/main_test.go index 2d624699..6e0c4921 100644 --- a/leadership/main_test.go +++ b/leadership/main_test.go @@ -37,7 +37,7 @@ func TestLeadership(t *testing.T) { // logger is the logger that will be used by the tests. var logger logging.Logger -// dbServer is the database dbServer that will be used to create the databases used by the tests. +// dbServer is the database server that will be used to create the databases used by the tests. var dbServer *DatabaseServer var _ = BeforeSuite(func() { diff --git a/testing/database.go b/testing/database.go index 88e65d01..ad1f5c34 100644 --- a/testing/database.go +++ b/testing/database.go @@ -196,12 +196,17 @@ func (s *DatabaseServer) MakeDatabase() *Database { return db } -// MakeHandle creates a new database handle for this database. -func (d *Database) MakeHandle() *sql.DB { - url := fmt.Sprintf( +// MakeURL calculates the URL for this database. +func (d *Database) MakeURL() string { + return fmt.Sprintf( "postgres://%s:%s@%s:%s/%s?sslmode=disable", d.user, d.password, d.server.host, d.server.port, d.name, ) +} + +// MakeHandle creates a new database handle for this database. +func (d *Database) MakeHandle() *sql.DB { + url := d.MakeURL() handle, err := sql.Open("pgx", url) Expect(err).ToNot(HaveOccurred()) return handle