diff --git a/ledger/accountdb.go b/ledger/accountdb.go index 6943ca2941..c82a4e5ee8 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -301,17 +301,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map if len(a.misses) == 0 { return nil } - selectStmt, err := tx.Prepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?") - if err != nil { - return - } - defer selectStmt.Close() - - addrRowidStmt, err := tx.Prepare("SELECT rowid FROM accountbase WHERE address=?") - if err != nil { - return - } - defer addrRowidStmt.Close() + arw := store.NewAccountsSQLReaderWriter(tx) defer func() { a.misses = nil @@ -327,7 +317,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map if delta.oldResource.Addrid != 0 { addrid = delta.oldResource.Addrid } else if addrid, ok = knownAddresses[addr]; !ok { - err = addrRowidStmt.QueryRow(addr[:]).Scan(&addrid) + addrid, err = arw.LookupAccountRowID(addr) if err != nil { if err != sql.ErrNoRows { err = fmt.Errorf("base account cannot be read while processing resource for addr=%s, aidx=%d: %w", addr.String(), aidx, err) @@ -340,8 +330,7 @@ func (a *compactResourcesDeltas) resourcesLoadOld(tx *sql.Tx, knownAddresses map continue } } - resDataBuf = nil - err = selectStmt.QueryRow(addrid, aidx).Scan(&resDataBuf) + resDataBuf, err = arw.LookupResourceDataByAddrID(addrid, aidx) switch err { case nil: if len(resDataBuf) > 0 { @@ -476,23 +465,17 @@ func (a *compactAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) { if len(a.misses) == 0 { return nil } - selectStmt, err := tx.Prepare("SELECT rowid, data FROM accountbase WHERE address=?") - if err != nil { - return - } - defer selectStmt.Close() + arw := store.NewAccountsSQLReaderWriter(tx) defer func() { a.misses = nil }() - var rowid sql.NullInt64 - var acctDataBuf []byte for _, idx := range a.misses { addr := a.deltas[idx].address - err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf) + rowid, acctDataBuf, err := arw.LookupAccountDataByAddress(addr) switch err { case nil: if len(acctDataBuf) > 0 { - persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64} + persistedAcctData := &store.PersistedAccountData{Addr: addr, Rowid: rowid} err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData) if err != nil { return err @@ -500,7 +483,7 @@ func (a *compactAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) { a.updateOld(idx, *persistedAcctData) } else { // to retain backward compatibility, we will treat this condition as if we don't have the account. - a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid.Int64}) + a.updateOld(idx, store.PersistedAccountData{Addr: addr, Rowid: rowid}) } case sql.ErrNoRows: // we don't have that account, just return an empty record. @@ -618,24 +601,17 @@ func (a *compactOnlineAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) { if len(a.misses) == 0 { return nil } - // fetch the latest entry - selectStmt, err := tx.Prepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1") - if err != nil { - return - } - defer selectStmt.Close() + arw := store.NewAccountsSQLReaderWriter(tx) defer func() { a.misses = nil }() - var rowid sql.NullInt64 - var acctDataBuf []byte for _, idx := range a.misses { addr := a.deltas[idx].address - err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &acctDataBuf) + rowid, acctDataBuf, err := arw.LookupOnlineAccountDataByAddress(addr) switch err { case nil: if len(acctDataBuf) > 0 { - persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64} + persistedAcctData := &store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid} err = protocol.Decode(acctDataBuf, &persistedAcctData.AccountData) if err != nil { return err @@ -643,12 +619,11 @@ func (a *compactOnlineAccountDeltas) accountsLoadOld(tx *sql.Tx) (err error) { a.updateOld(idx, *persistedAcctData) } else { // empty data means offline account - a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid.Int64}) + a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr, Rowid: rowid}) } case sql.ErrNoRows: // we don't have that account, just return an empty record. a.updateOld(idx, store.PersistedOnlineAccountData{Addr: addr}) - err = nil default: // unexpected error - let the caller know that we couldn't complete the operation. return err diff --git a/ledger/store/accountsV2.go b/ledger/store/accountsV2.go index e5361e50bd..649e4111bb 100644 --- a/ledger/store/accountsV2.go +++ b/ledger/store/accountsV2.go @@ -32,7 +32,8 @@ import ( ) type accountsV2Reader struct { - q db.Queryable + q db.Queryable + preparedStatements map[string]*sql.Stmt } type accountsV2Writer struct { @@ -47,11 +48,27 @@ type accountsV2ReaderWriter struct { // NewAccountsSQLReaderWriter creates a Catchpoint SQL reader+writer func NewAccountsSQLReaderWriter(e db.Executable) *accountsV2ReaderWriter { return &accountsV2ReaderWriter{ - accountsV2Reader{q: e}, + accountsV2Reader{q: e, preparedStatements: make(map[string]*sql.Stmt)}, accountsV2Writer{e: e}, } } +func (r *accountsV2Reader) getOrPrepare(queryString string) (stmt *sql.Stmt, err error) { + // fetch statement (use the query as the key) + if stmt, ok := r.preparedStatements[queryString]; ok { + return stmt, nil + } + // we do not have it, prepare it + stmt, err = r.q.Prepare(queryString) + if err != nil { + return + } + // cache the statement + r.preparedStatements[queryString] = stmt + + return stmt, nil +} + // AccountsTotals returns account totals func (r *accountsV2Reader) AccountsTotals(ctx context.Context, catchpointStaging bool) (totals ledgercore.AccountTotals, err error) { id := "" @@ -269,6 +286,65 @@ func (r *accountsV2Reader) LookupAccountAddressFromAddressID(ctx context.Context return } +func (r *accountsV2Reader) LookupAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) { + // optimize this query for repeated usage + selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM accountbase WHERE address=?") + if err != nil { + return + } + + err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data) + if err != nil { + return + } + return rowid, data, err +} + +// LookupOnlineAccountDataByAddress looks up online account data by address. +func (r *accountsV2Reader) LookupOnlineAccountDataByAddress(addr basics.Address) (rowid int64, data []byte, err error) { + // optimize this query for repeated usage + selectStmt, err := r.getOrPrepare("SELECT rowid, data FROM onlineaccounts WHERE address=? ORDER BY updround DESC LIMIT 1") + if err != nil { + return + } + + err = selectStmt.QueryRow(addr[:]).Scan(&rowid, &data) + if err != nil { + return + } + return rowid, data, err +} + +// LookupAccountRowID looks up the rowid of an account based on its address. +func (r *accountsV2Reader) LookupAccountRowID(addr basics.Address) (rowid int64, err error) { + // optimize this query for repeated usage + addrRowidStmt, err := r.getOrPrepare("SELECT rowid FROM accountbase WHERE address=?") + if err != nil { + return + } + + err = addrRowidStmt.QueryRow(addr[:]).Scan(&rowid) + if err != nil { + return + } + return rowid, err +} + +// LookupResourceDataByAddrID looks up the resource data by account rowid + resource aidx. +func (r *accountsV2Reader) LookupResourceDataByAddrID(addrid int64, aidx basics.CreatableIndex) (data []byte, err error) { + // optimize this query for repeated usage + selectStmt, err := r.getOrPrepare("SELECT data FROM resources WHERE addrid = ? AND aidx = ?") + if err != nil { + return + } + + err = selectStmt.QueryRow(addrid, aidx).Scan(&data) + if err != nil { + return + } + return data, err +} + // LoadAllFullAccounts loads all accounts from balancesTable and resourcesTable. // On every account full load it invokes acctCb callback to report progress and data. func (r *accountsV2Reader) LoadAllFullAccounts(