Skip to content

Commit

Permalink
Merge 3279fb5 into 1a08fb2
Browse files Browse the repository at this point in the history
  • Loading branch information
henrybear327 authored Sep 25, 2024
2 parents 1a08fb2 + 3279fb5 commit 80c1536
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 111 deletions.
64 changes: 2 additions & 62 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ type Server interface {
// Close closes listener and transport.
Close() error

// PauseAccept stops accepting new connections.
PauseAccept()
// UnpauseAccept removes pause operation on accepting new connections.
UnpauseAccept()

// DelayAccept adds latency ± random variable to accepting
// new incoming connections.
DelayAccept(latency, rv time.Duration)
Expand Down Expand Up @@ -115,16 +110,6 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
UnpauseTx()

// PauseRx stops "receiving" packets; "incoming" traffic blocks.
PauseRx()
// UnpauseRx removes "receiving" pause operation.
UnpauseRx()

// ResetListener closes and restarts listener.
ResetListener() error
}
Expand Down Expand Up @@ -164,9 +149,6 @@ type server struct {
listenerMu sync.RWMutex
listener net.Listener

pauseAcceptMu sync.Mutex
pauseAcceptc chan struct{}

latencyAcceptMu sync.RWMutex
latencyAccept time.Duration

Expand Down Expand Up @@ -208,9 +190,8 @@ func NewServer(cfg ServerConfig) Server {
donec: make(chan struct{}),
errc: make(chan error, 16),

pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand All @@ -233,7 +214,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}

close(s.pauseAcceptc)
close(s.pauseTxc)
close(s.pauseRxc)

Expand Down Expand Up @@ -290,15 +270,6 @@ func (s *server) listenAndServe() {
close(s.readyc)

for {
s.pauseAcceptMu.Lock()
pausec := s.pauseAcceptc
s.pauseAcceptMu.Unlock()
select {
case <-pausec:
case <-s.donec:
return
}

s.latencyAcceptMu.RLock()
lat := s.latencyAccept
s.latencyAcceptMu.RUnlock()
Expand Down Expand Up @@ -645,37 +616,6 @@ func (s *server) Close() (err error) {
return err
}

func (s *server) PauseAccept() {
s.pauseAcceptMu.Lock()
s.pauseAcceptc = make(chan struct{})
s.pauseAcceptMu.Unlock()

s.lg.Info(
"paused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseAccept() {
s.pauseAcceptMu.Lock()
select {
case <-s.pauseAcceptc: // already unpaused
case <-s.donec:
s.pauseAcceptMu.Unlock()
return
default:
close(s.pauseAcceptc)
}
s.pauseAcceptMu.Unlock()

s.lg.Info(
"unpaused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
Expand Down
49 changes: 0 additions & 49 deletions pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,55 +234,6 @@ func testServerDelayAccept(t *testing.T, secure bool) {
}
}

func TestServer_PauseTx(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()

p := NewServer(ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})

waitForServer(t, p)

defer p.Close()

p.PauseTx()

data := []byte("Hello World!")
send(t, data, scheme, srcAddr, transport.TLSInfo{})

recvc := make(chan []byte, 1)
go func() {
recvc <- receive(t, ln)
}()

select {
case d := <-recvc:
t.Fatalf("received unexpected data %q during pause", string(d))
case <-time.After(200 * time.Millisecond):
}

p.UnpauseTx()

select {
case d := <-recvc:
if !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
case <-time.After(2 * time.Second):
t.Fatal("took too long to receive after unpause")
}
}

func TestServer_ModifyTx_corrupt(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
Expand Down

0 comments on commit 80c1536

Please sign in to comment.