Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang http fitler: fix race when waiting callback from Envoy #32081

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"errors"
"runtime"
"strings"
"sync/atomic"
"unsafe"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -321,17 +320,16 @@ func (c *httpCApiImpl) HttpGetDynamicMetadata(rr unsafe.Pointer, filterName stri
r := (*httpRequest)(rr)
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()

var valueData C.uint64_t
var valueLen C.int
res := C.envoyGoFilterHttpGetDynamicMetadata(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(filterName)), C.int(len(filterName)), &valueData, &valueLen)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}
buf := unsafe.Slice((*byte)(unsafe.Pointer(uintptr(valueData))), int(valueLen))
Expand Down Expand Up @@ -394,14 +392,13 @@ func (c *httpCApiImpl) HttpGetStringFilterState(rr unsafe.Pointer, key string) s
var valueLen C.int
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()
res := C.envoyGoFilterHttpGetStringFilterState(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}

Expand All @@ -416,15 +413,14 @@ func (c *httpCApiImpl) HttpGetStringProperty(rr unsafe.Pointer, key string) (str
var rc C.int
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()
res := C.envoyGoFilterHttpGetStringProperty(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen, &rc)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
res = C.CAPIStatus(rc)
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}

Expand Down
63 changes: 57 additions & 6 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"unsafe"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
Expand All @@ -47,6 +48,11 @@ const (
HTTP30 = "HTTP/3.0"
)

const (
NoWaitingCallback = 0
MayWaitingCallback = 1
)

var protocolsIdToName = map[uint64]string{
0: HTTP10,
1: HTTP11,
Expand All @@ -59,12 +65,57 @@ type panicInfo struct {
details string
}
type httpRequest struct {
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
sema sync.WaitGroup
waitingOnEnvoy int32
mutex sync.Mutex
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
waitingLock sync.Mutex // protect waitingCallback
cond sync.Cond
waitingCallback int32

// protect multiple cases:
// 1. protect req_->strValue in the C++ side from being used concurrently.
// 2. protect waitingCallback from being modified in markMayWaitingCallback concurrently.
mutex sync.Mutex
}

// markWaitingOnEnvoy marks the request may be waiting a callback from envoy.
// Must be the NoWaitingCallback state since it's invoked under the r.mutex lock.
// We do not do lock waitingCallback here, to reduce lock contention.
func (r *httpRequest) markMayWaitingCallback() {
if !atomic.CompareAndSwapInt32(&r.waitingCallback, NoWaitingCallback, MayWaitingCallback) {
panic("markWaitingCallback: unexpected state")
}
}

// markNoWaitingOnEnvoy marks the request is not waiting a callback from envoy.
// Can not make sure it's in the MayWaitingCallback state, since the state maybe changed by OnDestroy.
func (r *httpRequest) markNoWaitingCallback() {
atomic.StoreInt32(&r.waitingCallback, NoWaitingCallback)
}

// checkOrWaitCallback checks if we need to wait a callback from envoy, and wait it.
func (r *httpRequest) checkOrWaitCallback() {
// need acquire the lock, since there might be concurrency race with resumeWaitCallback.
r.cond.L.Lock()
defer r.cond.L.Unlock()

// callback or OnDestroy already called, no need to wait.
if atomic.LoadInt32(&r.waitingCallback) == NoWaitingCallback {
return
}
r.cond.Wait()
}

// resumeWaitCallback resumes the goroutine that waiting for the callback from envoy.
func (r *httpRequest) resumeWaitCallback() {
// need acquire the lock, since there might be concurrency race with checkOrWaitCallback.
r.cond.L.Lock()
defer r.cond.L.Unlock()

if atomic.CompareAndSwapInt32(&r.waitingCallback, MayWaitingCallback, NoWaitingCallback) {
// Broadcast is safe even there is no waiters.
r.cond.Broadcast()
}
}

func (r *httpRequest) pluginName() string {
Expand Down
14 changes: 4 additions & 10 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)
Expand Down Expand Up @@ -83,6 +82,7 @@ func createRequest(r *C.httpRequest) *httpRequest {
req := &httpRequest{
req: r,
}
req.cond.L = &req.waitingLock
// NP: make sure filter will be deleted.
runtime.SetFinalizer(req, requestFinalize)

Expand Down Expand Up @@ -214,9 +214,6 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
}

defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}

v := api.AccessLogType(logType)

Expand All @@ -238,9 +235,8 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
req := getRequest(r)
// do nothing even when req.panic is true, since filter is already destroying.
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}

req.resumeWaitCallback()

v := api.DestroyReason(reason)

Expand All @@ -259,7 +255,5 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
func envoyGoRequestSemaDec(r *C.httpRequest) {
req := getRequest(r)
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}
req.resumeWaitCallback()
}
Loading