Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write into txn and txn_participation tables in parallel with other import procedures #805

Merged
merged 3 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions idb/postgres/internal/writer/write_txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package writer

import (
"context"
"fmt"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/jackc/pgx/v4"

"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres/internal/encoding"
)

// Get the ID of the creatable referenced in the given transaction
// (0 if not an asset or app transaction).
// Note: ConsensusParams.MaxInnerTransactions could be overridden to force
// generating ApplyData.{ApplicationID/ConfigAsset}. This function does
// other things too, so it is not clear we should use it. The only
// real benefit is that it would slightly simplify this function by
// allowing us to leave out the intra / block parameters.
func transactionAssetID(stxnad *transactions.SignedTxnWithAD, intra uint, block *bookkeeping.Block) (uint64, error) {
assetid := uint64(0)

switch stxnad.Txn.Type {
case protocol.ApplicationCallTx:
assetid = uint64(stxnad.Txn.ApplicationID)
if assetid == 0 {
assetid = uint64(stxnad.ApplyData.ApplicationID)
}
if assetid == 0 {
if block == nil {
return 0, fmt.Errorf("transactionAssetID(): Missing ApplicationID for transaction: %s", stxnad.ID())
}
// pre v30 transactions do not have ApplyData.ConfigAsset or InnerTxns
// so txn counter + payset pos calculation is OK
assetid = block.TxnCounter - uint64(len(block.Payset)) + uint64(intra) + 1
}
case protocol.AssetConfigTx:
assetid = uint64(stxnad.Txn.ConfigAsset)
if assetid == 0 {
assetid = uint64(stxnad.ApplyData.ConfigAsset)
}
if assetid == 0 {
if block == nil {
return 0, fmt.Errorf("transactionAssetID(): Missing ConfigAsset for transaction: %s", stxnad.ID())
}
// pre v30 transactions do not have ApplyData.ApplicationID or InnerTxns
// so txn counter + payset pos calculation is OK
assetid = block.TxnCounter - uint64(len(block.Payset)) + uint64(intra) + 1
}
case protocol.AssetTransferTx:
assetid = uint64(stxnad.Txn.XferAsset)
case protocol.AssetFreezeTx:
assetid = uint64(stxnad.Txn.FreezeAsset)
}

return assetid, nil
}

// Traverses the inner transaction tree and writes database rows
// to `outCh`. It performs a preorder traversal to correctly compute
// the intra round offset, the offset for the next transaction is returned.
func yieldInnerTransactions(ctx context.Context, stxnad *transactions.SignedTxnWithAD, block *bookkeeping.Block, intra, rootIntra uint, rootTxid string, outCh chan []interface{}) (uint, error) {
for _, itxn := range stxnad.ApplyData.EvalDelta.InnerTxns {
txn := &itxn.Txn
typeenum, ok := idb.GetTypeEnum(txn.Type)
if !ok {
return 0, fmt.Errorf("yieldInnerTransactions() get type enum")
}
// block shouldn't be used for inner transactions.
assetid, err := transactionAssetID(&itxn, 0, nil)
if err != nil {
return 0, err
}
extra := idb.TxnExtra{
AssetCloseAmount: itxn.ApplyData.AssetClosingAmount,
RootIntra: idb.OptionalUint{Present: true, Value: rootIntra},
RootTxid: rootTxid,
}

// When encoding an inner transaction we remove any further nested inner transactions.
// To reconstruct a full object the root transaction must be fetched.
txnNoInner := *stxnad
txnNoInner.EvalDelta.InnerTxns = nil
row := []interface{}{
uint64(block.Round()), intra, int(typeenum), assetid,
nil, // inner transactions do not have a txid.
encoding.EncodeSignedTxnWithAD(txnNoInner),
encoding.EncodeTxnExtra(&extra)}
select {
case <-ctx.Done():
return 0, fmt.Errorf("yieldInnerTransactions() ctx.Err(): %w", ctx.Err())
case outCh <- row:
}

// Recurse at end for preorder traversal
intra, err =
yieldInnerTransactions(ctx, &itxn, block, intra+1, rootIntra, rootTxid, outCh)
if err != nil {
return 0, err
}
}

return intra, nil
}

// Writes database rows for transactions (including inner transactions) to `outCh`.
func yieldTransactions(ctx context.Context, block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, outCh chan []interface{}) error {
intra := uint(0)
for idx, stib := range block.Payset {
var stxnad transactions.SignedTxnWithAD
var err error
// This function makes sure to set correct genesis information so we can get the
// correct transaction hash.
stxnad.SignedTxn, stxnad.ApplyData, err = block.BlockHeader.DecodeSignedTxn(stib)
if err != nil {
return fmt.Errorf("yieldTransactions() decode signed txn err: %w", err)
}

txn := &stxnad.Txn
typeenum, ok := idb.GetTypeEnum(txn.Type)
if !ok {
return fmt.Errorf("yieldTransactions() get type enum")
}
assetid, err := transactionAssetID(&stxnad, intra, block)
if err != nil {
return err
}
id := txn.ID().String()

extra := idb.TxnExtra{
AssetCloseAmount: modifiedTxns[idx].ApplyData.AssetClosingAmount,
}
row := []interface{}{
uint64(block.Round()), intra, int(typeenum), assetid, id,
encoding.EncodeSignedTxnWithAD(stxnad),
encoding.EncodeTxnExtra(&extra)}
select {
case <-ctx.Done():
return fmt.Errorf("yieldTransactions() ctx.Err(): %w", ctx.Err())
case outCh <- row:
}

intra, err = yieldInnerTransactions(
ctx, &stib.SignedTxnWithAD, block, intra+1, intra, id, outCh)
if err != nil {
return fmt.Errorf("yieldTransactions() adding inner: %w", err)
}
}

return nil
}

// AddTransactions adds transactions from `block` to the database.
// `modifiedTxns` contains enhanced apply data generated by evaluator.
func AddTransactions(block *bookkeeping.Block, modifiedTxns []transactions.SignedTxnInBlock, tx pgx.Tx) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

ch := make(chan []interface{}, 1024)
var err0 error
go func() {
err0 = yieldTransactions(ctx, block, modifiedTxns, ch)
close(ch)
}()

_, err1 := tx.CopyFrom(
context.Background(),
pgx.Identifier{"txn"},
[]string{"round", "intra", "typeenum", "asset", "txid", "txn", "extra"},
copyFromChannel(ch))
if err1 != nil {
// Exiting here will call `cancelFunc` which will cause the goroutine above to exit.
return fmt.Errorf("addTransactions() copy from err: %w", err1)
}

// CopyFrom() exited successfully, so `ch` has been closed, so `err0` has been
// written to, and we can read it without worrying about data races.
if err0 != nil {
return fmt.Errorf("addTransactions() err: %w", err0)
}

return nil
}
101 changes: 101 additions & 0 deletions idb/postgres/internal/writer/write_txn_participation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package writer

import (
"context"
"fmt"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/jackc/pgx/v4"

"github.com/algorand/indexer/accounting"
)

// getTransactionParticipants returns referenced addresses from the txn and all inner txns
func getTransactionParticipants(stxnad *transactions.SignedTxnWithAD, includeInner bool) []basics.Address {
const acctsPerTxn = 7

if !includeInner || len(stxnad.ApplyData.EvalDelta.InnerTxns) == 0 {
// if no inner transactions then adding into a slice with in-place de-duplication
res := make([]basics.Address, 0, acctsPerTxn)
add := func(address basics.Address) {
for _, p := range res {
if address == p {
return
}
}
res = append(res, address)
}

accounting.GetTransactionParticipants(stxnad, includeInner, add)
return res
}

// inner transactions might have inner transactions might have inner...
// so the resultant slice is created after collecting all the data from nested transactions.
// this is probably a bit slower than the default case due to two mem allocs and additional iterations
size := acctsPerTxn * (1 + len(stxnad.ApplyData.EvalDelta.InnerTxns)) // approx
participants := make(map[basics.Address]struct{}, size)
add := func(address basics.Address) {
participants[address] = struct{}{}
}

accounting.GetTransactionParticipants(stxnad, includeInner, add)

res := make([]basics.Address, 0, len(participants))
for addr := range participants {
res = append(res, addr)
}

return res
}

// addInnerTransactionParticipation traverses the inner transaction tree and
// adds txn participation records for each. It performs a preorder traversal
// to correctly compute the intra round offset, the offset for the next
// transaction is returned.
func addInnerTransactionParticipation(stxnad *transactions.SignedTxnWithAD, round, intra uint64, rows [][]interface{}) (uint64, [][]interface{}) {
next := intra
for _, itxn := range stxnad.ApplyData.EvalDelta.InnerTxns {
// Only search inner transactions by direct participation.
// TODO: Should inner app calls be surfaced by their participants?
participants := getTransactionParticipants(&itxn, false)

for j := range participants {
rows = append(rows, []interface{}{participants[j][:], round, next})
}

next, rows = addInnerTransactionParticipation(&itxn, round, next+1, rows)
}
return next, rows

}

// AddTransactionParticipation writes account participation info to the
// `txn_participation` table.
func AddTransactionParticipation(block *bookkeeping.Block, tx pgx.Tx) error {
var rows [][]interface{}
next := uint64(0)

for _, stxnib := range block.Payset {
participants := getTransactionParticipants(&stxnib.SignedTxnWithAD, true)

for j := range participants {
rows = append(rows, []interface{}{participants[j][:], uint64(block.Round()), next})
}

next, rows = addInnerTransactionParticipation(&stxnib.SignedTxnWithAD, uint64(block.Round()), next+1, rows)
}

_, err := tx.CopyFrom(
context.Background(),
pgx.Identifier{"txn_participation"},
[]string{"addr", "round", "intra"},
pgx.CopyFromRows(rows))
if err != nil {
return fmt.Errorf("addTransactionParticipation() copy from err: %w", err)
}

return nil
}
Loading