Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Fix Blackhole implementation for e2e tests #17950

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion client/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -31,7 +33,17 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
}

t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Proxy: func(req *http.Request) (*url.URL, error) {
// according to the comment of http.ProxyFromEnvironment: if the
// proxy URL is "localhost" (with or without a port number),
// then a nil URL and nil error will be returned.
// Thus, we need to workaround this by manually setting an
// ENV named FORWARD_PROXY and parse the URL (which is a localhost in our case)
if httpProxy, exists := os.LookupEnv("FORWARD_PROXY"); exists {
return url.Parse(httpProxy)
}
return http.ProxyFromEnvironment(req)
},
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
Expand Down
175 changes: 163 additions & 12 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package proxy

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -114,6 +117,11 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// BlackholePeer drops all traffic coming in and out of the specified peer
BlackholePeer(url.URL)
// UnblackholePeer resumes all traffic coming in and out of the specified peer
UnblackholePeer(url.URL)

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
Expand All @@ -137,11 +145,14 @@ type ServerConfig struct {
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
IsHTTPProxy bool
}

type server struct {
lg *zap.Logger

isHTTPProxy bool

from url.URL
fromPort int
to url.URL
Expand Down Expand Up @@ -186,6 +197,11 @@ type server struct {

latencyRxMu sync.RWMutex
latencyRx time.Duration

blackholeURL map[string]struct{}
blackholeURLMu sync.RWMutex
connectionMap map[string]string
connectionMapMu sync.RWMutex
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand All @@ -194,6 +210,8 @@ func NewServer(cfg ServerConfig) Server {
s := &server{
lg: cfg.Logger,

isHTTPProxy: cfg.IsHTTPProxy,

from: cfg.From,
to: cfg.To,

Expand All @@ -210,16 +228,21 @@ func NewServer(cfg ServerConfig) Server {
pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),

blackholeURL: make(map[string]struct{}),
connectionMap: make(map[string]string),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
if !s.isHTTPProxy {
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
}
}

if s.dialTimeout == 0 {
Expand All @@ -239,8 +262,10 @@ func NewServer(cfg ServerConfig) Server {
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
if !s.isHTTPProxy {
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
}
}

addr := fmt.Sprintf(":%d", s.fromPort)
Expand Down Expand Up @@ -273,7 +298,10 @@ func (s *server) From() string {
}

func (s *server) To() string {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
if !s.isHTTPProxy {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
}
return ""
}

// TODO: implement packet reordering from multiple TCP connections
Expand Down Expand Up @@ -353,6 +381,62 @@ func (s *server) listenAndServe() {
continue
}

parseHeaderForDestination := func() string {
// the first request should always contain a CONNECT header field
// since we set the transport to forward the traffic to the proxy
buf := make([]byte, s.bufferSize)
var data []byte
if nr1, err := in.Read(buf); err != nil {
if err == io.EOF {
panic("No data available for http proxy to work on")
}
} else {
data = buf[:nr1]
}

// attempt to parse for the HOST from the CONNECT request
var req *http.Request
if req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data))); err != nil {
panic("Failed to parse header in http proxy")
}

if req.Method == http.MethodConnect {
// make sure a reply is sent back to the client
connectResponse := &http.Response{
StatusCode: 200,
ProtoMajor: 1,
ProtoMinor: 1,
}
connectResponse.Write(in)

// maintain connection mapping
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to use one pull request so that reviewers can read all the context about the change.

Just think about that it's necessary to use single one http proxy here?

What about this?

Assuming that each member has their own HTTP proxy server to forward traffic to other members.
There is no any proxy server to listen on advertise URL.

Let's say that there're 3 members in cluster and they're named by A, B and C.
If we want to block any traffic between A and others, based on your interface, we can just call.

procA.Proxy.BlackholePeer(procB.PeerURL) // block traffic from A to B 
procA.Proxy.BlackholePeer(procC.PeerURL) // block traffic from A to C
procB.Proxy.BlackholePeer(procA.PeerURL) // block traffic from B to A
procC.Proxy.BlackholePeer(procA.PeerURL) // block traffic from C to A

To be honest, using Proxy-Authorization to carry port information looks weird to me.
So, IMO, setup each HTTP proxy is like sidecar and more easier to understand and control source and destination of traffic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this draft PR is a bit extreme 😅
Let me revise a bit to your proposed architecture!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fuweid an attempt is made #17985

// we use the Proxy-Authorization field to identify the sender
// we map the randomly selected port to the port that the peer is listening on
proxyAuthString := req.Header.Get("Proxy-Authorization")
proxyAuthString = strings.ReplaceAll(proxyAuthString, "Basic ", "")
payload, err := base64.StdEncoding.DecodeString(proxyAuthString)
if err != nil {
panic("Invalid Proxy-Authorization data")
}
payloadSplit := strings.Split(string(payload), ":")
if len(payloadSplit) != 2 {
panic("Wrong Proxy-Authorization format")
}
sourcePort := payloadSplit[0]
s.connectionMapMu.Lock()
_, extractedPort, err := net.SplitHostPort(in.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
s.connectionMap[extractedPort] = sourcePort
s.connectionMapMu.Unlock()

return req.URL.Host
}

panic("Wrong header type to start the connection")
}

var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
Expand All @@ -370,9 +454,19 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
if s.isHTTPProxy {
dest := parseHeaderForDestination()
out, err = tp.DialContext(ctx, "tcp", dest)
} else {
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
}
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
if s.isHTTPProxy {
dest := parseHeaderForDestination()
out, err = net.Dial("tcp", dest)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
}
if err != nil {
select {
Expand All @@ -396,6 +490,16 @@ func (s *server) listenAndServe() {
s.transmit(out, in)
out.Close()
in.Close()

s.connectionMapMu.Lock()
_, extractedPort, err := net.SplitHostPort(in.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
delete(s.connectionMap, sourcePeer)
}
s.connectionMapMu.Unlock()
}()
go func() {
defer s.closeWg.Done()
Expand All @@ -407,11 +511,11 @@ func (s *server) listenAndServe() {
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
func (s *server) transmit(dst, src net.Conn) {
s.ioCopy(dst, src, proxyTx)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
func (s *server) receive(dst, src net.Conn) {
s.ioCopy(dst, src, proxyRx)
}

Expand All @@ -422,7 +526,7 @@ const (
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -472,6 +576,41 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
default:
panic("unknown proxy type")
}

// blackhole by URL
switch ptype {
case proxyTx:
s.connectionMapMu.RLock()
_, extractedPort, err := net.SplitHostPort(src.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
s.blackholeURLMu.RLock()
if _, ok := s.blackholeURL[sourcePeer]; ok {
data = nil
}
s.blackholeURLMu.RUnlock()
}
s.connectionMapMu.RUnlock()
case proxyRx:
s.connectionMapMu.RLock()
_, extractedPort, err := net.SplitHostPort(dst.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
s.blackholeURLMu.RLock()
if _, ok := s.blackholeURL[sourcePeer]; ok {
data = nil
}
s.blackholeURLMu.RUnlock()
}
s.connectionMapMu.RUnlock()
default:
panic("unknown proxy type")
}

nr2 := len(data)
switch ptype {
case proxyTx:
Expand Down Expand Up @@ -896,6 +1035,18 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) BlackholePeer(peerURL url.URL) {
s.blackholeURLMu.Lock()
s.blackholeURL[peerURL.Port()] = struct{}{}
s.blackholeURLMu.Unlock()
}

func (s *server) UnblackholePeer(peerURL url.URL) {
s.blackholeURLMu.Lock()
delete(s.blackholeURL, peerURL.Port())
s.blackholeURLMu.Unlock()
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
Expand Down
Loading
Loading