Skip to content

Commit

Permalink
message receiver supports customized liveness and readiness check (#4707
Browse files Browse the repository at this point in the history
)

* message receiver supports customized liveness and readiness check

* address comments
  • Loading branch information
grac3gao-zz authored Jan 11, 2021
1 parent 79344e8 commit 3f32d79
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 24 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ require (
k8s.io/apiserver v0.18.12
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451
knative.dev/hack v0.0.0-20201214230143-4ed1ecb8db24
knative.dev/pkg v0.0.0-20210107022335-51c72e24c179
knative.dev/hack v0.0.0-20210108203236-ea9c9a0cac5c
knative.dev/pkg v0.0.0-20210107211936-93874f0ea7c0
knative.dev/reconciler-test v0.0.0-20210108100436-db4d65735605
sigs.k8s.io/yaml v1.2.0
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,12 @@ k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 h1:v8ud2Up6QK1lNOKFgiIVrZdMg7Mpm
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/hack v0.0.0-20201214230143-4ed1ecb8db24 h1:kIztWfvnIFV8Lhlea02K3YO2mIzcDyQNzrBLn0Oq9sA=
knative.dev/hack v0.0.0-20201214230143-4ed1ecb8db24/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20210108203236-ea9c9a0cac5c h1:B/FwfbGrZRCwujjxVzFCc1sqNcAGL5oOm0ZkSSovSU8=
knative.dev/hack v0.0.0-20210108203236-ea9c9a0cac5c/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/pkg v0.0.0-20210107022335-51c72e24c179 h1:lkrgrv69iUk2qhOG9symy15kJUaJZmMybSloi7C3gIw=
knative.dev/pkg v0.0.0-20210107022335-51c72e24c179/go.mod h1:hckgW978SdzPA2H5EDvRPY8xsnPuDZLJLbPf8Jte7Q0=
knative.dev/pkg v0.0.0-20210107211936-93874f0ea7c0 h1:oLySohpGJOAo7LFCKpGEn1JOtZkzZ6QS8tQ03Pgll/0=
knative.dev/pkg v0.0.0-20210107211936-93874f0ea7c0/go.mod h1:hckgW978SdzPA2H5EDvRPY8xsnPuDZLJLbPf8Jte7Q0=
knative.dev/reconciler-test v0.0.0-20210108100436-db4d65735605 h1:gTcj4/ULCzgXEtW+sSd08C5LE3dcPGHU+6/wLT+PVMU=
knative.dev/reconciler-test v0.0.0-20210108100436-db4d65735605/go.mod h1:rmQpZseeqDpg6/ToFzIeV5hTRkOJujaXBCK7iYL7M4E=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
Expand Down
26 changes: 23 additions & 3 deletions pkg/kncloudevents/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,31 @@ type HTTPMessageReceiver struct {

server *http.Server
listener net.Listener

checker http.HandlerFunc
}

func NewHTTPMessageReceiver(port int) *HTTPMessageReceiver {
return &HTTPMessageReceiver{
// HTTPMessageReceiverOption enables further configuration of a HTTPMessageReceiver.
type HTTPMessageReceiverOption func(*HTTPMessageReceiver)

func NewHTTPMessageReceiver(port int, o ...HTTPMessageReceiverOption) *HTTPMessageReceiver {
h := &HTTPMessageReceiver{
port: port,
}
for _, opt := range o {
opt(h)
}
return h
}

// WithChecker takes a handler func which will run as an additional health check in Drainer.
// kncloudevents HTTPMessageReceiver uses Drainer to perform health check.
// By default, Drainer directly writes StatusOK to kubelet probe if the Pod is not draining.
// Users can configure customized liveness and readiness check logic by defining checker here.
func WithChecker(checker http.HandlerFunc) HTTPMessageReceiverOption {
return func(h *HTTPMessageReceiver) {
h.checker = checker
}
}

// Blocking
Expand All @@ -53,7 +72,8 @@ func (recv *HTTPMessageReceiver) StartListen(ctx context.Context, handler http.H
}

drainer := &handlers.Drainer{
Inner: CreateHandler(handler),
Inner: CreateHandler(handler),
HealthCheck: recv.checker,
}
recv.server = &http.Server{
Addr: recv.listener.Addr().String(),
Expand Down
45 changes: 29 additions & 16 deletions vendor/knative.dev/hack/library.sh
Original file line number Diff line number Diff line change
Expand Up @@ -490,24 +490,37 @@ function start_latest_eventing_sugar_controller() {
function run_go_tool() {
local tool=$2
local install_failed=0
if [[ -z "$(which ${tool})" ]]; then
local action=get
[[ $1 =~ ^[\./].* ]] && action=install
# Avoid running `go get` from root dir of the repository, as it can change go.sum and go.mod files.
# See discussions in https://github.com/golang/go/issues/27643.
if [[ ${action} == "get" && $(pwd) == "${REPO_ROOT_DIR}" ]]; then
local temp_dir="$(mktemp -d)"
# Swallow the output as we are returning the stdout in the end.
pushd "${temp_dir}" > /dev/null 2>&1
GOFLAGS="" go ${action} "$1" || install_failed=1
popd > /dev/null 2>&1
else
GOFLAGS="" go ${action} "$1" || install_failed=1
local run=$1

if [[ "$(basename $1)" != "$2" ]]; then
echo "Assuming tool is in package $2"
run="${run}/$2"
fi

if [[ -z "$(go list -mod=readonly -f '{{.Module.Version}}' $1)" ]]; then
echo "Tool $1/$2 is not included in hack/tools.go, falling back to non-hermetic install (via GOPATH)."
if [[ -z "$(which ${tool})" ]]; then
local action=get
[[ $1 =~ ^[\./].* ]] && action=install
# Avoid running `go get` from root dir of the repository, as it can change go.sum and go.mod files.
# See discussions in https://github.com/golang/go/issues/27643.
if [[ ${action} == "get" && $(pwd) == "${REPO_ROOT_DIR}" ]]; then
local temp_dir="$(mktemp -d)"
# Swallow the output as we are returning the stdout in the end.
pushd "${temp_dir}" > /dev/null 2>&1
GOFLAGS="" go ${action} "$1" || install_failed=1
popd > /dev/null 2>&1
else
GOFLAGS="" go ${action} "$1" || install_failed=1
fi
fi
(( install_failed )) && return ${install_failed}
shift 2
${tool} "$@"
else
shift 2
GOFLAGS="-mod=vendor" go run "${run}" "$@"
fi
(( install_failed )) && return ${install_failed}
shift 2
${tool} "$@"
}

# Add function call to trap
Expand Down
9 changes: 8 additions & 1 deletion vendor/knative.dev/pkg/network/handlers/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ var newTimer = func(d time.Duration) timer {
}

// Drainer wraps an inner http.Handler to support responding to kubelet
// probes and KProbes with a "200 OK" until the handler is told to Drain.
// probes and KProbes with a "200 OK" until the handler is told to Drain,
// or Drainer will optionally run the HealthCheck if it is defined.
// When the Drainer is told to Drain, it will immediately start to fail
// probes with a "500 shutting down", and the call will block until no
// requests have been received for QuietPeriod (defaults to
Expand All @@ -56,6 +57,10 @@ type Drainer struct {
// Mutex guards the initialization and resets of the timer
sync.RWMutex

// HealthCheck is an optional health check that is performed until the drain signal is received.
// When unspecified, a "200 OK" is returned, otherwise this function is invoked.
HealthCheck http.HandlerFunc

// Inner is the http.Handler to which we delegate actual requests.
Inner http.Handler

Expand All @@ -78,6 +83,8 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if network.IsKubeletProbe(r) { // Respond to probes regardless of path.
if d.draining() {
http.Error(w, "shutting down", http.StatusServiceUnavailable)
} else if d.HealthCheck != nil {
d.HealthCheck(w, r)
} else {
w.WriteHeader(http.StatusOK)
}
Expand Down
4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -966,11 +966,11 @@ k8s.io/utils/buffer
k8s.io/utils/integer
k8s.io/utils/pointer
k8s.io/utils/trace
# knative.dev/hack v0.0.0-20201214230143-4ed1ecb8db24
# knative.dev/hack v0.0.0-20210108203236-ea9c9a0cac5c
## explicit
knative.dev/hack
knative.dev/hack/shell
# knative.dev/pkg v0.0.0-20210107022335-51c72e24c179
# knative.dev/pkg v0.0.0-20210107211936-93874f0ea7c0
## explicit
knative.dev/pkg/apiextensions/storageversion
knative.dev/pkg/apiextensions/storageversion/cmd/migrate
Expand Down

0 comments on commit 3f32d79

Please sign in to comment.