Skip to content

Commit

Permalink
Algod: refactor registry operations (#3647)
Browse files Browse the repository at this point in the history
## Summary
Refactored WriteThread such that it'll be easier to add new operations. 
Didn't delete the DBs`inner` functions, instead moved them to be handled by a struct that implements `dbOp` interface
  • Loading branch information
algonathan authored Mar 2, 2022
1 parent e1ae888 commit c43c5e0
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 264 deletions.
278 changes: 20 additions & 258 deletions data/account/participationRegistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/base32"
"errors"
"fmt"
"strings"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -288,7 +287,7 @@ func makeParticipationRegistry(accessor db.Pair, log logging.Logger) (*participa
registry := &participationDB{
log: log,
store: accessor,
writeQueue: make(chan partDBWriteRecord, 10),
writeQueue: make(chan opRequest, 10),
writeQueueDone: make(chan struct{}),
flushTimeout: defaultTimeout,
}
Expand Down Expand Up @@ -402,7 +401,7 @@ type participationDB struct {
store db.Pair
mutex deadlock.RWMutex

writeQueue chan partDBWriteRecord
writeQueue chan opRequest
writeQueueDone chan struct{}

flushTimeout time.Duration
Expand All @@ -414,20 +413,6 @@ type updatingParticipationRecord struct {
required bool
}

// partDBWriteRecord event object sent to the writeThread to facilitate async
// database writes. Only one set of event fields should be set at a time.
type partDBWriteRecord struct {
insertID ParticipationID
insert Participation
keys StateProofKeys

registerUpdated map[ParticipationID]updatingParticipationRecord

delete ParticipationID

flushResultChannel chan error
}

func (db *participationDB) initializeCache() error {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -453,37 +438,14 @@ func (db *participationDB) initializeCache() error {

func (db *participationDB) writeThread() {
defer close(db.writeQueueDone)
var err error
var lastErr error
for {
var wr partDBWriteRecord
var chanOk bool

// blocking read until next activity or close
wr, chanOk = <-db.writeQueue
if !chanOk {
return // chan closed
}

if len(wr.registerUpdated) != 0 {
err = db.registerInner(wr.registerUpdated)
} else if !wr.insertID.IsZero() {
if wr.insert != (Participation{}) {
err = db.insertInner(wr.insert, wr.insertID)
} else if len(wr.keys) != 0 {
err = db.appendKeysInner(wr.insertID, wr.keys)
}
} else if !wr.delete.IsZero() {
err = db.deleteInner(wr.delete)
} else if wr.flushResultChannel != nil {
err = db.flushInner()
}
if err != nil {
for op := range db.writeQueue {
if err := op.operation.apply(db); err != nil {
lastErr = err
}

if wr.flushResultChannel != nil {
wr.flushResultChannel <- lastErr
if op.errChannel != nil {
op.errChannel <- lastErr
lastErr = nil
}
}
Expand All @@ -504,203 +466,6 @@ func verifyExecWithOneRowEffected(err error, result sql.Result, operationName st
return nil
}

func (db *participationDB) insertInner(record Participation, id ParticipationID) (err error) {
var rawVRF []byte
var rawVoting []byte
var rawStateProofContext []byte

if record.VRF != nil {
rawVRF = protocol.Encode(record.VRF)
}
if record.Voting != nil {
voting := record.Voting.Snapshot()
rawVoting = protocol.Encode(&voting)
}

// This contains all the state proof data except for the actual secret keys (stored in a different table)
if record.StateProofSecrets != nil {
rawStateProofContext = protocol.Encode(&record.StateProofSecrets.SignerContext)
}

err = db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
result, err := tx.Exec(
insertKeysetQuery,
id[:],
record.Parent[:],
record.FirstValid,
record.LastValid,
record.KeyDilution,
rawVRF,
rawStateProofContext)
if err = verifyExecWithOneRowEffected(err, result, "insert keyset"); err != nil {
return err
}
pk, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("unable to get pk from keyset: %w", err)
}

// Create Rolling entry
result, err = tx.Exec(insertRollingQuery, pk, rawVoting)
if err = verifyExecWithOneRowEffected(err, result, "insert rolling"); err != nil {
return err
}

return nil
})
return err
}

func (db *participationDB) appendKeysInner(id ParticipationID, keys StateProofKeys) error {
err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
// Fetch primary key
var pk int
row := tx.QueryRow(selectPK, id[:])
err := row.Scan(&pk)
if err == sql.ErrNoRows {
// nothing to do.
return nil
}
if err != nil {
return fmt.Errorf("unable to scan pk: %w", err)
}

stmt, err := tx.Prepare(appendStateProofKeysQuery)
if err != nil {
return fmt.Errorf("unable to prepare state proof insert: %w", err)
}

for _, key := range keys {
result, err := stmt.Exec(pk, key.Round, protocol.Encode(key.Key))
if err = verifyExecWithOneRowEffected(err, result, "append keys"); err != nil {
return err
}
}

return nil
})
return err
}

func (db *participationDB) registerInner(updated map[ParticipationID]updatingParticipationRecord) error {
var cacheDeletes []ParticipationID
err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
// Disable active key if there is one
for id, record := range updated {
err := updateRollingFields(ctx, tx, record.ParticipationRecord)
// Repair the case when no keys were updated
if err == ErrNoKeyForID {
db.log.Warn("participationDB unable to update key in cache. Removing from cache.")
cacheDeletes = append(cacheDeletes, id)
if !record.required {
err = nil
}
}
if err != nil {
return fmt.Errorf("unable to disable old key when registering %s: %w", id, err)
}
}
return nil
})

// Update cache
if err == nil && len(cacheDeletes) != 0 {
db.mutex.Lock()
defer db.mutex.Unlock()
for _, id := range cacheDeletes {
delete(db.cache, id)
delete(db.dirty, id)
}
}
return err
}

func (db *participationDB) deleteInner(id ParticipationID) error {
err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
// Fetch primary key
var pk int
row := tx.QueryRow(selectPK, id[:])
err := row.Scan(&pk)
if err == sql.ErrNoRows {
// nothing to do.
return nil
}
if err != nil {
return fmt.Errorf("unable to scan pk: %w", err)
}

// Delete rows
result, err := tx.Exec(deleteKeysets, pk)
if err = verifyExecWithOneRowEffected(err, result, "delete keyset"); err != nil {
return err
}

result, err = tx.Exec(deleteRolling, pk)
if err = verifyExecWithOneRowEffected(err, result, "delete rolling"); err != nil {
return err
}

return nil
})
return err
}

func (db *participationDB) flushInner() error {
var dirty map[ParticipationID]struct{}
db.mutex.Lock()
if len(db.dirty) != 0 {
dirty = db.dirty
db.dirty = make(map[ParticipationID]struct{})
} else {
dirty = nil
}

var needsUpdate []ParticipationRecord
// Verify that the dirty flag has not desynchronized from the cache.
for id := range dirty {
if rec, ok := db.cache[id]; !ok {
db.log.Warnf("participationDB fixing dirty flag de-synchronization for %s", id)
delete(db.cache, id)
} else {
needsUpdate = append(needsUpdate, rec)
}
}
db.mutex.Unlock()

if dirty == nil {
return nil
}

err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var errorStr strings.Builder
for _, record := range needsUpdate {
err := updateRollingFields(ctx, tx, record)
// This should only be updating key usage so ignoring missing keys is not a problem.
if err != nil && err != ErrNoKeyForID {
if errorStr.Len() > 0 {
errorStr.WriteString(", ")
}
errorStr.WriteString(err.Error())
}
}
if errorStr.Len() > 0 {
return errors.New(errorStr.String())
}
return nil
})

if err != nil {
// put back what we didn't finish with
db.mutex.Lock()
for id, v := range dirty {
db.dirty[id] = v
}
db.mutex.Unlock()
}

return err
}

func (db *participationDB) Insert(record Participation) (id ParticipationID, err error) {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -712,10 +477,10 @@ func (db *participationDB) Insert(record Participation) (id ParticipationID, err
return id, ErrAlreadyInserted
}

db.writeQueue <- partDBWriteRecord{
insertID: id,
insert: record,
}
db.writeQueue <- makeOpRequest(&insertOp{
id: id,
record: record,
})

// Make some copies.
var vrf *crypto.VRFSecrets
Expand Down Expand Up @@ -767,10 +532,11 @@ func (db *participationDB) AppendKeys(id ParticipationID, keys StateProofKeys) e
}

// Update the DB asynchronously.
db.writeQueue <- partDBWriteRecord{
insertID: id,
keys: keys,
}
db.writeQueue <- makeOpRequest(&appendKeysOp{
id: id,
keys: keys,
})

return nil
}

Expand All @@ -786,9 +552,8 @@ func (db *participationDB) Delete(id ParticipationID) error {
delete(db.cache, id)

// do the db part async
db.writeQueue <- partDBWriteRecord{
delete: id,
}
db.writeQueue <- makeOpRequest(&deleteOp{id})

return nil
}

Expand Down Expand Up @@ -1100,9 +865,8 @@ func (db *participationDB) Register(id ParticipationID, on basics.Round) error {
}

if len(updated) != 0 {
db.writeQueue <- partDBWriteRecord{
registerUpdated: updated,
}
db.writeQueue <- makeOpRequest(&registerOp{updated: updated})

db.mutex.Lock()
for id, record := range updated {
delete(db.dirty, id)
Expand Down Expand Up @@ -1163,9 +927,7 @@ func (db *participationDB) Record(account basics.Address, round basics.Round, pa
func (db *participationDB) Flush(timeout time.Duration) error {
resultCh := make(chan error, 1)
timeoutCh := time.After(timeout)
writeRecord := partDBWriteRecord{
flushResultChannel: resultCh,
}
writeRecord := makeOpRequestWithError(&flushOp{}, resultCh)

select {
case db.writeQueue <- writeRecord:
Expand Down
Loading

0 comments on commit c43c5e0

Please sign in to comment.