Skip to content

Commit

Permalink
Fix Blackhole implemention for e2e tests
Browse files Browse the repository at this point in the history
Thanks to Fu Wei for the input regarding etcd-io#17938.

[Problem]

A peer will
(a) receive traffic from its peers
(b) initiate connections to its peers (via stream and pipeline).

Thus, the current mechanism of only blocking peer traffic via the peer's existing proxy is insufficient, since only scenario (a) is handled, and scenario (b) is not blocked at all.

[Main idea]

We introduce 1 shared "HTTP proxy" for all peers. All peers will be proxying all the connections through it.

The modified architecture will look something like this:
```
A -- shared HTTP proxy ----- B
     ^ newly introduced
```

By adding this HTTP proxy, we can block all in and out traffic that is initiated from a peer to others, without having to resort to external tools, such as iptables. It's verified that the blocking of traffic is complete, compared to previous solutions [2][3].

[Implementation]

The main subtasks are
- set up an environment variable `FORWARD_PROXY`, because go will not parse HTTP_PROXY and HTTPS_PROXY that is using localhost or 127.0.0.1, regardless if the port is present or not
- implement the shared HTTP proxy by extending the existing proxy server code
- remove existing proxy setup (the per-peer proxy)
- implement enable/disable of the HTTP proxy in the e2e test

[Testing]

- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`
- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`

[References]
[1] Tracking issue etcd-io#17737
[2] PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] PR (V2) etcd-io#17891
[4] PR (V3) etcd-io#17938

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed May 6, 2024
1 parent 465b00c commit cc5f759
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 14 deletions.
51 changes: 48 additions & 3 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// BlackholePeer drops all traffic coming in and out of the specified peer
BlackholePeer(url.URL)
// UnblackholePeer resumes all traffic coming in and out of the specified peer
UnblackholePeer(url.URL)

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
Expand Down Expand Up @@ -191,6 +196,11 @@ type server struct {

latencyRxMu sync.RWMutex
latencyRx time.Duration

// TODO: add mutex
blackholeURL map[url.URL]struct{}
// TODO: add mutex, remove entries once the connection is closed
connectionMap map[net.Addr]url.URL
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand All @@ -217,6 +227,9 @@ func NewServer(cfg ServerConfig) Server {
pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),

blackholeURL: make(map[url.URL]struct{}),
connectionMap: make(map[net.Addr]url.URL),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand Down Expand Up @@ -395,6 +408,11 @@ func (s *server) listenAndServe() {
}
connectResponse.Write(in)

// maintain connection mapping
if s.isHTTPProxy {
s.connectionMap[in.RemoteAddr()] = ??? // FIXME: how to know which peer is sending out this traffic?
}

return req.URL.Host
}

Expand Down Expand Up @@ -465,11 +483,11 @@ func (s *server) listenAndServe() {
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
func (s *server) transmit(dst, src net.Conn) {
s.ioCopy(dst, src, proxyTx)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
func (s *server) receive(dst, src net.Conn) {
s.ioCopy(dst, src, proxyRx)
}

Expand All @@ -480,7 +498,7 @@ const (
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -530,6 +548,25 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
default:
panic("unknown proxy type")
}

// blackhole by URL
switch ptype {
case proxyTx:
s.modifyTxMu.RLock()
if s.modifyTx != nil {
data = s.modifyTx(data)
}
s.modifyTxMu.RUnlock()
case proxyRx:
s.modifyRxMu.RLock()
if s.modifyRx != nil {
data = s.modifyRx(data)
}
s.modifyRxMu.RUnlock()
default:
panic("unknown proxy type")
}

nr2 := len(data)
switch ptype {
case proxyTx:
Expand Down Expand Up @@ -954,6 +991,14 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) BlackholePeer(peerURL url.URL) {
s.blackholeURL[peerURL] = struct{}{}
}

func (s *server) UnblackholePeer(peerURL url.URL) {
delete(s.blackholeURL, peerURL)
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
Expand Down
14 changes: 3 additions & 11 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithIsPeerTLS(true),
e2e.WithPeerProxy(true),
e2e.WithSingleHTTPProxy(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
Expand All @@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
httpProxy := partitionedMember.PeerHTTPProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
httpProxy.BlackholeTx()
httpProxy.BlackholeRx()
epc.SingleHTTPProxyInstance.BlackholePeer(partitionedMember.Config().PeerURL)

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
// Wait for some time to restore the network
time.Sleep(1 * time.Second)
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
httpProxy.UnblackholeTx()
httpProxy.UnblackholeRx()
epc.SingleHTTPProxyInstance.UnblackholePeer(partitionedMember.Config().PeerURL)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
44 changes: 44 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type EtcdProcessCluster struct {
Cfg *EtcdProcessClusterConfig
Procs []EtcdProcess
nextSeq int // sequence number of the next etcd process (if it will be required)

SingleHTTPProxyInstance proxy.Server
}

type EtcdProcessClusterConfig struct {
Expand All @@ -144,6 +146,9 @@ type EtcdProcessClusterConfig struct {
LazyFSEnabled bool
PeerProxy bool

SingleHTTPProxy bool
SingleHTTPProxyPort int

// Process config

EnvVars map[string]string
Expand Down Expand Up @@ -184,6 +189,8 @@ func DefaultConfig() *EtcdProcessClusterConfig {
CN: true,

ServerConfig: *embed.NewConfig(),

SingleHTTPProxyPort: 55688,
}
cfg.ServerConfig.InitialClusterToken = "new"
return cfg
Expand Down Expand Up @@ -371,6 +378,10 @@ func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}

func WithSingleHTTPProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.SingleHTTPProxy = enabled }
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
Expand Down Expand Up @@ -438,6 +449,20 @@ func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcess
}
}

if cfg.SingleHTTPProxy && epc.SingleHTTPProxyInstance == nil {
cfg.Logger.Info("starting single HTTP proxy...", zap.String("name", cfg.ServerConfig.Name))
epc.SingleHTTPProxyInstance = proxy.NewServer(proxy.ServerConfig{
Logger: zap.NewNop(),
From: url.URL{Scheme: "tcp", Host: fmt.Sprintf("localhost:%d", cfg.SingleHTTPProxyPort)},
IsHTTPProxy: true,
})
select {
case <-epc.SingleHTTPProxyInstance.Ready():
case err := <-epc.SingleHTTPProxyInstance.Error():
return nil, err
}
}

return epc, nil
}

Expand Down Expand Up @@ -501,6 +526,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
var httpProxyCfg *proxy.ServerConfig

if cfg.PeerProxy && cfg.SingleHTTPProxy {
panic("Can't only use PeerProxy and SingleHTTPProxy at the same time")
}

if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
Expand All @@ -524,6 +554,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
cfg.EnvVars = make(map[string]string)
}
cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort)
} else if cfg.SingleHTTPProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}

cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", cfg.SingleHTTPProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
Expand Down Expand Up @@ -986,6 +1022,14 @@ func (epc *EtcdProcessCluster) Close() error {
err = cerr
}
}

if epc.SingleHTTPProxyInstance != nil {
epc.lg.Info("closing single HTTP Proxy...")
if err = epc.SingleHTTPProxyInstance.Close(); err != nil {
return err
}
}

epc.lg.Info("closed test cluster.")
return err
}
Expand Down

0 comments on commit cc5f759

Please sign in to comment.