diff --git a/core/net.go b/core/net.go
index a605e6e4c8..590cd699b1 100644
--- a/core/net.go
+++ b/core/net.go
@@ -98,23 +98,32 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
 		}
 	}
 	log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String())
-	OfflineMessageWaitGroup.Add(2)
-	go func() {
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
-		err := ipfs.PublishPointer(n.DHT, ctx, pointer)
-		if err != nil {
-			log.Error(err)
-		}
-		// Push provider to our push nodes for redundancy
-		for _, p := range n.PushNodes {
+	// We publish our pointers to three different locations:
+	//   1. The push nodes
+	//   2. The DHT
+	//   3. Pubsub
+	// Each one is done in a separate goroutine so as not to block, but we
+	// do increment the OfflineMessageWaitGroup, which is used to block
+	// shutdown until all publishing is finished.
+	OfflineMessageWaitGroup.Add(2 + len(n.PushNodes))
+	for _, p := range n.PushNodes {
+		go func(pid peer.ID) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer)
+			err := ipfs.PutPointerToPeer(n.DHT, ctx, pid, pointer)
 			if err != nil {
 				log.Error(err)
 			}
+			OfflineMessageWaitGroup.Done()
+		}(p)
+	}
+	go func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		err := ipfs.PublishPointer(n.DHT, ctx, pointer)
+		if err != nil {
+			log.Error(err)
 		}
 		OfflineMessageWaitGroup.Done()
 	}()
diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go
index b66125c59e..022cae79bc 100644
--- a/net/retriever/retriever.go
+++ b/net/retriever/retriever.go
@@ -96,7 +96,7 @@ func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
 		WaitGroup: new(sync.WaitGroup),
 	}
-	mr.Add(1)
+	mr.Add(2)
 	return &mr
 }
@@ -105,15 +105,16 @@ func (m *MessageRetriever) Run() {
 	peers := time.NewTicker(time.Minute * 10)
 	defer dht.Stop()
 	defer peers.Stop()
-	go m.fetchPointers(true)
+	go m.fetchPointersFromDHT()
+	go m.fetchPointersFromPushNodes()
 	for {
 		select {
 		case <-dht.C:
 			m.Add(1)
-			go m.fetchPointers(true)
+			go m.fetchPointersFromDHT()
 		case <-peers.C:
 			m.Add(1)
-			go m.fetchPointers(false)
+			go m.fetchPointersFromPushNodes()
 		}
 	}
 }
@@ -121,41 +122,44 @@ func (m *MessageRetriever) Run() {
 // RunOnce - used to fetch messages only once
 func (m *MessageRetriever) RunOnce() {
 	m.Add(1)
-	go m.fetchPointers(true)
+	go m.fetchPointersFromDHT()
 	m.Add(1)
-	go m.fetchPointers(false)
+	go m.fetchPointersFromPushNodes()
 }
 
-func (m *MessageRetriever) fetchPointers(useDHT bool) {
+func (m *MessageRetriever) fetchPointersFromDHT() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wg := new(sync.WaitGroup)
-	downloaded := 0
 	mh, _ := multihash.FromB58String(m.node.Identity.Pretty())
 	peerOut := make(chan ps.PeerInfo)
 	go func(c chan ps.PeerInfo) {
-		pwg := new(sync.WaitGroup)
-		pwg.Add(1)
-		go func(c chan ps.PeerInfo) {
-			out := m.getPointersDataPeers()
-			for p := range out {
-				c <- p
-			}
-			pwg.Done()
-		}(c)
-		if useDHT {
-			pwg.Add(1)
-			go func(c chan ps.PeerInfo) {
-				iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
-				for p := range iout {
-					c <- p
-				}
-				pwg.Done()
-			}(c)
+		iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
+		for p := range iout {
+			c <- p
+		}
+		close(c)
+
+	}(peerOut)
+
+	m.downloadMessages(peerOut)
+}
+
+func (m *MessageRetriever) fetchPointersFromPushNodes() {
+	peerOut := make(chan ps.PeerInfo)
+	go func(c chan ps.PeerInfo) {
+		out := m.getPointersDataPeers()
+		for p := range out {
+			c <- p
 		}
-		pwg.Wait()
 		close(c)
-	}(peerOut)
+	}(peerOut)
+	m.downloadMessages(peerOut)
+}
+
+func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
+	wg := new(sync.WaitGroup)
+	downloaded := 0
+
 	inFlight := make(map[string]bool)
 
 	// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
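
A note on the core/net.go hunk above: the change replaces one serial publish goroutine with a fan-out, where a single WaitGroup.Add(2 + len(n.PushNodes)) covers every publisher up front and each goroutine calls Done exactly once. Below is a minimal, self-contained sketch of that accounting pattern, not OpenBazaar code; the publish bodies, peer IDs, and the string peer type are stand-ins for the real ipfs calls and peer.ID:

package main

import (
	"fmt"
	"sync"
)

var offlineMessageWaitGroup sync.WaitGroup

func publishToAll(pushNodes []string) {
	// One Add covers the DHT publish, the pubsub publish, and one
	// goroutine per push node; each goroutine calls Done exactly once.
	offlineMessageWaitGroup.Add(2 + len(pushNodes))
	for _, p := range pushNodes {
		go func(pid string) {
			defer offlineMessageWaitGroup.Done()
			fmt.Println("publish pointer to push node", pid) // stand-in for ipfs.PutPointerToPeer
		}(p) // pass the loop variable so each goroutine gets its own copy
	}
	go func() {
		defer offlineMessageWaitGroup.Done()
		fmt.Println("publish pointer to the DHT") // stand-in for ipfs.PublishPointer
	}()
	go func() {
		defer offlineMessageWaitGroup.Done()
		fmt.Println("publish pointer to pubsub") // stand-in for the pubsub publish
	}()
}

func main() {
	publishToAll([]string{"QmPushNodeA", "QmPushNodeB"})
	offlineMessageWaitGroup.Wait() // this is what a clean shutdown blocks on
}

The }(p) invocation matters for the same reason it does in the diff: passing the loop variable as an argument gives each goroutine its own copy, which is required on Go versions before 1.22, where the loop variable is reused across iterations.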
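
The Run() hunk in retriever.go schedules the two fetches on independent tickers, so the push nodes can be polled on their own cadence (every 10 minutes) separately from the DHT. A condensed, runnable sketch of that two-ticker select loop follows; the intervals are shortened and a deadline is added here only so the example terminates, which the real Run() deliberately never does:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-ins for the retriever's DHT and push-node tickers.
	dht := time.NewTicker(50 * time.Millisecond)
	peers := time.NewTicker(80 * time.Millisecond)
	defer dht.Stop()
	defer peers.Stop()

	// Added for the sketch only: the real loop runs until shutdown.
	deadline := time.After(300 * time.Millisecond)
	for {
		select {
		case <-dht.C:
			fmt.Println("fetch pointers from the DHT")
		case <-peers.C:
			fmt.Println("fetch pointers from the push nodes")
		case <-deadline:
			return
		}
	}
}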
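
Finally, the shape of the retriever refactor itself: each fetchPointersFrom* function owns exactly one producer goroutine that fills a channel and closes it, and downloadMessages becomes the shared consumer. A stripped-down sketch under stand-in types; peerInfo here substitutes for ps.PeerInfo, and the hard-coded IDs substitute for ipfs.FindPointersAsync and getPointersDataPeers:

package main

import "fmt"

type peerInfo struct{ id string }

func fetchPointersFromDHT() {
	peerOut := make(chan peerInfo)
	go func(c chan peerInfo) {
		// Stand-in for draining ipfs.FindPointersAsync.
		for _, id := range []string{"QmPeerA", "QmPeerB"} {
			c <- peerInfo{id}
		}
		close(c) // closing the channel ends the consumer's range loop
	}(peerOut)
	downloadMessages(peerOut)
}

func fetchPointersFromPushNodes() {
	peerOut := make(chan peerInfo)
	go func(c chan peerInfo) {
		c <- peerInfo{"QmPushNode"} // stand-in for getPointersDataPeers
		close(c)
	}(peerOut)
	downloadMessages(peerOut)
}

// downloadMessages is the shared consumer; it no longer needs to know
// which source produced the pointers.
func downloadMessages(peerOut chan peerInfo) {
	for p := range peerOut {
		fmt.Println("downloading messages pointed to by", p.id)
	}
}

func main() {
	fetchPointersFromDHT()
	fetchPointersFromPushNodes()
}

Closing the channel is what terminates the consumer's range loop, which is why the old pwg.Wait() can be dropped: each producer now owns its channel outright, so there is no second writer to coordinate with before close(c).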