From 61a51d9f5eaf899a3388a65d382da2720767fb77 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 12 Nov 2019 16:42:37 -0800 Subject: [PATCH] Revert "Merge pull request #44 from ipfs/chore/update-peertaskqueue" This reverts commit e8245a51ced19d9e856b54ccc831fc384d4ab634, reversing changes made to 2cbb73ffe327b65f4caa3b3fa8eee3d5a1fda370. --- go.mod | 2 +- go.sum | 4 --- responsemanager/responsemanager.go | 28 +++++++-------- responsemanager/responsemanager_test.go | 46 +++++++++++-------------- 4 files changed, 33 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 3c22b705..2932561e 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-log v0.0.1 - github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 + github.com/ipfs/go-peertaskqueue v0.0.4 github.com/ipld/go-ipld-prime v0.0.2-0.20191025153308-092ea9a7696d github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/libp2p/go-eventbus v0.0.3 // indirect diff --git a/go.sum b/go.sum index 01a556b7..1e286f07 100644 --- a/go.sum +++ b/go.sum @@ -102,8 +102,6 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo= github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= -github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= -github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= @@ -112,8 +110,6 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.0.4 h1:i0JprfjjILYcWM1xguO/1MCS8XKVxLSl+ECEVr6i8nw= github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ= -github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 h1:c9CXamXsukIP0Ij/wIY4VECIq8uUnLPBBBy/XR3wlI8= -github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY= github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071 h1:+jRGf/jb5MnxBsLYczZF0u5pr3nzfbS8pPt/49Yxekw= github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipld/go-ipld-prime v0.0.1-filecoin h1:jK1bUG/z73GNeKrNlVfdexkEIIWp0BEsBVjHkh9WvXo= diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 1fbed9ac..0cefd04d 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -42,10 +42,9 @@ type responseTaskData struct { // QueryQueue is an interface that can receive new selector query tasks // and prioritize them as needed, and pop them off later type QueryQueue interface { - PushTasks(to peer.ID, tasks ...peertask.Task) - PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) - Remove(topic peertask.Topic, p peer.ID) - TasksDone(to peer.ID, tasks ...*peertask.Task) + PushBlock(to peer.ID, tasks ...peertask.Task) + PopBlock() *peertask.TaskBlock + Remove(identifier peertask.Identifier, p peer.ID) ThawRound() } @@ -140,20 +139,20 @@ func (rm *ResponseManager) processQueriesWorker() { taskDataChan := make(chan *responseTaskData) var taskData *responseTaskData for { - p, nextTasks, _ := rm.queryQueue.PopTasks(1) - for nextTasks == nil { + nextTaskBlock := rm.queryQueue.PopBlock() + for nextTaskBlock == nil { select { case <-rm.ctx.Done(): return case <-rm.workSignal: - p, nextTasks, _ = rm.queryQueue.PopTasks(1) + nextTaskBlock = rm.queryQueue.PopBlock() case <-rm.ticker.C: rm.queryQueue.ThawRound() - p, nextTasks, _ = rm.queryQueue.PopTasks(1) + nextTaskBlock = rm.queryQueue.PopBlock() } } - for _, task := range nextTasks { - key := task.Topic.(responseKey) + for _, task := range nextTaskBlock.Tasks { + key := task.Identifier.(responseKey) select { case rm.messages <- &responseDataRequest{key, taskDataChan}: case <-rm.ctx.Done(): @@ -170,7 +169,8 @@ func (rm *ResponseManager) processQueriesWorker() { case <-rm.ctx.Done(): } } - rm.queryQueue.TasksDone(p, nextTasks...) + nextTaskBlock.Done(nextTaskBlock.Tasks) + } } @@ -249,11 +249,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { root: request.Root(), selector: request.Selector(), } - rm.queryQueue.PushTasks(prm.p, peertask.Task{ - Topic: key, - Priority: int(request.Priority()), - Work: 1, - }) + rm.queryQueue.PushBlock(prm.p, peertask.Task{Identifier: key, Priority: int(request.Priority())}) select { case rm.workSignal <- struct{}{}: default: diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index a210cc2e..331f03a8 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -21,61 +21,55 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -type taskRQ struct { - tasks []*peertask.Task - target peer.ID -} - type fakeQueryQueue struct { popWait sync.WaitGroup queriesLk sync.RWMutex - queries []*taskRQ + queries []*peertask.TaskBlock } -func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) { +func (fqq *fakeQueryQueue) PushBlock(to peer.ID, tasks ...peertask.Task) { fqq.queriesLk.Lock() - var ptrs []*peertask.Task - for _, t := range tasks { - ptrs = append(ptrs, &t) - } - fqq.queries = append(fqq.queries, &taskRQ{ - tasks: ptrs, - target: to, + fqq.queries = append(fqq.queries, &peertask.TaskBlock{ + Tasks: tasks, + Priority: tasks[0].Priority, + Target: to, + Done: func([]peertask.Task) {}, }) fqq.queriesLk.Unlock() } -func (fqq *fakeQueryQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) { +func (fqq *fakeQueryQueue) PopBlock() *peertask.TaskBlock { fqq.popWait.Wait() fqq.queriesLk.Lock() defer fqq.queriesLk.Unlock() if len(fqq.queries) == 0 { - return "", nil, 0 + return nil } - trq := fqq.queries[0] + block := fqq.queries[0] fqq.queries = fqq.queries[1:] - return trq.target, trq.tasks, 0 + return block } -func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) { +func (fqq *fakeQueryQueue) Remove(identifier peertask.Identifier, p peer.ID) { fqq.queriesLk.Lock() defer fqq.queriesLk.Unlock() for i, query := range fqq.queries { - if query.target == p { - for j, task := range query.tasks { - if task.Topic == topic { - query.tasks = append(query.tasks[:j], query.tasks[j+1:]...) + if query.Target == p { + for j, task := range query.Tasks { + if task.Identifier == identifier { + query.Tasks = append(query.Tasks[:j], query.Tasks[j+1:]...) } } - if len(query.tasks) == 0 { + if len(query.Tasks) == 0 { fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...) } } } } -func (fqq *fakeQueryQueue) ThawRound() {} -func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {} +func (fqq *fakeQueryQueue) ThawRound() { + +} type fakePeerManager struct { lastPeer peer.ID