Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
remove unused context in Swarm.dialWorkerLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Aug 23, 2021
1 parent fa91592 commit 21500a3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 62 deletions.
5 changes: 2 additions & 3 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// DialWorerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error
type dialWorkerFunc func(peer.ID, <-chan dialRequest) error

// newDialSync constructs a new DialSync
func newDialSync(worker dialWorkerFunc) *DialSync {
Expand Down Expand Up @@ -94,8 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
ds: ds,
}

err := ds.dialWorker(adctx, p, actd.reqch)
if err != nil {
if err := ds.dialWorker(p, actd.reqch); err != nil {
cancel()
return nil, err
}
Expand Down
80 changes: 24 additions & 56 deletions dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,38 @@ import (

func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}) {
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
// dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
dfcalls <- struct{}{}
go func() {
defer cancel()
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

select {
case <-ch:
req.resch <- dialResponse{conn: new(Conn)}
case <-ctx.Done():
req.resch <- dialResponse{err: ctx.Err()}
return
}
case <-ctx.Done():
return
}
for req := range reqch {
<-ch
req.resch <- dialResponse{conn: new(Conn)}
}
}()
return nil
}

o := new(sync.Once)

return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
var once sync.Once
return f, func() { once.Do(func() { close(ch) }) }, context.Background(), dfcalls
}

func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()

dsync := newDialSync(df)

p := peer.ID("testpeer")

ctx := context.Background()

finished := make(chan struct{})
finished := make(chan struct{}, 2)
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
Expand Down Expand Up @@ -188,25 +166,20 @@ func TestDialSyncAllCancel(t *testing.T) {

func TestFailFirst(t *testing.T) {
var count int32
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)

case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)
}
}()
return nil
Expand Down Expand Up @@ -235,19 +208,14 @@ func TestFailFirst(t *testing.T) {
}

func TestStressActiveDial(t *testing.T) {
ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

req.resch <- dialResponse{}
case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}
req.resch <- dialResponse{}
}
}()
return nil
Expand Down
6 changes: 3 additions & 3 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,16 @@ type dialResponse struct {
}

// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error {
if p == s.local {
return ErrDialToSelf
}

go s.dialWorkerLoop(ctx, p, reqch)
go s.dialWorkerLoop(p, reqch)
return nil
}

func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
defer s.limiter.clearAllPeerDials(p)

type pendRequest struct {
Expand Down

0 comments on commit 21500a3

Please sign in to comment.