Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

remove unused context in Swarm.dialWorkerLoop #268

Merged
merged 1 commit into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// DialWorerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error
type dialWorkerFunc func(peer.ID, <-chan dialRequest) error

// newDialSync constructs a new DialSync
func newDialSync(worker dialWorkerFunc) *DialSync {
Expand Down Expand Up @@ -94,8 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
ds: ds,
}

err := ds.dialWorker(adctx, p, actd.reqch)
if err != nil {
if err := ds.dialWorker(p, actd.reqch); err != nil {
cancel()
return nil, err
}
Expand Down
94 changes: 29 additions & 65 deletions dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,37 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
defer cancel()
dfcalls <- struct{}{}
go func() {
defer cancel()
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

select {
case <-ch:
req.resch <- dialResponse{conn: new(Conn)}
case <-ctx.Done():
req.resch <- dialResponse{err: ctx.Err()}
return
}
case <-ctx.Done():
return
}
for req := range reqch {
<-ch
req.resch <- dialResponse{conn: new(Conn)}
}
}()
return nil
}

o := new(sync.Once)

return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
var once sync.Once
return f, func() { once.Do(func() { close(ch) }) }, dialctx, dfcalls
}

func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()

dsync := newDialSync(df)

p := peer.ID("testpeer")

ctx := context.Background()

finished := make(chan struct{})
finished := make(chan struct{}, 2)
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
Expand Down Expand Up @@ -139,30 +118,26 @@ func TestDialSyncAllCancel(t *testing.T) {
df, done, dctx, _ := getMockDialFunc()

dsync := newDialSync(df)

p := peer.ID("testpeer")

ctx1, cancel1 := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

cancel1()
cancel()
for i := 0; i < 2; i++ {
select {
case <-finished:
Expand All @@ -180,33 +155,27 @@ func TestDialSyncAllCancel(t *testing.T) {

// should be able to successfully dial that peer again
done()
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Fatal(err)
}
}

func TestFailFirst(t *testing.T) {
var count int32
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)

case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)
}
}()
return nil
Expand Down Expand Up @@ -235,19 +204,14 @@ func TestFailFirst(t *testing.T) {
}

func TestStressActiveDial(t *testing.T) {
ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

req.resch <- dialResponse{}
case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}
req.resch <- dialResponse{}
}
}()
return nil
Expand Down
6 changes: 3 additions & 3 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,16 @@ type dialResponse struct {
}

// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error {
if p == s.local {
return ErrDialToSelf
}

go s.dialWorkerLoop(ctx, p, reqch)
go s.dialWorkerLoop(p, reqch)
return nil
}

func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
defer s.limiter.clearAllPeerDials(p)

type pendRequest struct {
Expand Down