Skip to content

Commit

Permalink
Add option to enabled websocket keepalive pinging (grpc#546)
Browse files Browse the repository at this point in the history
* 增加websocket超时发送ping,以适配nginx的websocket代理

* Update go/grpcweb/websocket_wrapper.go

Co-Authored-By: Johan Brandhorst <[email protected]>

* Update go/grpcweb/websocket_wrapper.go

Co-Authored-By: Johan Brandhorst <[email protected]>

* Update go/grpcweb/websocket_wrapper.go

Co-Authored-By: Johan Brandhorst <[email protected]>

* 添加使能websocket超时ping函数,tickerCount增加互斥锁

* 调整互斥锁解锁位置

* 简单处理,使代码更易读

* 增加配置websocket发送ping的超时时间的功能

* 增加配置websocket发送ping的超时时间的功能

* 1.将wsConn.SetCloseHandler移到了EnablePing中;
2.使用timer.Reset()代替tickerCount计时与复位;
3.其他设置上的提示优化;

* 将timer改为ticker

* use logrus.Infof to get string interpolation

* 使用第三方timer包实现精确的超时间隔发送Ping

* Update go/grpcweb/options.go

Co-Authored-By: Johan Brandhorst <[email protected]>

* Update the document

* use godocdown to generate the latest documentation
  • Loading branch information
angwangiot authored and johanbrandhorst committed Sep 18, 2019
1 parent 67a6f66 commit 4529aac
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 6 deletions.
10 changes: 10 additions & 0 deletions go/grpcweb/DOC.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ requests - usually to check that the origin is valid.
The default behaviour is to check that the origin of the request matches the
host of the request and deny all requests from remote origins.

#### func WithWebsocketPingInterval

```go
func WithWebsocketPingInterval(websocketPingInterval time.Duration) Option
```
WithWebsocketPingInterval enables websocket keepalive pinging with the
configured timeout.

The default behaviour is to disable websocket pinging.

#### func WithWebsockets

```go
Expand Down
15 changes: 14 additions & 1 deletion go/grpcweb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package grpcweb

import "net/http"
import (
"net/http"
"time"
)

var (
defaultOptions = &options{
Expand All @@ -19,6 +22,7 @@ type options struct {
corsForRegisteredEndpointsOnly bool
originFunc func(origin string) bool
enableWebsockets bool
websocketPingInterval time.Duration
websocketOriginFunc func(req *http.Request) bool
allowNonRootResources bool
}
Expand Down Expand Up @@ -92,6 +96,15 @@ func WithWebsockets(enableWebsockets bool) Option {
}
}

// WithWebsocketPingInterval enables websocket keepalive pinging with the configured timeout.
//
// The default behaviour is to disable websocket pinging.
func WithWebsocketPingInterval(websocketPingInterval time.Duration) Option {
return func(o *options) {
o.websocketPingInterval = websocketPingInterval
}
}

// WithWebsocketOriginFunc allows for customizing the acceptance of Websocket requests - usually to check that the origin
// is valid.
//
Expand Down
42 changes: 38 additions & 4 deletions go/grpcweb/websocket_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ import (
"net/http"
"net/textproto"
"strings"
"time"

"github.com/desertbit/timer"
"github.com/gorilla/websocket"
"golang.org/x/net/http2"
)

type webSocketResponseWriter struct {
writtenHeaders bool
wsConn *websocket.Conn
headers http.Header
flushedHeaders http.Header
writtenHeaders bool
wsConn *websocket.Conn
headers http.Header
flushedHeaders http.Header
timeOutInterval time.Duration
timer *timer.Timer
}

func newWebSocketResponseWriter(wsConn *websocket.Conn) *webSocketResponseWriter {
Expand All @@ -31,6 +35,33 @@ func newWebSocketResponseWriter(wsConn *websocket.Conn) *webSocketResponseWriter
}
}

func (w *webSocketResponseWriter) enablePing(timeOutInterval time.Duration) {
w.timeOutInterval = timeOutInterval
w.timer = timer.NewTimer(w.timeOutInterval)
dispose := make(chan bool)
w.wsConn.SetCloseHandler(func(code int, text string) error {
close(dispose)
return nil
})
go w.ping(dispose)
}

func (w *webSocketResponseWriter) ping(dispose chan bool) {
if dispose == nil {
return
}
defer w.timer.Stop()
for {
select {
case <-dispose:
return
case <-w.timer.C:
w.timer.Reset(w.timeOutInterval)
w.wsConn.WriteMessage(websocket.PingMessage, []byte{})
}
}
}

func (w *webSocketResponseWriter) Header() http.Header {
return w.headers
}
Expand All @@ -39,6 +70,9 @@ func (w *webSocketResponseWriter) Write(b []byte) (int, error) {
if !w.writtenHeaders {
w.WriteHeader(http.StatusOK)
}
if w.timeOutInterval > time.Second && w.timer != nil {
w.timer.Reset(w.timeOutInterval)
}
return len(b), w.wsConn.WriteMessage(websocket.BinaryMessage, b)
}

Expand Down
3 changes: 3 additions & 0 deletions go/grpcweb/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ func (w *WrappedGrpcServer) handleWebSocket(wsConn *websocket.Conn, req *http.Re
defer cancelFunc()

respWriter := newWebSocketResponseWriter(wsConn)
if w.opts.websocketPingInterval >= time.Second {
respWriter.enablePing(w.opts.websocketPingInterval)
}
wrappedReader := newWebsocketWrappedReader(wsConn, respWriter, cancelFunc)

req.Body = wrappedReader
Expand Down
10 changes: 9 additions & 1 deletion go/grpcwebproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var (
runHttpServer = pflag.Bool("run_http_server", true, "whether to run HTTP server")
runTlsServer = pflag.Bool("run_tls_server", true, "whether to run TLS server")

useWebsockets = pflag.Bool("use_websockets", false, "whether to use beta websocket transport layer")
useWebsockets = pflag.Bool("use_websockets", false, "whether to use beta websocket transport layer")
websocketPingInterval = pflag.Duration("websocket_ping_interval", 0, "whether to use websocket keepalive pinging. Only used when using websockets. Configured interval must be >= 1s.")

flagHttpMaxWriteTimeout = pflag.Duration("server_http_max_write_timeout", 10*time.Second, "HTTP server config, max write duration.")
flagHttpMaxReadTimeout = pflag.Duration("server_http_max_read_timeout", 10*time.Second, "HTTP server config, max read duration.")
Expand Down Expand Up @@ -71,6 +72,13 @@ func main() {
grpcweb.WithWebsockets(true),
grpcweb.WithWebsocketOriginFunc(makeWebsocketOriginFunc(allowedOrigins)),
)
if *websocketPingInterval >= time.Second {
logrus.Infof("websocket keepalive pinging enabled, the timeout interval is %s", websocketPingInterval.String())
}
options = append(
options,
grpcweb.WithWebsocketPingInterval(*websocketPingInterval),
)
}
wrappedGrpc := grpcweb.WrapServer(grpcServer, options...)

Expand Down

0 comments on commit 4529aac

Please sign in to comment.