Skip to content

Commit

Permalink
Remove lock
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Aug 28, 2024
1 parent 729ad10 commit 5cfae4c
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func newTestHandlerHashring(
}

var (
mu sync.Mutex
closers = make([]func() error, 0)

ag = addrGen{}
Expand Down Expand Up @@ -252,17 +251,12 @@ func newTestHandlerHashring(
srv := NewCapNProtoServer(listener, handler)
client := writecapnp.NewRemoteWriteClient(listener)
peer = newPeerWorker(client, prometheus.NewHistogram(prometheus.HistogramOpts{}), 1)
go func() {
mu.Lock()
closers = append(closers, func() error {
srv.Shutdown()
peer.wp.Close()
return goerrors.Join(listener.Close(), client.Close())
})
mu.Unlock()

_ = srv.ListenAndServe()
}()
closers = append(closers, func() error {
srv.Shutdown()
peer.wp.Close()
return goerrors.Join(listener.Close(), client.Close())
})
go func() { _ = srv.ListenAndServe() }()
fakePeers.clients[endpoint] = peer
} else {
peer = newPeerWorker(&fakeRemoteWriteGRPCServer{h: h}, prometheus.NewHistogram(prometheus.HistogramOpts{}), 1)
Expand Down Expand Up @@ -662,7 +656,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist
},
},
} {
t.Run(fmt.Sprintf("%s/%s=%t", tc.name, "capnp-replication", capnpReplication), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
handlers, hashring, closeFunc, err := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo, capnpReplication)
if err != nil {
t.Fatalf("unable to create test handler: %v", err)
Expand Down Expand Up @@ -743,25 +737,33 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist

func TestReceiveQuorumHashmod(t *testing.T) {
for _, capnpReplication := range []bool{false, true} {
testReceiveQuorum(t, AlgorithmHashmod, false, capnpReplication)
t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) {
testReceiveQuorum(t, AlgorithmHashmod, false, capnpReplication)
})
}
}

func TestReceiveQuorumKetama(t *testing.T) {
for _, capnpReplication := range []bool{false, true} {
testReceiveQuorum(t, AlgorithmKetama, false, capnpReplication)
t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) {
testReceiveQuorum(t, AlgorithmKetama, false, capnpReplication)
})
}
}

func TestReceiveWithConsistencyDelayHashmod(t *testing.T) {
for _, capnpReplication := range []bool{false, true} {
testReceiveQuorum(t, AlgorithmHashmod, true, capnpReplication)
t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) {
testReceiveQuorum(t, AlgorithmHashmod, true, capnpReplication)
})
}
}

func TestReceiveWithConsistencyDelayKetama(t *testing.T) {
for _, capnpReplication := range []bool{false, true} {
testReceiveQuorum(t, AlgorithmKetama, true, capnpReplication)
t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) {
testReceiveQuorum(t, AlgorithmKetama, true, capnpReplication)
})
}
}

Expand Down

0 comments on commit 5cfae4c

Please sign in to comment.