Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publishing option for signing a message with a custom private key #486

Merged
merged 7 commits into from
May 26, 2022
55 changes: 55 additions & 0 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/crypto"
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,6 +16,12 @@ import (
// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// ErrNilSignKey is returned if a nil private key was provided
var ErrNilSignKey = errors.New("nil sign key")

// ErrEmptyPeerID is returned if an empty peer ID was provided
var ErrEmptyPeerID = errors.New("empty peer ID")

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
Expand Down Expand Up @@ -286,6 +293,54 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
}

// PublishWithSk publishes data to topic using the provided secret key and generated peer ID. Providing also the peer ID
// (which can be cached) will improve the performance as it does not require the generation of the public key
// and the resulting peer ID.
// This method can be used as to be able to publish messages on the network without having a "real", connectable host.
func (t *Topic) PublishWithSk(ctx context.Context, data []byte, signKey crypto.PrivKey, pid peer.ID, opts ...PubOpt) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not abbreviate the Sk and say PublishWithKey or PublishWithSecretKey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will refactor

if signKey == nil {
return ErrNilSignKey
}
if len(pid) == 0 {
return ErrEmptyPeerID
}

t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return ErrTopicClosed
}

m := &pb.Message{
Data: data,
Topic: &t.topic,
From: nil,
Seqno: nil,
}
if t.p.signID != "" {
m.Seqno = t.p.nextSeqno()
}
m.From = []byte(pid)
err := signMessage(pid, signKey, m)
if err != nil {
return err
}

pub := &PublishOptions{}
for _, opt := range opts {
errOpts := opt(pub)
if errOpts != nil {
return errOpts
}
}

if pub.ready != nil {
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
// This option is not useful unless PubSub is also using WithDiscovery
func WithReadiness(ready RouterReady) PubOpt {
Expand Down
95 changes: 94 additions & 1 deletion topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic {
Expand Down Expand Up @@ -381,7 +382,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) {

// Test removing peers and verifying that they cause events
subs[1].Cancel()
hosts[2].Close()
_ = hosts[2].Close()
psubs[0].BlacklistPeer(hosts[3].ID())

leavingPeers := make(map[peer.ID]struct{})
Expand Down Expand Up @@ -920,3 +921,95 @@ func TestWithTopicMsgIdFunction(t *testing.T) {
t.Fatal("msg ids are equal")
}
}

func TestTopic_PublishWithSk(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const topic = "foobar"
const numHosts = 5

virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)

t.Run("nil sign private key should error", func(t *testing.T) {
err := topics[0].PublishWithSk(ctx, []byte("buff"), nil, virtualPeer.ID)
if err != ErrNilSignKey {
t.Fatal("error should have been of type errNilSignKey")
}
})
t.Run("empty peer ID should error", func(t *testing.T) {
err := topics[0].PublishWithSk(ctx, []byte("buff"), virtualPeer.PrivKey, "")
if err != ErrEmptyPeerID {
t.Fatal("error should have been of type errEmptyPeerID")
}
})
}

func TestTopicRelay_PublishWithSk(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const topic = "foobar"
const numHosts = 5

virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)

// [0.Rel] - [1.Rel] - [2.Sub]
// |
// [3.Rel] - [4.Sub]

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
connect(t, hosts[1], hosts[3])
connect(t, hosts[3], hosts[4])

time.Sleep(time.Millisecond * 100)

var subs []*Subscription

for i, topicValue := range topics {
if i == 2 || i == 4 {
sub, err := topicValue.Subscribe()
if err != nil {
t.Fatal(err)
}

subs = append(subs, sub)
} else {
_, err := topicValue.Relay()
if err != nil {
t.Fatal(err)
}
}
}

time.Sleep(time.Millisecond * 100)

for i := 0; i < 100; i++ {
msg := []byte("message")

owner := rand.Intn(len(topics))

err := topics[owner].PublishWithSk(ctx, msg, virtualPeer.PrivKey, virtualPeer.ID)
if err != nil {
t.Fatal(err)
}

for _, sub := range subs {
received, errSub := sub.Next(ctx)
if errSub != nil {
t.Fatal(errSub)
}

if !bytes.Equal(msg, received.Data) {
t.Fatal("received message is other than expected")
}
}
}
}