Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orchestrator old signatures replay #351

Merged
merged 40 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
555373b
adds valset request by nonce
rach-id Apr 21, 2022
3af0f60
add querier, broadcaster and updates orchestrator to use them
rach-id Apr 21, 2022
6869b2a
partially fix the tests for the new design
rach-id Apr 21, 2022
3c20722
update the deployer for the new design
rach-id Apr 21, 2022
917fbee
update orchestrator to new design
rach-id Apr 21, 2022
266d6a1
Adds querier and evm client update
rach-id Apr 21, 2022
4c2d5b3
update relayer to new design
rach-id Apr 21, 2022
3b0cb49
fix query.proto
rach-id Apr 22, 2022
a44830c
update deploy_command to use new Querier
rach-id Apr 22, 2022
b15f9c6
formatting
rach-id Apr 22, 2022
f27ef2d
formatting
rach-id Apr 22, 2022
57e2d9c
go.sum
rach-id Apr 22, 2022
de19f31
adds query last unbonding height
rach-id Apr 22, 2022
3d072b6
Merge branch 'qgb-integration' into adds_unbonding_height_query
rach-id Apr 25, 2022
5cfbc90
Merge branch 'qgb-integration' into adds_unbonding_height_query
rach-id Apr 25, 2022
c9c6f7c
adds orchestrator valset replay
rach-id Apr 25, 2022
431321b
adds orchestrator data commitment replay
rach-id Apr 25, 2022
d529b20
cosmetics
rach-id Apr 25, 2022
6d2c677
puts valset signature catchup in a separate function
rach-id Apr 26, 2022
7189c2d
puts data commitment signature catchup in a separate function
rach-id Apr 26, 2022
8ca549a
Querier cosmetics
rach-id Apr 26, 2022
adda9e9
go.sum
rach-id Apr 26, 2022
25aaf13
Merge branch 'qgb-integration' into orchestrator_replay
rach-id Apr 26, 2022
5be6b76
rename querier and catchup functions
rach-id Apr 26, 2022
dd5c934
adds genesis case to querylastvalset
rach-id Apr 26, 2022
af00040
format + todo
rach-id Apr 26, 2022
f95b90b
Merge branch 'qgb-integration' into orchestrator_replay
rach-id Apr 27, 2022
65e1ae4
initialized tm logger in test
rach-id Apr 28, 2022
a67947a
update orchestrator code to use uint64 instead of int64
rach-id Apr 28, 2022
6782e68
better logging
rach-id Apr 28, 2022
4a9c05d
Merge branch 'qgb-integration' into orchestrator_replay
rach-id Apr 28, 2022
d35ac96
Update x/qgb/orchestrator/orchestrator_client.go
rach-id Apr 29, 2022
340e39d
formats import
rach-id Apr 29, 2022
98d3860
rename addOldValsetAttestations and addOldDataCommitmentAttestations
rach-id Apr 29, 2022
49aaa7a
defering logging instead of repeating it on every return
rach-id Apr 29, 2022
bdd2ccd
adds comment
rach-id Apr 29, 2022
49ce5a5
Update x/qgb/orchestrator/relayer.go
rach-id Apr 29, 2022
11abe2c
Update x/qgb/orchestrator/querier.go
rach-id Apr 29, 2022
49dedf4
remove unnecessary error
rach-id Apr 29, 2022
b67edd1
Merge branch 'qgb-integration' into orchestrator_replay
rach-id Apr 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions x/qgb/orchestrator/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package orchestrator

import (
"context"
tmlog "github.com/tendermint/tendermint/libs/log"
"os"
"sync"
"testing"
"time"
Expand All @@ -27,6 +29,7 @@ func setupTestOrchestrator(t *testing.T, bc Broadcaster) *orchestrator {
orchestratorAddress: crypto.PubkeyToAddress(priv.PublicKey).Hex(),
bridgeID: types.BridgeId,
evmPrivateKey: *priv,
logger: tmlog.NewTMLogger(os.Stdout),
}
}

Expand Down
21 changes: 14 additions & 7 deletions x/qgb/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"math/big"

"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -13,6 +14,7 @@ import (
)

type orchestrator struct {
logger log.Logger
// TODO this will change once we have the worker pool pattern
broadcaster Broadcaster

Expand All @@ -28,12 +30,14 @@ func (oc *orchestrator) processValsetEvents(ctx context.Context, valsetChannel <
for valset := range valsetChannel {
signBytes, err := valset.SignBytes(oc.bridgeID)
if err != nil {
return err
oc.logger.Error(fmt.Sprintf("valset nonce %d: %s", valset.Nonce, err.Error()))
continue
}

signature, err := types.NewEthereumSignature(signBytes.Bytes(), &oc.evmPrivateKey)
if err != nil {
return err
oc.logger.Error(fmt.Sprintf("valset nonce %d: %s", valset.Nonce, err.Error()))
continue
}

// create and send the valset hash
Expand All @@ -46,9 +50,10 @@ func (oc *orchestrator) processValsetEvents(ctx context.Context, valsetChannel <

hash, err := oc.broadcaster.BroadcastTx(ctx, msg)
if err != nil {
return err
oc.logger.Error(fmt.Sprintf("valset nonce %d: %s", valset.Nonce, err.Error()))
continue
}
fmt.Printf("\nsigned Valset %d : %s\n", msg.Nonce, hash)
oc.logger.Info(fmt.Sprintf("signed Valset %d : %s", msg.Nonce, hash))
}
return nil
}
Expand All @@ -61,7 +66,8 @@ func (oc *orchestrator) processDataCommitmentEvents(
dataRootHash := types.DataCommitmentTupleRootSignBytes(oc.bridgeID, big.NewInt(int64(dc.Nonce)), dc.Commitment)
dcSig, err := types.NewEthereumSignature(dataRootHash.Bytes(), &oc.evmPrivateKey)
if err != nil {
return err
oc.logger.Error(fmt.Sprintf("data commitment range %d-%d: %s", dc.Start, dc.End, err.Error()))
continue
}

msg := &types.MsgDataCommitmentConfirm{
Expand All @@ -75,9 +81,10 @@ func (oc *orchestrator) processDataCommitmentEvents(

hash, err := oc.broadcaster.BroadcastTx(ctx, msg)
if err != nil {
return err
oc.logger.Error(fmt.Sprintf("data commitment range %d-%d: %s", dc.Start, dc.End, err.Error()))
continue
}
fmt.Printf("\nsigned commitment %d-%d: %s\n", msg.BeginBlock, msg.EndBlock, hash)
oc.logger.Info(fmt.Sprintf("signed commitment %d-%d: %s", msg.BeginBlock, msg.EndBlock, hash))
}
return nil
}
167 changes: 155 additions & 12 deletions x/qgb/orchestrator/orchestrator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ type orchestratorClient struct {
orchestratorAddress string
}

func NewOrchestratorClient(logger tmlog.Logger, tendermintRpc string, querier Querier, orchAddr string) (AppClient, error) {
trpc, err := http.New(tendermintRpc, "/websocket")
func NewOrchestratorClient(
logger tmlog.Logger,
tendermintRPC string,
querier Querier,
orchAddr string,
) (AppClient, error) {
trpc, err := http.New(tendermintRPC, "/websocket")
if err != nil {
return nil, err
}
Expand All @@ -41,7 +46,6 @@ func NewOrchestratorClient(logger tmlog.Logger, tendermintRpc string, querier Qu
}, nil
}

// TODO this will be removed when we use the new job/worker design for the client
func contains(s []uint64, nonce uint64) bool {
for _, v := range s {
if v == nonce {
Expand All @@ -52,17 +56,20 @@ func contains(s []uint64, nonce uint64) bool {
}

func (oc *orchestratorClient) SubscribeValset(ctx context.Context) (<-chan types.Valset, error) {
valsetsChan := make(chan types.Valset, 10)
valsetsChan := make(chan types.Valset, 100)

// will change once we have the new design
go oc.addOldValsetAttestations(ctx, valsetsChan) //nolint:errcheck

results, err := oc.tendermintRPC.Subscribe(
ctx,
"valset-changes",
fmt.Sprintf("%s.%s='%s'", types.EventTypeValsetRequest, sdk.AttributeKeyModule, types.ModuleName),
)

if err != nil {
return nil, err
}

nonces := make([]uint64, 10000)

go func() {
Expand All @@ -75,13 +82,13 @@ func (oc *orchestratorClient) SubscribeValset(ctx context.Context) (<-chan types
valsets, err := oc.querier.QueryLastValsets(ctx)
if err != nil {
oc.logger.Error(err.Error())
return
continue
}

// todo: double check that the first validator set is found
if len(valsets) < 1 {
oc.logger.Error("no validator sets found")
return
continue
}

valset := valsets[0]
Expand All @@ -90,11 +97,12 @@ func (oc *orchestratorClient) SubscribeValset(ctx context.Context) (<-chan types
resp, err := oc.querier.QueryValsetConfirm(ctx, valset.Nonce, oc.orchestratorAddress)
if err != nil {
oc.logger.Error(err.Error())
return
continue
}

if resp == nil && !contains(nonces, valset.Nonce) {
nonces = append(nonces, valset.Nonce)
valsetsChan <- valset
nonces = append(nonces, valset.Nonce)
}
}
}
Expand All @@ -103,8 +111,66 @@ func (oc *orchestratorClient) SubscribeValset(ctx context.Context) (<-chan types
return valsetsChan, nil
}

func (oc *orchestratorClient) addOldValsetAttestations(ctx context.Context, valsetsChan chan types.Valset) {
oc.logger.Info("Started adding Valsets attestation to queue")
defer oc.logger.Info("Finished adding Valsets attestation to queue")
lastUnbondingHeight, err := oc.querier.QueryLastUnbondingHeight(ctx)
if err != nil {
oc.logger.Error(err.Error())
return
}

valsets, err := oc.querier.QueryLastValsets(ctx)
if err != nil {
oc.logger.Error(err.Error())
return
}

// todo: double check that the first validator set is found
if len(valsets) < 1 {
oc.logger.Error("no validator sets found")
return
}
valsetsChan <- valsets[0]

previousNonce := valsets[0].Nonce
for {
if previousNonce == 1 {
return
}
previousNonce = previousNonce - 1
lastVsConfirm, err := oc.querier.QueryValsetConfirm(ctx, previousNonce, oc.orchestratorAddress)
if err != nil {
oc.logger.Error(err.Error())
return
}
// The valset signed by the orchestrator to get lastVsConfirm
// Used to get the height that valset was first introduced
correspondingVs, err := oc.querier.QueryValsetByNonce(ctx, previousNonce)
if err != nil {
oc.logger.Error(err.Error())
return
}
if correspondingVs.Height < lastUnbondingHeight {
// Most likely, we're up to date and don't need to catchup anymore
return
}
if lastVsConfirm != nil {
// in case we have holes in the signatures
continue
}

// valsetChan is the ordinary valset channel used above. The orchestrator keeps adding to it
// old attestations same as with new ones when listening.
valsetsChan <- *correspondingVs
rach-id marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (oc *orchestratorClient) SubscribeDataCommitment(ctx context.Context) (<-chan ExtendedDataCommitment, error) {
dataCommitments := make(chan ExtendedDataCommitment)
dataCommitments := make(chan ExtendedDataCommitment, 100)
rach-id marked this conversation as resolved.
Show resolved Hide resolved

// will change once we have the new design
go oc.addOldDataCommitmentAttestations(ctx, dataCommitments) //nolint:errcheck

// queryClient := types.NewQueryClient(orchestratorClient.qgbRPC)

Expand Down Expand Up @@ -164,11 +230,88 @@ func (oc *orchestratorClient) SubscribeDataCommitment(ctx context.Context) (<-ch
End: endHeight,
Nonce: nonce,
}

}
}

}()

return dataCommitments, nil
}

func (oc *orchestratorClient) addOldDataCommitmentAttestations(
ctx context.Context,
dataCommitmentsChan chan ExtendedDataCommitment,
) {
oc.logger.Info("Started adding old Data Commitments attestation to queue")
defer oc.logger.Info("Finished adding old Data Commitments attestation to queue")
lastUnbondingHeight, err := oc.querier.QueryLastUnbondingHeight(ctx)
if err != nil {
oc.logger.Error(err.Error())
return
}

currentHeight, err := oc.querier.QueryHeight(ctx)
if err != nil {
oc.logger.Error(err.Error())
return
}

var previousBeginBlock uint64
var previousEndBlock uint64

if currentHeight%types.DataCommitmentWindow == 0 {
previousBeginBlock = currentHeight
} else {
// to have a correct range
previousBeginBlock = currentHeight - currentHeight%types.DataCommitmentWindow
}

for {
// Will be refactored when we have data commitment requests
previousEndBlock = previousBeginBlock
previousBeginBlock = previousEndBlock - types.DataCommitmentWindow

if previousEndBlock == 0 {
return
}

existingConfirm, err := oc.querier.QueryDataCommitmentConfirm(
ctx,
previousEndBlock,
previousBeginBlock,
oc.orchestratorAddress,
)
if err != nil {
oc.logger.Error(err.Error())
continue
}

if previousEndBlock < lastUnbondingHeight {
// Most likely, we're up to date and don't need to catchup anymore
return
}
if existingConfirm != nil {
// In case we have holes in the signatures
continue
}
previousNonce := previousEndBlock / types.DataCommitmentWindow

previousCommitment, err := oc.tendermintRPC.DataCommitment(
ctx,
fmt.Sprintf("block.height >= %d AND block.height <= %d",
previousBeginBlock,
previousEndBlock,
),
)
if err != nil {
oc.logger.Error(err.Error())
continue
}

dataCommitmentsChan <- ExtendedDataCommitment{
Commitment: previousCommitment.DataCommitment,
Start: previousBeginBlock,
End: previousEndBlock,
Nonce: previousNonce,
}
}
}
1 change: 1 addition & 0 deletions x/qgb/orchestrator/orchestrator_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func OrchestratorCmd() *cobra.Command {
evmPrivateKey: *config.privateKey,
bridgeID: types.BridgeId,
orchestratorAddress: signer.GetSignerInfo().GetAddress().String(),
logger: logger,
}

wg := &sync.WaitGroup{}
Expand Down
Loading