From 08320d31e07a3ee9daf41ba6624fc4a71a5a6123 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 26 Dec 2018 12:14:57 -0700 Subject: [PATCH 01/11] server, tidb-server: improve socket handling Cleanup socket file on exit Handle error of socket file exists Support both socket and tcp via redirection Support case neither socket or tcp specified --- server/server.go | 61 +++++++++++++++++++++++++++++++++++++++++---- server/tidb_test.go | 1 + tidb-server/main.go | 1 + 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/server/server.go b/server/server.go index 6f52d0ddb2d19..85242044c45b0 100644 --- a/server/server.go +++ b/server/server.go @@ -32,10 +32,12 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "io/ioutil" "math/rand" "net" "net/http" + "os" // For pprof _ "net/http/pprof" "sync" @@ -133,6 +135,37 @@ func (s *Server) isUnixSocket() bool { return s.cfg.Socket != "" } +func (s *Server) forwardUnixSocketToTcp(socket string, addr string) { + if sock, err := net.Listen("unix", socket); err == nil { + log.Infof("Server redirecting [%s] to [%s]", socket, addr) + for { + uconn, err := sock.Accept() + if err != nil { + log.Warningf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) + continue + } else { + log.Infof("server socket forwarding from [%s] to [%s]", socket, addr) + } + go s.handleForwardedConnection(uconn, addr) + } + } else { + log.Fatalf("err: %s", err) // in use? + } +} + +func (s *Server) handleForwardedConnection(uconn net.Conn, addr string) { + if tconn, err := net.Dial("tcp", addr); err == nil { + if _, err := io.Copy(tconn, uconn); err != nil { + log.Warningf("socket forward copy failed: %s", err) + } + } else { + log.Warningf("socket forward failed: could not connect to [%s], err: %s", addr, err) + } + if err := uconn.Close(); err != nil { + log.Warningf("socket failed to close: %s", err) + } +} + // NewServer creates a new Server. func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { s := &Server{ @@ -151,15 +184,23 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { } var err error - if cfg.Socket != "" { - if s.listener, err = net.Listen("unix", cfg.Socket); err == nil { - log.Infof("Server is running MySQL Protocol through Socket [%s]", cfg.Socket) - } - } else { + + if s.cfg.Host != "" && s.cfg.Port != 0 { addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) if s.listener, err = net.Listen("tcp", addr); err == nil { log.Infof("Server is running MySQL Protocol at [%s]", addr) } + if cfg.Socket != "" { + go s.forwardUnixSocketToTcp(cfg.Socket, addr) // listen on both + } + } else if cfg.Socket != "" { + if s.listener, err = net.Listen("unix", cfg.Socket); err == nil { + log.Infof("Server is running MySQL Protocol through Socket [%s]", cfg.Socket) + } else { + log.Fatalf("err: %s", err) // in use? + } + } else { + log.Fatal("Server not configured to listen on either -socket or -host and -port!") } if cfg.ProxyProtocol.Networks != "" { @@ -405,6 +446,16 @@ func (s *Server) GracefulDown() { } } +// CleanupSocketFile cleans up socket file if it was created. +func (s *Server) CleanupSocketFile() { + if s.cfg.Socket != "" { + log.Infof("[server] removing socket file [%s]", s.cfg.Socket) + if err := os.Remove(s.cfg.Socket); err != nil { + log.Warningf("[server] failed to remove socket file! err: %s", err) + } + } +} + func (s *Server) kickIdleConnection() { var conns []*clientConn s.rwlock.RLock() diff --git a/server/tidb_test.go b/server/tidb_test.go index 2a7a18264f15e..c9d4625c8ca0f 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -163,6 +163,7 @@ func (ts *TidbTestSuite) TestMultiStatements(c *C) { func (ts *TidbTestSuite) TestSocket(c *C) { cfg := config.NewConfig() cfg.Socket = "/tmp/tidbtest.sock" + os.Remove(cfg.Socket) cfg.Status.ReportStatus = false server, err := NewServer(cfg, ts.tidbdrv) diff --git a/tidb-server/main.go b/tidb-server/main.go index 34b98aa59e7cf..869df44ca5e75 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -548,4 +548,5 @@ func cleanup() { svr.KillAllConnections() } closeDomainAndStorage() + svr.CleanupSocketFile() } From 4f956a9586c3fd7b08b9d6853c6b20bc53fee95b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 26 Dec 2018 12:35:37 -0700 Subject: [PATCH 02/11] server: fixed linter, fixed test to use sock again --- server/server.go | 14 ++++++-------- server/tidb_test.go | 2 ++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/server.go b/server/server.go index 85242044c45b0..d861dbddcc623 100644 --- a/server/server.go +++ b/server/server.go @@ -135,18 +135,16 @@ func (s *Server) isUnixSocket() bool { return s.cfg.Socket != "" } -func (s *Server) forwardUnixSocketToTcp(socket string, addr string) { +func (s *Server) forwardUnixSocketToTCP(socket string, addr string) { if sock, err := net.Listen("unix", socket); err == nil { log.Infof("Server redirecting [%s] to [%s]", socket, addr) for { - uconn, err := sock.Accept() - if err != nil { - log.Warningf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) - continue - } else { + if uconn, err := sock.Accept(); err == nil { log.Infof("server socket forwarding from [%s] to [%s]", socket, addr) + go s.handleForwardedConnection(uconn, addr) + } else { + log.Warningf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) } - go s.handleForwardedConnection(uconn, addr) } } else { log.Fatalf("err: %s", err) // in use? @@ -191,7 +189,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { log.Infof("Server is running MySQL Protocol at [%s]", addr) } if cfg.Socket != "" { - go s.forwardUnixSocketToTcp(cfg.Socket, addr) // listen on both + go s.forwardUnixSocketToTCP(cfg.Socket, addr) // listen on both } } else if cfg.Socket != "" { if s.listener, err = net.Listen("unix", cfg.Socket); err == nil { diff --git a/server/tidb_test.go b/server/tidb_test.go index c9d4625c8ca0f..b399aac5ceff0 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -163,6 +163,8 @@ func (ts *TidbTestSuite) TestMultiStatements(c *C) { func (ts *TidbTestSuite) TestSocket(c *C) { cfg := config.NewConfig() cfg.Socket = "/tmp/tidbtest.sock" + cfg.Host = "" + cfg.Port = 0 os.Remove(cfg.Socket) cfg.Status.ReportStatus = false From aaf71deae2ea3aaf5b4695aefc55aeed44000714 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 27 Dec 2018 18:31:57 -0700 Subject: [PATCH 03/11] Addressed PR feedback --- server/server.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/server.go b/server/server.go index d861dbddcc623..8a8bb43b16568 100644 --- a/server/server.go +++ b/server/server.go @@ -143,7 +143,7 @@ func (s *Server) forwardUnixSocketToTCP(socket string, addr string) { log.Infof("server socket forwarding from [%s] to [%s]", socket, addr) go s.handleForwardedConnection(uconn, addr) } else { - log.Warningf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) + log.Errorf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) } } } else { @@ -187,18 +187,16 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) if s.listener, err = net.Listen("tcp", addr); err == nil { log.Infof("Server is running MySQL Protocol at [%s]", addr) - } - if cfg.Socket != "" { - go s.forwardUnixSocketToTCP(cfg.Socket, addr) // listen on both + if cfg.Socket != "" { + go s.forwardUnixSocketToTCP(cfg.Socket, addr) // listen on both + } } } else if cfg.Socket != "" { if s.listener, err = net.Listen("unix", cfg.Socket); err == nil { log.Infof("Server is running MySQL Protocol through Socket [%s]", cfg.Socket) - } else { - log.Fatalf("err: %s", err) // in use? } } else { - log.Fatal("Server not configured to listen on either -socket or -host and -port!") + err = errors.New("Server not configured to listen on either -socket or -host and -port") } if cfg.ProxyProtocol.Networks != "" { @@ -449,7 +447,7 @@ func (s *Server) CleanupSocketFile() { if s.cfg.Socket != "" { log.Infof("[server] removing socket file [%s]", s.cfg.Socket) if err := os.Remove(s.cfg.Socket); err != nil { - log.Warningf("[server] failed to remove socket file! err: %s", err) + log.Errorf("[server] failed to remove socket file! err: %s", err) } } } @@ -468,7 +466,7 @@ func (s *Server) kickIdleConnection() { for _, cc := range conns { err := cc.Close() if err != nil { - log.Error("close connection error:", err) + log.Errorf("close connection error: %s", err) } } } From af05e8fc914950086791f55679e79d16a42b7865 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 27 Dec 2018 20:07:33 -0700 Subject: [PATCH 04/11] addressed pr feedback --- server/server.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/server/server.go b/server/server.go index 8a8bb43b16568..f5b5b5f852a2d 100644 --- a/server/server.go +++ b/server/server.go @@ -82,6 +82,7 @@ type Server struct { tlsConfig *tls.Config driver IDriver listener net.Listener + socket net.Listener rwlock *sync.RWMutex concurrentLimiter *TokenLimiter clients map[uint32]*clientConn @@ -135,15 +136,17 @@ func (s *Server) isUnixSocket() bool { return s.cfg.Socket != "" } -func (s *Server) forwardUnixSocketToTCP(socket string, addr string) { - if sock, err := net.Listen("unix", socket); err == nil { - log.Infof("Server redirecting [%s] to [%s]", socket, addr) +func (s *Server) forwardUnixSocketToTCP() { + addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) + var err error + if s.socket, err = net.Listen("unix", s.cfg.Socket); err == nil { + log.Infof("Server redirecting [%s] to [%s]", s.cfg.Socket, addr) for { - if uconn, err := sock.Accept(); err == nil { - log.Infof("server socket forwarding from [%s] to [%s]", socket, addr) + if uconn, err := s.socket.Accept(); err == nil { + log.Infof("server socket forwarding from [%s] to [%s]", s.cfg.Socket, addr) go s.handleForwardedConnection(uconn, addr) } else { - log.Errorf("server failed to forward from [%s] to [%s], err: %s", socket, addr, err) + log.Errorf("server failed to forward from [%s] to [%s], err: %s", s.cfg.Socket, addr, err) } } } else { @@ -152,6 +155,7 @@ func (s *Server) forwardUnixSocketToTCP(socket string, addr string) { } func (s *Server) handleForwardedConnection(uconn net.Conn, addr string) { + defer terror.Call(uconn.Close) if tconn, err := net.Dial("tcp", addr); err == nil { if _, err := io.Copy(tconn, uconn); err != nil { log.Warningf("socket forward copy failed: %s", err) @@ -159,9 +163,6 @@ func (s *Server) handleForwardedConnection(uconn net.Conn, addr string) { } else { log.Warningf("socket forward failed: could not connect to [%s], err: %s", addr, err) } - if err := uconn.Close(); err != nil { - log.Warningf("socket failed to close: %s", err) - } } // NewServer creates a new Server. @@ -188,7 +189,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { if s.listener, err = net.Listen("tcp", addr); err == nil { log.Infof("Server is running MySQL Protocol at [%s]", addr) if cfg.Socket != "" { - go s.forwardUnixSocketToTCP(cfg.Socket, addr) // listen on both + go s.forwardUnixSocketToTCP() // listen on both } } } else if cfg.Socket != "" { From dadc8d5670499c0e1a09d60a6ce76865dfdc8d43 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sat, 29 Dec 2018 12:02:35 -0700 Subject: [PATCH 05/11] address pr feedback --- server/server.go | 43 ++++++++++++++++++++----------------------- tidb-server/main.go | 1 - 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/server/server.go b/server/server.go index f5b5b5f852a2d..fab9f3bd65dfb 100644 --- a/server/server.go +++ b/server/server.go @@ -138,19 +138,13 @@ func (s *Server) isUnixSocket() bool { func (s *Server) forwardUnixSocketToTCP() { addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) - var err error - if s.socket, err = net.Listen("unix", s.cfg.Socket); err == nil { - log.Infof("Server redirecting [%s] to [%s]", s.cfg.Socket, addr) - for { - if uconn, err := s.socket.Accept(); err == nil { - log.Infof("server socket forwarding from [%s] to [%s]", s.cfg.Socket, addr) - go s.handleForwardedConnection(uconn, addr) - } else { - log.Errorf("server failed to forward from [%s] to [%s], err: %s", s.cfg.Socket, addr, err) - } + for { + if uconn, err := s.socket.Accept(); err == nil { + log.Infof("server socket forwarding from [%s] to [%s]", s.cfg.Socket, addr) + go s.handleForwardedConnection(uconn, addr) + } else { + log.Errorf("server failed to forward from [%s] to [%s], err: %s", s.cfg.Socket, addr, err) } - } else { - log.Fatalf("err: %s", err) // in use? } } @@ -189,7 +183,10 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { if s.listener, err = net.Listen("tcp", addr); err == nil { log.Infof("Server is running MySQL Protocol at [%s]", addr) if cfg.Socket != "" { - go s.forwardUnixSocketToTCP() // listen on both + if s.socket, err = net.Listen("unix", s.cfg.Socket); err == nil { + log.Infof("Server redirecting [%s] to [%s]", s.cfg.Socket, addr) + go s.forwardUnixSocketToTCP() + } } } } else if cfg.Socket != "" { @@ -330,11 +327,21 @@ func (s *Server) Close() { terror.Log(errors.Trace(err)) s.listener = nil } + if s.socket != nil { + err := s.socket.Close() + terror.Log(errors.Trace(err)) + } if s.statusServer != nil { err := s.statusServer.Close() terror.Log(errors.Trace(err)) s.statusServer = nil } + if s.cfg.Socket != "" { + log.Infof("[server] removing socket file [%s]", s.cfg.Socket) + if err := os.Remove(s.cfg.Socket); err != nil { + log.Errorf("[server] failed to remove socket file! err: %s", err) + } + } metrics.ServerEventCounter.WithLabelValues(metrics.EventClose).Inc() } @@ -443,16 +450,6 @@ func (s *Server) GracefulDown() { } } -// CleanupSocketFile cleans up socket file if it was created. -func (s *Server) CleanupSocketFile() { - if s.cfg.Socket != "" { - log.Infof("[server] removing socket file [%s]", s.cfg.Socket) - if err := os.Remove(s.cfg.Socket); err != nil { - log.Errorf("[server] failed to remove socket file! err: %s", err) - } - } -} - func (s *Server) kickIdleConnection() { var conns []*clientConn s.rwlock.RLock() diff --git a/tidb-server/main.go b/tidb-server/main.go index 869df44ca5e75..34b98aa59e7cf 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -548,5 +548,4 @@ func cleanup() { svr.KillAllConnections() } closeDomainAndStorage() - svr.CleanupSocketFile() } From 46d852367127022e495b09b991a806aaf5398ce7 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sat, 29 Dec 2018 12:19:48 -0700 Subject: [PATCH 06/11] Cleanup code --- server/server.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/server/server.go b/server/server.go index fab9f3bd65dfb..cd7b30b1aaf6b 100644 --- a/server/server.go +++ b/server/server.go @@ -37,7 +37,6 @@ import ( "math/rand" "net" "net/http" - "os" // For pprof _ "net/http/pprof" "sync" @@ -139,11 +138,16 @@ func (s *Server) isUnixSocket() bool { func (s *Server) forwardUnixSocketToTCP() { addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) for { + if s.listener == nil { + return // server shutdown has started + } if uconn, err := s.socket.Accept(); err == nil { log.Infof("server socket forwarding from [%s] to [%s]", s.cfg.Socket, addr) go s.handleForwardedConnection(uconn, addr) } else { - log.Errorf("server failed to forward from [%s] to [%s], err: %s", s.cfg.Socket, addr, err) + if s.listener != nil { + log.Errorf("server failed to forward from [%s] to [%s], err: %s", s.cfg.Socket, addr, err) + } } } } @@ -336,12 +340,6 @@ func (s *Server) Close() { terror.Log(errors.Trace(err)) s.statusServer = nil } - if s.cfg.Socket != "" { - log.Infof("[server] removing socket file [%s]", s.cfg.Socket) - if err := os.Remove(s.cfg.Socket); err != nil { - log.Errorf("[server] failed to remove socket file! err: %s", err) - } - } metrics.ServerEventCounter.WithLabelValues(metrics.EventClose).Inc() } From c8f888f62a01b58a080dfff5329c104a0858b357 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sat, 29 Dec 2018 12:26:59 -0700 Subject: [PATCH 07/11] fixed bug: copy data in reverse direction --- server/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/server.go b/server/server.go index cd7b30b1aaf6b..1ec84c481702d 100644 --- a/server/server.go +++ b/server/server.go @@ -155,6 +155,7 @@ func (s *Server) forwardUnixSocketToTCP() { func (s *Server) handleForwardedConnection(uconn net.Conn, addr string) { defer terror.Call(uconn.Close) if tconn, err := net.Dial("tcp", addr); err == nil { + go io.Copy(uconn, tconn) if _, err := io.Copy(tconn, uconn); err != nil { log.Warningf("socket forward copy failed: %s", err) } From bb0e7a18f63b12b167eb1c5ba58bcf4cb6ea7a1b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sat, 29 Dec 2018 12:37:17 -0700 Subject: [PATCH 08/11] catch error --- server/server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 1ec84c481702d..e3b37c1ee6320 100644 --- a/server/server.go +++ b/server/server.go @@ -155,7 +155,11 @@ func (s *Server) forwardUnixSocketToTCP() { func (s *Server) handleForwardedConnection(uconn net.Conn, addr string) { defer terror.Call(uconn.Close) if tconn, err := net.Dial("tcp", addr); err == nil { - go io.Copy(uconn, tconn) + go func() { + if _, err := io.Copy(uconn, tconn); err != nil { + log.Warningf("copy server to socket failed: %s", err) + } + }() if _, err := io.Copy(tconn, uconn); err != nil { log.Warningf("socket forward copy failed: %s", err) } From cedbc0d23150df9f416b57eb75ab72dd7b6708a1 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 3 Jan 2019 09:26:05 -0700 Subject: [PATCH 09/11] Addressed PR feedback --- server/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/server.go b/server/server.go index e3b37c1ee6320..608f387004635 100644 --- a/server/server.go +++ b/server/server.go @@ -339,6 +339,7 @@ func (s *Server) Close() { if s.socket != nil { err := s.socket.Close() terror.Log(errors.Trace(err)) + s.socket = nil } if s.statusServer != nil { err := s.statusServer.Close() From 4a2f0ed2495d5a5bca0fd02c2e4b764dc7301ffa Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 4 Jan 2019 19:56:18 -0700 Subject: [PATCH 10/11] Setup listening on port 3999 for test coverage --- server/tidb_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/tidb_test.go b/server/tidb_test.go index b399aac5ceff0..2fd68a67e4c9f 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -163,8 +163,7 @@ func (ts *TidbTestSuite) TestMultiStatements(c *C) { func (ts *TidbTestSuite) TestSocket(c *C) { cfg := config.NewConfig() cfg.Socket = "/tmp/tidbtest.sock" - cfg.Host = "" - cfg.Port = 0 + cfg.Port = 3999 os.Remove(cfg.Socket) cfg.Status.ReportStatus = false From 06aa60ccb894790f5be6354eee68c84df6e77499 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 4 Jan 2019 20:01:19 -0700 Subject: [PATCH 11/11] improve test coverage --- server/tidb_test.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/server/tidb_test.go b/server/tidb_test.go index 2fd68a67e4c9f..200263caf5931 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -160,7 +160,7 @@ func (ts *TidbTestSuite) TestMultiStatements(c *C) { runTestMultiStatements(c) } -func (ts *TidbTestSuite) TestSocket(c *C) { +func (ts *TidbTestSuite) TestSocketForwarding(c *C) { cfg := config.NewConfig() cfg.Socket = "/tmp/tidbtest.sock" cfg.Port = 3999 @@ -182,6 +182,30 @@ func (ts *TidbTestSuite) TestSocket(c *C) { }, "SocketRegression") } +func (ts *TidbTestSuite) TestSocket(c *C) { + cfg := config.NewConfig() + cfg.Socket = "/tmp/tidbtest.sock" + cfg.Port = 0 + os.Remove(cfg.Socket) + cfg.Host = "" + cfg.Status.ReportStatus = false + + server, err := NewServer(cfg, ts.tidbdrv) + c.Assert(err, IsNil) + go server.Run() + time.Sleep(time.Millisecond * 100) + defer server.Close() + + runTestRegression(c, func(config *mysql.Config) { + config.User = "root" + config.Net = "unix" + config.Addr = "/tmp/tidbtest.sock" + config.DBName = "test" + config.Strict = true + }, "SocketRegression") + +} + // generateCert generates a private key and a certificate in PEM format based on parameters. // If parentCert and parentCertKey is specified, the new certificate will be signed by the parentCert. // Otherwise, the new certificate will be self-signed and is a CA.