Skip to content
This repository has been archived by the owner on Sep 20, 2023. It is now read-only.

feat: working Agoric initiator #73

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions blockchain/agoric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package blockchain

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/external-initiator/store"
"github.com/smartcontractkit/external-initiator/subscriber"
)

// Agoric is the identifier of this
// blockchain integration.
const Agoric = "agoric"

// linkDecimals is the number of decimal places in $LINK
const linkDecimals = 18

// linkAgoricDecimals is the number of decimal places in a uaglink token
// FIXME: Ideally the same as linkDecimals.
const linkAgoricDecimals = 6

type agoricFilter struct {
JobID string
}

type agoricManager struct {
endpointName string
filter agoricFilter
}

func init() {
if linkAgoricDecimals > linkDecimals {
panic(fmt.Errorf("linkAgoricDecimals %d must be less than or equal to linkDecimals %d", linkAgoricDecimals, linkDecimals))
}
}

func createAgoricManager(t subscriber.Type, conf store.Subscription) (*agoricManager, error) {
if t != subscriber.WS {
return nil, errors.New("only WS connections are allowed for Agoric")
}

return &agoricManager{
endpointName: conf.EndpointName,
filter: agoricFilter{
JobID: conf.Job,
},
}, nil
}

func (sm agoricManager) GetTriggerJson() []byte {
return nil
}

type agoricEvent struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}

type agoricOnQueryData struct {
QueryID string `json:"queryId"`
Query json.RawMessage `json:"query"`
Fee int64 `json:"fee"`
}

type chainlinkQuery struct {
JobID string `json:"jobId"`
Params map[string]interface{} `json:"params"`
}

func (sm *agoricManager) ParseResponse(data []byte) ([]subscriber.Event, bool) {
promLastSourcePing.With(prometheus.Labels{"endpoint": sm.endpointName, "jobid": string(sm.filter.JobID)}).SetToCurrentTime()

var agEvent agoricEvent
err := json.Unmarshal(data, &agEvent)
if err != nil {
logger.Error("Failed parsing agoricEvent:", err)
return nil, false
}

var subEvents []subscriber.Event

switch agEvent.Type {
case "oracleServer/onQuery":
// Do this below.
break
case "oracleServer/onError":
case "oracleServer/onReply":
return nil, false
default:
// We don't need something so noisy.
// logger.Error("Unimplemented message type:", agEvent.Type)
return nil, false
}

var onQueryData agoricOnQueryData
err = json.Unmarshal(agEvent.Data, &onQueryData)
if err != nil {
logger.Error("Failed parsing queryData:", err)
return nil, false
}

var query chainlinkQuery
err = json.Unmarshal(onQueryData.Query, &query)
if err != nil {
logger.Error("Failed parsing chainlink query:", err)
return nil, false
}

// Check that the job ID matches.
if query.JobID != sm.filter.JobID {
return subEvents, true
}

var requestParams map[string]interface{}
if query.Params == nil {
requestParams = make(map[string]interface{})
} else {
requestParams = query.Params
}
requestParams["request_id"] = onQueryData.QueryID
requestParams["payment"] = fmt.Sprint(onQueryData.Fee) +
strings.Repeat("0", linkDecimals-linkAgoricDecimals)

event, err := json.Marshal(requestParams)
if err != nil {
logger.Error(err)
return nil, false
}
subEvents = append(subEvents, event)

return subEvents, true
}

func (sm *agoricManager) GetTestJson() []byte {
return nil
}

func (sm *agoricManager) ParseTestResponse(data []byte) error {
return nil
}
217 changes: 217 additions & 0 deletions blockchain/agoric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package blockchain

import (
"errors"
"reflect"
"testing"

"github.com/smartcontractkit/external-initiator/store"
"github.com/smartcontractkit/external-initiator/subscriber"
)

func TestCreateAgoricFilterMessage(t *testing.T) {
tests := []struct {
name string
args store.AgoricSubscription
p subscriber.Type
want []byte
err error
}{
{
"empty",
store.AgoricSubscription{},
subscriber.WS,
nil,
nil,
},
{
"address only",
store.AgoricSubscription{},
subscriber.WS,
nil,
nil,
},
{
"empty RPC",
store.AgoricSubscription{},
subscriber.RPC,
nil,
errors.New("only WS connections are allowed for Agoric"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr, err := createAgoricManager(tt.p, store.Subscription{Agoric: tt.args})
if !reflect.DeepEqual(err, tt.err) {
t.Errorf("createAgoricManager.err = %s, want %s", err, tt.err)
}
if err == nil {
if got := mgr.GetTriggerJson(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetTriggerJson() = %s, want %s", got, tt.want)
}
}
})
}

t.Run("has invalid filter query", func(t *testing.T) {
got := agoricManager{filter: agoricFilter{JobID: "1919"}}.GetTriggerJson()
if got != nil {
t.Errorf("GetTriggerJson() = %s, want nil", got)
}
})
}

func TestAgoricManager_GetTestJson(t *testing.T) {
type fields struct {
filter agoricFilter
p subscriber.Type
}
tests := []struct {
name string
fields fields
want []byte
}{
{
"returns empty when using RPC",
fields{
p: subscriber.RPC,
},
nil,
},
{
"returns empty when using WS",
fields{
p: subscriber.WS,
},
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := agoricManager{
filter: tt.fields.filter,
}
if got := e.GetTestJson(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetTestJson() = %v, want %v", got, tt.want)
}
})
}
}

func TestAgoricManager_ParseTestResponse(t *testing.T) {
type fields struct {
f agoricFilter
p subscriber.Type
}
type args struct {
data []byte
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
"does nothing for WS",
fields{f: agoricFilter{}, p: subscriber.WS},
args{},
false,
},
{
"parses RPC responses",
fields{f: agoricFilter{}, p: subscriber.RPC},
args{[]byte(`{"jsonrpc":"2.0","id":1,"result":"0x1"}`)},
false,
},
{
"fails unmarshal payload",
fields{f: agoricFilter{}, p: subscriber.RPC},
args{[]byte(`error`)},
false,
},
{
"fails unmarshal result",
fields{f: agoricFilter{}, p: subscriber.RPC},
args{[]byte(`{"jsonrpc":"2.0","id":1,"result":["0x1"]}`)},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := agoricManager{
filter: tt.fields.f,
}
if err := e.ParseTestResponse(tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("ParseTestResponse() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestAgoricManager_ParseResponse(t *testing.T) {
type fields struct {
filter agoricFilter
p subscriber.Type
}
type args struct {
data []byte
}
tests := []struct {
name string
fields fields
args args
want []subscriber.Event
want1 bool
}{
{
"fails parsing invalid payload",
fields{filter: agoricFilter{}, p: subscriber.WS},
args{data: []byte(`invalid`)},
nil,
false,
},
{
"fails parsing invalid WS body",
fields{filter: agoricFilter{}, p: subscriber.WS},
args{data: []byte(`{}`)},
nil,
false,
},
{
"fails parsing invalid WS type",
fields{filter: agoricFilter{}, p: subscriber.WS},
args{data: []byte(`{"type":"oracleServer/wrongType"}`)},
nil,
false,
},
{
"successfully parses WS Oracle request",
fields{filter: agoricFilter{JobID: "9999"}, p: subscriber.WS},
args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)},
[]subscriber.Event{[]byte(`{"path":"foo","payment":"191919000000000000","request_id":"123"}`)},
true,
},
{
"skips unfiltered WS Oracle request",
fields{filter: agoricFilter{JobID: "Z9999"}, p: subscriber.WS},
args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)},
nil,
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := agoricManager{
filter: tt.fields.filter,
}
got, got1 := e.ParseResponse(tt.args.data)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseResponse() got = %s, want %s", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("ParseResponse() got1 = %v, want %v", got1, tt.want1)
}
})
}
}
9 changes: 9 additions & 0 deletions blockchain/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var blockchains = []string{
CFX,
Keeper,
BIRITA,
Agoric,
}

type Params struct {
Expand Down Expand Up @@ -73,6 +74,8 @@ func CreateJsonManager(t subscriber.Type, sub store.Subscription) (subscriber.Js
return createNearManager(t, sub)
case CFX:
return createCfxManager(t, sub), nil
case Agoric:
return createAgoricManager(t, sub)
}

return nil, fmt.Errorf("unknown blockchain type %v for JSON manager", sub.Endpoint.Type)
Expand Down Expand Up @@ -167,6 +170,10 @@ func GetValidations(t string, params Params) []int {
return []int{
len(params.Addresses),
}
case Agoric:
return []int{
1,
}
}

return nil
Expand Down Expand Up @@ -216,6 +223,8 @@ func CreateSubscription(sub *store.Subscription, params Params) {
Addresses: params.Addresses,
ServiceName: params.ServiceName,
}
case Agoric:
sub.Agoric = store.AgoricSubscription{}
}
}

Expand Down
Loading