diff --git a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go index a195c3b26162..a9a91abd921a 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go +++ b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go @@ -35,7 +35,6 @@ import ( "errors" "runtime" "strings" - "sync/atomic" "unsafe" "google.golang.org/protobuf/proto" @@ -321,17 +320,16 @@ func (c *httpCApiImpl) HttpGetDynamicMetadata(rr unsafe.Pointer, filterName stri r := (*httpRequest)(rr) r.mutex.Lock() defer r.mutex.Unlock() - r.sema.Add(1) + r.markMayWaitingCallback() var valueData C.uint64_t var valueLen C.int res := C.envoyGoFilterHttpGetDynamicMetadata(unsafe.Pointer(r.req), unsafe.Pointer(unsafe.StringData(filterName)), C.int(len(filterName)), &valueData, &valueLen) if res == C.CAPIYield { - atomic.AddInt32(&r.waitingOnEnvoy, 1) - r.sema.Wait() + r.checkOrWaitCallback() } else { - r.sema.Done() + r.markNoWaitingCallback() handleCApiStatus(res) } buf := unsafe.Slice((*byte)(unsafe.Pointer(uintptr(valueData))), int(valueLen)) @@ -394,14 +392,13 @@ func (c *httpCApiImpl) HttpGetStringFilterState(rr unsafe.Pointer, key string) s var valueLen C.int r.mutex.Lock() defer r.mutex.Unlock() - r.sema.Add(1) + r.markMayWaitingCallback() res := C.envoyGoFilterHttpGetStringFilterState(unsafe.Pointer(r.req), unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen) if res == C.CAPIYield { - atomic.AddInt32(&r.waitingOnEnvoy, 1) - r.sema.Wait() + r.checkOrWaitCallback() } else { - r.sema.Done() + r.markNoWaitingCallback() handleCApiStatus(res) } @@ -416,15 +413,14 @@ func (c *httpCApiImpl) HttpGetStringProperty(rr unsafe.Pointer, key string) (str var rc C.int r.mutex.Lock() defer r.mutex.Unlock() - r.sema.Add(1) + r.markMayWaitingCallback() res := C.envoyGoFilterHttpGetStringProperty(unsafe.Pointer(r.req), unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen, &rc) if res == C.CAPIYield { - atomic.AddInt32(&r.waitingOnEnvoy, 1) - r.sema.Wait() + r.checkOrWaitCallback() res = C.CAPIStatus(rc) } else { - r.sema.Done() + r.markNoWaitingCallback() handleCApiStatus(res) } diff --git a/contrib/golang/filters/http/source/go/pkg/http/filter.go b/contrib/golang/filters/http/source/go/pkg/http/filter.go index c9b1d3424c75..b90629b2330c 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/filter.go +++ b/contrib/golang/filters/http/source/go/pkg/http/filter.go @@ -35,6 +35,7 @@ import ( "fmt" "runtime" "sync" + "sync/atomic" "unsafe" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" @@ -47,6 +48,11 @@ const ( HTTP30 = "HTTP/3.0" ) +const ( + NoWaitingCallback = 0 + MayWaitingCallback = 1 +) + var protocolsIdToName = map[uint64]string{ 0: HTTP10, 1: HTTP11, @@ -59,12 +65,57 @@ type panicInfo struct { details string } type httpRequest struct { - req *C.httpRequest - httpFilter api.StreamFilter - pInfo panicInfo - sema sync.WaitGroup - waitingOnEnvoy int32 - mutex sync.Mutex + req *C.httpRequest + httpFilter api.StreamFilter + pInfo panicInfo + waitingLock sync.Mutex // protect waitingCallback + cond sync.Cond + waitingCallback int32 + + // protect multiple cases: + // 1. protect req_->strValue in the C++ side from being used concurrently. + // 2. protect waitingCallback from being modified in markMayWaitingCallback concurrently. + mutex sync.Mutex +} + +// markWaitingOnEnvoy marks the request may be waiting a callback from envoy. +// Must be the NoWaitingCallback state since it's invoked under the r.mutex lock. +// We do not do lock waitingCallback here, to reduce lock contention. +func (r *httpRequest) markMayWaitingCallback() { + if !atomic.CompareAndSwapInt32(&r.waitingCallback, NoWaitingCallback, MayWaitingCallback) { + panic("markWaitingCallback: unexpected state") + } +} + +// markNoWaitingOnEnvoy marks the request is not waiting a callback from envoy. +// Can not make sure it's in the MayWaitingCallback state, since the state maybe changed by OnDestroy. +func (r *httpRequest) markNoWaitingCallback() { + atomic.StoreInt32(&r.waitingCallback, NoWaitingCallback) +} + +// checkOrWaitCallback checks if we need to wait a callback from envoy, and wait it. +func (r *httpRequest) checkOrWaitCallback() { + // need acquire the lock, since there might be concurrency race with resumeWaitCallback. + r.cond.L.Lock() + defer r.cond.L.Unlock() + + // callback or OnDestroy already called, no need to wait. + if atomic.LoadInt32(&r.waitingCallback) == NoWaitingCallback { + return + } + r.cond.Wait() +} + +// resumeWaitCallback resumes the goroutine that waiting for the callback from envoy. +func (r *httpRequest) resumeWaitCallback() { + // need acquire the lock, since there might be concurrency race with checkOrWaitCallback. + r.cond.L.Lock() + defer r.cond.L.Unlock() + + if atomic.CompareAndSwapInt32(&r.waitingCallback, MayWaitingCallback, NoWaitingCallback) { + // Broadcast is safe even there is no waiters. + r.cond.Broadcast() + } } func (r *httpRequest) pluginName() string { diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go index 45d4cf1d4fb6..f77db4770b1f 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/shim.go +++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go @@ -37,7 +37,6 @@ import ( "fmt" "runtime" "sync" - "sync/atomic" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" ) @@ -83,6 +82,7 @@ func createRequest(r *C.httpRequest) *httpRequest { req := &httpRequest{ req: r, } + req.cond.L = &req.waitingLock // NP: make sure filter will be deleted. runtime.SetFinalizer(req, requestFinalize) @@ -214,9 +214,6 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) { } defer req.RecoverPanic() - if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) { - req.sema.Done() - } v := api.AccessLogType(logType) @@ -238,9 +235,8 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) { req := getRequest(r) // do nothing even when req.panic is true, since filter is already destroying. defer req.RecoverPanic() - if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) { - req.sema.Done() - } + + req.resumeWaitCallback() v := api.DestroyReason(reason) @@ -259,7 +255,5 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) { func envoyGoRequestSemaDec(r *C.httpRequest) { req := getRequest(r) defer req.RecoverPanic() - if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) { - req.sema.Done() - } + req.resumeWaitCallback() }