Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP price cache use DB timestamp #13133

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/poor-gorillas-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#changed CCIP price cache to use DB timestamp
22 changes: 10 additions & 12 deletions core/services/ccip/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 12 additions & 14 deletions core/services/ccip/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type ORM interface {
InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error

ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing is consuming this interface yet so safe to modify

}

type orm struct {
Expand Down Expand Up @@ -99,20 +99,19 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector
return nil
}

now := time.Now()
insertData := make([]map[string]interface{}, 0, len(gasPrices))
for _, price := range gasPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"source_chain_selector": price.SourceChainSelector,
"gas_price": price.GasPrice,
"created_at": now,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, :created_at);`
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err)
Expand All @@ -126,20 +125,19 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
return nil
}

now := time.Now()
insertData := make([]map[string]interface{}, 0, len(tokenPrices))
for _, price := range tokenPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"token_addr": price.TokenAddr,
"token_price": price.TokenPrice,
"created_at": now,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at)
VALUES (:chain_selector, :job_id, :token_addr, :token_price, :created_at);`
VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err)
Expand All @@ -148,16 +146,16 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
return err
}

func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}

func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}
22 changes: 12 additions & 10 deletions core/services/ccip/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerSelector := range updates {
Expand All @@ -222,13 +223,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {

assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getGasTableRowCount(t, db))
}
Expand Down Expand Up @@ -324,7 +325,8 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerAddr := range updates {
Expand All @@ -334,13 +336,13 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {

assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getTokenTableRowCount(t, db))
}
Loading