Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Update offline message sending and receiving. #1688

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions core/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,32 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
}
}
log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String())
OfflineMessageWaitGroup.Add(2)
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
if err != nil {
log.Error(err)
}

// Push provider to our push nodes for redundancy
for _, p := range n.PushNodes {
// We publish our pointers to three different locations:
// 1. The pushnodes
// 2. The DHT
// 3. Pubsub
// Each one is done in a separate goroutine so as to not block but we
// do increment the OfflineMessageWaitGroup which is used to block
// shutdown until all publishing is finished.
OfflineMessageWaitGroup.Add(2 + len(n.PushNodes))
for _, p := range n.PushNodes {
go func(pid peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer)
err := ipfs.PutPointerToPeer(n.DHT, ctx, pid, pointer)
if err != nil {
log.Error(err)
}
OfflineMessageWaitGroup.Done()
}(p)
}
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
if err != nil {
log.Error(err)
}

OfflineMessageWaitGroup.Done()
Expand Down
60 changes: 32 additions & 28 deletions net/retriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
WaitGroup: new(sync.WaitGroup),
}

mr.Add(1)
mr.Add(2)
return &mr
}

Expand All @@ -105,57 +105,61 @@ func (m *MessageRetriever) Run() {
peers := time.NewTicker(time.Minute * 10)
defer dht.Stop()
defer peers.Stop()
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
go m.fetchPointersFromPushNodes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separating these is a great idea! 👍

for {
select {
case <-dht.C:
m.Add(1)
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
case <-peers.C:
m.Add(1)
go m.fetchPointers(false)
go m.fetchPointersFromPushNodes()
}
}
}

// RunOnce - used to fetch messages only once
func (m *MessageRetriever) RunOnce() {
m.Add(1)
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
m.Add(1)
go m.fetchPointers(false)
go m.fetchPointersFromPushNodes()
}

func (m *MessageRetriever) fetchPointers(useDHT bool) {
func (m *MessageRetriever) fetchPointersFromDHT() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := new(sync.WaitGroup)
downloaded := 0
mh, _ := multihash.FromB58String(m.node.Identity.Pretty())
peerOut := make(chan ps.PeerInfo)
go func(c chan ps.PeerInfo) {
pwg := new(sync.WaitGroup)
pwg.Add(1)
go func(c chan ps.PeerInfo) {
out := m.getPointersDataPeers()
for p := range out {
c <- p
}
pwg.Done()
}(c)
if useDHT {
pwg.Add(1)
go func(c chan ps.PeerInfo) {
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
for p := range iout {
c <- p
}
pwg.Done()
}(c)
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
for p := range iout {
c <- p
}
close(c)

}(peerOut)

m.downloadMessages(peerOut)
}

func (m *MessageRetriever) fetchPointersFromPushNodes() {
peerOut := make(chan ps.PeerInfo)
go func(c chan ps.PeerInfo) {
out := m.getPointersDataPeers()
for p := range out {
c <- p
}
pwg.Wait()
close(c)

}(peerOut)
m.downloadMessages(peerOut)
}

func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
wg := new(sync.WaitGroup)
downloaded := 0

inFlight := make(map[string]bool)
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
Expand Down