diff --git a/logstreamer/pool.go b/logstreamer/pool.go index db91a2c..46a96d2 100644 --- a/logstreamer/pool.go +++ b/logstreamer/pool.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/ddosify/alaz/log" ) @@ -20,6 +21,29 @@ type PoolConn struct { tls bool } +func (p *PoolConn) isAlive() bool { + var buf [1]byte + p.SetReadDeadline(time.Now().Add(1 * time.Millisecond)) // Set a very short deadline + _, err := p.Read(buf[:]) + + if err != nil { + if e, ok := err.(net.Error); ok && e.Timeout() { + // Timeout occurred, but connection is still alive + return true + } + // Real error or EOF encountered, connection likely dead + return false + } + + if buf[0] == 'X' { + // close + return false + } + + // Data received (unexpected in send only), process or ignore + return true +} + // Close() puts the given connects back to the pool instead of closing it. func (p *PoolConn) Close() error { p.mu.RLock() @@ -94,7 +118,13 @@ func (c *channelPool) Get() (*PoolConn, error) { return nil, ErrClosed } - return conn, nil + if conn.isAlive() { + return conn, nil + } else { + conn.MarkUnusable() + conn.Close() + return nil, ErrClosed + } default: conn, err := factory() if err != nil {