Skip to content

Commit

Permalink
perf test: Transaction group handle/verify (#4652)
Browse files Browse the repository at this point in the history
  • Loading branch information
algonautshant authored Oct 24, 2022
1 parent fbd5b17 commit 5e0ea76
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 0 deletions.
2 changes: 2 additions & 0 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func reencode(stxns []transactions.SignedTxn) []byte {
// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels
// and dispatches them further.
func (handler *TxHandler) backlogWorker() {
// Note: TestIncomingTxHandle and TestIncomingTxGroupHandle emulate this function.
// Changes to the behavior in this function should be reflected in the test.
defer handler.backlogWg.Done()
for {
// prioritize the postVerificationQueue
Expand Down
320 changes: 320 additions & 0 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package data

import (
"encoding/binary"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"testing"
"time"

Expand All @@ -38,6 +41,7 @@ import (
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)

func BenchmarkTxHandlerProcessing(b *testing.B) {
Expand Down Expand Up @@ -248,3 +252,319 @@ func BenchmarkTxHandlerDecoderMsgp(b *testing.B) {
require.Equal(b, benchTxnNum, idx)
}
}

func TestIncomingTxHandle(t *testing.T) {
incomingTxHandlerProcessing(1, t)
}

func TestIncomingTxGroupHandle(t *testing.T) {
incomingTxHandlerProcessing(proto.MaxTxGroupSize, t)
}

// incomingTxHandlerProcessing is a comprehensive transaction handling test
// It handles the singed transactions by passing them to the backlog for verification
func incomingTxHandlerProcessing(maxGroupSize int, t *testing.T) {
const numUsers = 100
numberOfTransactionGroups := 1000
log := logging.TestingLog(t)
log.SetLevel(logging.Warn)
addresses := make([]basics.Address, numUsers)
secrets := make([]*crypto.SignatureSecrets, numUsers)

// prepare the accounts
genesis := make(map[basics.Address]basics.AccountData)
for i := 0; i < numUsers; i++ {
secret := keypair()
addr := basics.Address(secret.SignatureVerifier)
secrets[i] = secret
addresses[i] = addr
genesis[addr] = basics.AccountData{
Status: basics.Online,
MicroAlgos: basics.MicroAlgos{Raw: 10000000000000},
}
}
genesis[poolAddr] = basics.AccountData{
Status: basics.NotParticipating,
MicroAlgos: basics.MicroAlgos{Raw: config.Consensus[protocol.ConsensusCurrentVersion].MinBalance},
}

require.Equal(t, len(genesis), numUsers+1)
genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", t.Name(), numberOfTransactionGroups)
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)

l := ledger
tp := pools.MakeTransactionPool(l.Ledger, cfg, logging.Base())
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
handler := MakeTxHandler(tp, l, &mocks.MockNetwork{}, "", crypto.Digest{}, backlogPool)
defer handler.ctxCancel()

outChan := make(chan *txBacklogMsg, 10)
wg := sync.WaitGroup{}
wg.Add(1)
// Make a test backlog worker, which is simiar to backlogWorker, but sends the results
// through the outChan instead of passing it to postprocessCheckedTxn
go func() {
defer wg.Done()
defer close(outChan)
for {
// prioritize the postVerificationQueue
select {
case wi, ok := <-handler.postVerificationQueue:
if !ok {
return
}
outChan <- wi
// restart the loop so that we could empty out the post verification queue.
continue
default:
}

// we have no more post verification items. wait for either backlog queue item or post verification item.
select {
case wi, ok := <-handler.backlogQueue:
if !ok {
// shut down to end the test
handler.txVerificationPool.Shutdown()
close(handler.postVerificationQueue)
// wait until all the pending responses are obtained.
// this is not in backlogWorker, maybe should be
for wi := range handler.postVerificationQueue {
outChan <- wi
}
return
}
if handler.checkAlreadyCommitted(wi) {
// this is not expected during the test
continue
}

// enqueue the task to the verification pool.
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)

case wi, ok := <-handler.postVerificationQueue:
if !ok {
return
}
outChan <- wi

case <-handler.ctx.Done():
return
}
}
}()

// Prepare the transactions
signedTransactionGroups, badTxnGroups :=
makeSignedTxnGroups(numberOfTransactionGroups, numUsers, maxGroupSize, 0.5, addresses, secrets)
encodedSignedTransactionGroups := make([]network.IncomingMessage, 0, numberOfTransactionGroups)
for _, stxngrp := range signedTransactionGroups {
data := make([]byte, 0)
for _, stxn := range stxngrp {
data = append(data, protocol.Encode(&stxn)...)
}
encodedSignedTransactionGroups =
append(encodedSignedTransactionGroups, network.IncomingMessage{Data: data})
}

// Process the results and make sure they are correct
wg.Add(1)
go func() {
defer wg.Done()
groupCounter := 0
txnCounter := 0
invalidCounter := 0
defer func() {
t.Logf("processed %d txn groups (%d txns)\n", groupCounter, txnCounter)
}()
for wi := range outChan {
txnCounter = txnCounter + len(wi.unverifiedTxGroup)
groupCounter++
u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note)
_, inBad := badTxnGroups[u]
if wi.verificationErr == nil {
require.False(t, inBad, "No error for invalid signature")
} else {
invalidCounter++
require.True(t, inBad, "Error for good signature")
}
}
t.Logf("Txn groups with invalid sigs: %d\n", invalidCounter)
}()

// Send the transactions to the verifier
for _, tg := range encodedSignedTransactionGroups {
handler.processIncomingTxn(tg)
randduration := time.Duration(uint64(((1 + rand.Float32()) * 3)))
time.Sleep(randduration * time.Microsecond)
}
close(handler.backlogQueue)
wg.Wait()

// Report the number of transactions dropped because the backlog was busy
var buf strings.Builder
metrics.DefaultRegistry().WriteMetrics(&buf, "")
str := buf.String()
x := strings.Index(str, "\nalgod_transaction_messages_dropped_backlog")
str = str[x+44 : x+44+strings.Index(str[x+44:], "\n")]
str = strings.TrimSpace(strings.ReplaceAll(str, "}", " "))
t.Logf("dropped %s txn gropus\n", str)
}

// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random
// invalid signatures of a given probability (invalidProb)
func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address,
secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn,
badTxnGroups map[uint64]interface{}) {
badTxnGroups = make(map[uint64]interface{})

protoMaxGrpSize := proto.MaxTxGroupSize
ret = make([][]transactions.SignedTxn, 0, N)
for u := 0; u < N; u++ {
grpSize := rand.Intn(protoMaxGrpSize-1) + 1
if grpSize > maxGroupSize {
grpSize = maxGroupSize
}
var txGroup transactions.TxGroup
txns := make([]transactions.Transaction, 0, grpSize)
for g := 0; g < grpSize; g++ {
// generate transactions
noteField := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(noteField, uint64(u))
tx := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[(u+g)%numUsers],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
Note: noteField,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addresses[(u+g+1)%numUsers],
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
}
txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.Digest(tx.ID()))
txns = append(txns, tx)
}
groupHash := crypto.HashObj(txGroup)
signedTxGroup := make([]transactions.SignedTxn, 0, grpSize)
for g, txn := range txns {
txn.Group = groupHash
signedTx := txn.Sign(secrets[(u+g)%numUsers])
signedTx.Txn = txn
signedTxGroup = append(signedTxGroup, signedTx)
}
// randomly make bad signatures
if rand.Float32() < invalidProb {
tinGrp := rand.Intn(grpSize)
signedTxGroup[tinGrp].Sig[0] = signedTxGroup[tinGrp].Sig[0] + 1
badTxnGroups[uint64(u)] = struct{}{}
}
ret = append(ret, signedTxGroup)
}
return
}

// BenchmarkHandler sends singed transactions the the verifier
func BenchmarkHandleTxns(b *testing.B) {
b.N = b.N * proto.MaxTxGroupSize / 2
runHandlerBenchmark(1, b)
}

// BenchmarkHandler sends singed transaction groups to the verifier
func BenchmarkHandleTxnGroups(b *testing.B) {
runHandlerBenchmark(proto.MaxTxGroupSize, b)
}

// runHandlerBenchmark has a similar workflow to incomingTxHandlerProcessing,
// but bypasses the backlog, and sends the transactions directly to the verifier
func runHandlerBenchmark(maxGroupSize int, b *testing.B) {
const numUsers = 100
log := logging.TestingLog(b)
log.SetLevel(logging.Warn)
addresses := make([]basics.Address, numUsers)
secrets := make([]*crypto.SignatureSecrets, numUsers)

// prepare the accounts
genesis := make(map[basics.Address]basics.AccountData)
for i := 0; i < numUsers; i++ {
secret := keypair()
addr := basics.Address(secret.SignatureVerifier)
secrets[i] = secret
addresses[i] = addr
genesis[addr] = basics.AccountData{
Status: basics.Online,
MicroAlgos: basics.MicroAlgos{Raw: 10000000000000},
}
}
genesis[poolAddr] = basics.AccountData{
Status: basics.NotParticipating,
MicroAlgos: basics.MicroAlgos{Raw: config.Consensus[protocol.ConsensusCurrentVersion].MinBalance},
}

require.Equal(b, len(genesis), numUsers+1)
genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", b.Name(), b.N)
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(b, err)

l := ledger
tp := pools.MakeTransactionPool(l.Ledger, cfg, logging.Base())
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
handler := MakeTxHandler(tp, l, &mocks.MockNetwork{}, "", crypto.Digest{}, backlogPool)
defer handler.ctxCancel()

// Prepare the transactions
signedTransactionGroups, badTxnGroups := makeSignedTxnGroups(b.N, numUsers, maxGroupSize, 0.001, addresses, secrets)
outChan := handler.postVerificationQueue
wg := sync.WaitGroup{}

var tt time.Time
// Process the results and make sure they are correct
wg.Add(1)
go func() {
defer wg.Done()
groupCounter := 0
var txnCounter uint64
invalidCounter := 0
for wi := range outChan {
txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup))
groupCounter++
u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note)
_, inBad := badTxnGroups[u]
if wi.verificationErr == nil {
require.False(b, inBad, "No error for invalid signature")
} else {
invalidCounter++
require.True(b, inBad, "Error for good signature")
}
}
if txnCounter > 0 {
b.Logf("TPS: %d\n", uint64(txnCounter)*1000000000/uint64(time.Since(tt)))
b.Logf("Time/txn: %d(microsec)\n", uint64((time.Since(tt)/time.Microsecond))/txnCounter)
b.Logf("processed total: [%d groups (%d invalid)] [%d txns]\n", groupCounter, invalidCounter, txnCounter)
}
}()

b.ResetTimer()
tt = time.Now()
for _, stxngrp := range signedTransactionGroups {
blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp}
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil)
}
// shut down to end the test
handler.txVerificationPool.Shutdown()
close(handler.postVerificationQueue)
close(handler.backlogQueue)
wg.Wait()
}

0 comments on commit 5e0ea76

Please sign in to comment.