Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Aug 13, 2024
1 parent 543ff9d commit 5ece5d3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
14 changes: 10 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,14 @@ func (lp *logPoller) backgroundWorkerRun() {
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
allRemoved, err := lp.PruneExpiredLogs(ctx)
if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
}
if err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
}
}
}
}
Expand Down Expand Up @@ -1093,7 +1095,11 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
if err != nil || rowsRemoved < lp.logPrunePageSize {
return true, err
}
rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || err != nil || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
Expand Down
27 changes: 18 additions & 9 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -36,6 +37,7 @@ type ORM interface {
DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
DeleteExcessLogs(ctx context.Context, limit int64) (int64, error)

GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
Expand Down Expand Up @@ -350,23 +352,30 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro
}

// DeleteExcessLogs deletes any logs old enough that MaxLogsKept has been exceeded for every filter they match.
func (o *DSORM) DeleteExcessLogs(ctx context.Context) (int64, error) {
rowIds := struct {
EvmChainId ubig.Big
BlockHash common.Hash
LogIndex uint64
}{}
err := o.ds.GetContext(ctx, &rowIds, `
func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) {
var rowIds []struct {
BlockNumber uint64
LogIndex uint64
}

var limitClause string
if limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", limit)
}

query := `
SELECT block_number, log_index FROM (
SELECT max_logs_kept != 0 AND ROW_NUMBER() OVER(PARTITION BY f.id ORDER BY block_number, log_index DESC) > max_logs_kept AS old, block_number, log_index
FROM evm.log_poller_filters f JOIN evm.logs l
ON f.evm_chain_id = l.evm_chain_id AND f.address = l.address AND f.event = l.event_sig WHERE f.evm_chain_id=$1
) x GROUP BY block_number, log_index HAVING BOOL_AND(old)`, ubig.New(o.chainID))
) x GROUP BY block_number, log_index HAVING BOOL_AND(old)` + limitClause

err := o.ds.SelectContext(ctx, &rowIds, query, ubig.New(o.chainID))
if err != nil {
return 0, err
}

result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE block_hash=:block_hash AND log_index=:log_index AND evm_chain_id=5`, rowIds)
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND (block_number, log_index) IN rowIds`, rowIds)

if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,7 @@ func Benchmark_LogPruning(b *testing.B) {
return o.DeleteExpiredLogs(ctx, 1000)
})
runBenchmarking("DeleteExcessLogs", func(ctx context.Context) (int64, error) {
return o.DeleteExcessLogs(ctx)
return o.DeleteExcessLogs(ctx, 0)
})
}

Expand Down

0 comments on commit 5ece5d3

Please sign in to comment.