Skip to content

Commit

Permalink
Revert "Merge pull request #44 from ipfs/chore/update-peertaskqueue"
Browse files Browse the repository at this point in the history
This reverts commit e8245a5, reversing
changes made to 2cbb73f.
  • Loading branch information
hannahhoward committed Nov 13, 2019
1 parent e8245a5 commit 61a51d9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3
github.com/ipfs/go-peertaskqueue v0.0.4
github.com/ipld/go-ipld-prime v0.0.2-0.20191025153308-092ea9a7696d
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-eventbus v0.0.3 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
Expand All @@ -112,8 +110,6 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.0.4 h1:i0JprfjjILYcWM1xguO/1MCS8XKVxLSl+ECEVr6i8nw=
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 h1:c9CXamXsukIP0Ij/wIY4VECIq8uUnLPBBBy/XR3wlI8=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071 h1:+jRGf/jb5MnxBsLYczZF0u5pr3nzfbS8pPt/49Yxekw=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.1-filecoin h1:jK1bUG/z73GNeKrNlVfdexkEIIWp0BEsBVjHkh9WvXo=
Expand Down
28 changes: 12 additions & 16 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type responseTaskData struct {
// QueryQueue is an interface that can receive new selector query tasks
// and prioritize them as needed, and pop them off later
type QueryQueue interface {
PushTasks(to peer.ID, tasks ...peertask.Task)
PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int)
Remove(topic peertask.Topic, p peer.ID)
TasksDone(to peer.ID, tasks ...*peertask.Task)
PushBlock(to peer.ID, tasks ...peertask.Task)
PopBlock() *peertask.TaskBlock
Remove(identifier peertask.Identifier, p peer.ID)
ThawRound()
}

Expand Down Expand Up @@ -140,20 +139,20 @@ func (rm *ResponseManager) processQueriesWorker() {
taskDataChan := make(chan *responseTaskData)
var taskData *responseTaskData
for {
p, nextTasks, _ := rm.queryQueue.PopTasks(1)
for nextTasks == nil {
nextTaskBlock := rm.queryQueue.PopBlock()
for nextTaskBlock == nil {
select {
case <-rm.ctx.Done():
return
case <-rm.workSignal:
p, nextTasks, _ = rm.queryQueue.PopTasks(1)
nextTaskBlock = rm.queryQueue.PopBlock()
case <-rm.ticker.C:
rm.queryQueue.ThawRound()
p, nextTasks, _ = rm.queryQueue.PopTasks(1)
nextTaskBlock = rm.queryQueue.PopBlock()
}
}
for _, task := range nextTasks {
key := task.Topic.(responseKey)
for _, task := range nextTaskBlock.Tasks {
key := task.Identifier.(responseKey)
select {
case rm.messages <- &responseDataRequest{key, taskDataChan}:
case <-rm.ctx.Done():
Expand All @@ -170,7 +169,8 @@ func (rm *ResponseManager) processQueriesWorker() {
case <-rm.ctx.Done():
}
}
rm.queryQueue.TasksDone(p, nextTasks...)
nextTaskBlock.Done(nextTaskBlock.Tasks)

}

}
Expand Down Expand Up @@ -249,11 +249,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) {
root: request.Root(),
selector: request.Selector(),
}
rm.queryQueue.PushTasks(prm.p, peertask.Task{
Topic: key,
Priority: int(request.Priority()),
Work: 1,
})
rm.queryQueue.PushBlock(prm.p, peertask.Task{Identifier: key, Priority: int(request.Priority())})
select {
case rm.workSignal <- struct{}{}:
default:
Expand Down
46 changes: 20 additions & 26 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,61 +21,55 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

type taskRQ struct {
tasks []*peertask.Task
target peer.ID
}

type fakeQueryQueue struct {
popWait sync.WaitGroup
queriesLk sync.RWMutex
queries []*taskRQ
queries []*peertask.TaskBlock
}

func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
func (fqq *fakeQueryQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
fqq.queriesLk.Lock()
var ptrs []*peertask.Task
for _, t := range tasks {
ptrs = append(ptrs, &t)
}
fqq.queries = append(fqq.queries, &taskRQ{
tasks: ptrs,
target: to,
fqq.queries = append(fqq.queries, &peertask.TaskBlock{
Tasks: tasks,
Priority: tasks[0].Priority,
Target: to,
Done: func([]peertask.Task) {},
})
fqq.queriesLk.Unlock()
}

func (fqq *fakeQueryQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
func (fqq *fakeQueryQueue) PopBlock() *peertask.TaskBlock {
fqq.popWait.Wait()
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
if len(fqq.queries) == 0 {
return "", nil, 0
return nil
}
trq := fqq.queries[0]
block := fqq.queries[0]
fqq.queries = fqq.queries[1:]
return trq.target, trq.tasks, 0
return block
}

func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
func (fqq *fakeQueryQueue) Remove(identifier peertask.Identifier, p peer.ID) {
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
for i, query := range fqq.queries {
if query.target == p {
for j, task := range query.tasks {
if task.Topic == topic {
query.tasks = append(query.tasks[:j], query.tasks[j+1:]...)
if query.Target == p {
for j, task := range query.Tasks {
if task.Identifier == identifier {
query.Tasks = append(query.Tasks[:j], query.Tasks[j+1:]...)
}
}
if len(query.tasks) == 0 {
if len(query.Tasks) == 0 {
fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
}
}
}
}

func (fqq *fakeQueryQueue) ThawRound() {}
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {}
func (fqq *fakeQueryQueue) ThawRound() {

}

type fakePeerManager struct {
lastPeer peer.ID
Expand Down

0 comments on commit 61a51d9

Please sign in to comment.