Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Sync logs to local file #632

Merged
merged 6 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apiserver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"net"
"net/http"
"os"
"path"
"strings"

Expand All @@ -30,11 +31,20 @@ var (
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
collectMetricsFlag = flag.Bool("collectMetricsFlag", true, "Whether to collect Prometheus metrics in API server.")
logFile = flag.String("logFilePath", "", "sync log to local file")
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
)

func main() {
flag.Parse()

if *logFile != "" {
flagSet := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
klog.InitFlags(flagSet)
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
_ = flagSet.Set("alsologtostderr", "true")
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
_ = flagSet.Set("logtostderr", "false")
_ = flagSet.Set("log_file", *logFile)
}

clientManager := manager.NewClientManager()
resourceManager := manager.NewResourceManager(&clientManager)

Expand Down
2 changes: 1 addition & 1 deletion apiserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/go-openapi/runtime v0.19.31
github.com/golang/glog v1.0.0
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
5 changes: 3 additions & 2 deletions apiserver/pkg/client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package client
import (
"time"

"github.com/golang/glog"
"k8s.io/klog/v2"

"github.com/ray-project/kuberay/apiserver/pkg/util"
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1alpha1"
Expand All @@ -25,7 +26,7 @@ func (cc RayClusterClient) RayClusterClient(namespace string) rayiov1alpha1.RayC
func NewRayClusterClientOrFatal(initConnectionTimeout time.Duration, options util.ClientOptions) ClusterClientInterface {
cfg, err := config.GetConfig()
if err != nil {
glog.Fatalf("Failed to create RayCluster client. Error: %v", err)
klog.Fatalf("Failed to create RayCluster client. Error: %v", err)
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
}
cfg.QPS = options.QPS
cfg.Burst = options.Burst
Expand Down
5 changes: 3 additions & 2 deletions apiserver/pkg/client/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package client
import (
"time"

"github.com/golang/glog"
"k8s.io/klog/v2"

"github.com/ray-project/kuberay/apiserver/pkg/util"
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1alpha1"
Expand All @@ -25,7 +26,7 @@ func (cc RayJobClient) RayJobClient(namespace string) rayiov1alpha1.RayJobInterf
func NewRayJobClientOrFatal(initConnectionTimeout time.Duration, options util.ClientOptions) JobClientInterface {
cfg, err := config.GetConfig()
if err != nil {
glog.Fatalf("Failed to create RayCluster client. Error: %v", err)
klog.Fatalf("Failed to create RayCluster client. Error: %v", err)
}
cfg.QPS = options.QPS
cfg.Burst = options.Burst
Expand Down
3 changes: 1 addition & 2 deletions apiserver/pkg/client/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"time"

"github.com/golang/glog"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/config"

Expand Down Expand Up @@ -43,7 +42,7 @@ func (c *KubernetesClient) NamespaceClient() v1.NamespaceInterface {
func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration, options util.ClientOptions) KubernetesClientInterface {
cfg, err := config.GetConfig()
if err != nil {
glog.Fatalf("Failed to create TokenReview client. Error: %v", err)
klog.Fatalf("Failed to create TokenReview client. Error: %v", err)
}
cfg.QPS = options.QPS
cfg.Burst = options.Burst
Expand Down
5 changes: 3 additions & 2 deletions apiserver/pkg/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package client
import (
"time"

"github.com/golang/glog"
"k8s.io/klog/v2"

"github.com/ray-project/kuberay/apiserver/pkg/util"
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1alpha1"
Expand All @@ -25,7 +26,7 @@ func (cc RayServiceClient) RayServiceClient(namespace string) rayiov1alpha1.RayS
func NewRayServiceClientOrFatal(initConnectionTimeout time.Duration, options util.ClientOptions) ServiceClientInterface {
cfg, err := config.GetConfig()
if err != nil {
glog.Fatalf("Failed to create RayService client. Error: %v", err)
klog.Fatalf("Failed to create RayService client. Error: %v", err)
}
cfg.QPS = options.QPS
cfg.Burst = options.Burst
Expand Down
6 changes: 3 additions & 3 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"strconv"

"github.com/golang/glog"
"k8s.io/klog/v2"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/ray-project/kuberay/apiserver/pkg/util"
Expand Down Expand Up @@ -131,7 +131,7 @@ func FromCrdToApiJob(job *v1alpha1.RayJob) (pbJob *api.RayJob) {
defer func() {
err := recover()
if err != nil {
glog.Errorf("failed to transfer job crd to job protobuf, err: %v, crd: %+v", err, job)
klog.Errorf("failed to transfer job crd to job protobuf, err: %v, crd: %+v", err, job)
}
}()

Expand Down Expand Up @@ -178,7 +178,7 @@ func FromCrdToApiService(service *v1alpha1.RayService, events []v1.Event) *api.R
defer func() {
err := recover()
if err != nil {
glog.Errorf("failed to transfer ray service, err: %v, item: %v", err, service)
klog.Errorf("failed to transfer ray service, err: %v, item: %v", err, service)
}
}()

Expand Down
11 changes: 6 additions & 5 deletions apiserver/pkg/util/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package util
import (
"fmt"

"k8s.io/klog/v2"

"github.com/go-openapi/runtime"
"github.com/golang/glog"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -243,9 +244,9 @@ func (e *UserError) wrap(message string) *UserError {
func (e *UserError) Log() {
switch e.externalStatusCode {
case codes.Aborted, codes.InvalidArgument, codes.NotFound, codes.Internal:
glog.Infof("%+v", e.internalError)
klog.Infof("%+v", e.internalError)
default:
glog.Errorf("%+v", e.internalError)
klog.Errorf("%+v", e.internalError)
}
}

Expand Down Expand Up @@ -281,14 +282,14 @@ func LogError(err error) {
e.Log()
default:
// We log all the details.
glog.Errorf("InternalError: %+v", err)
klog.Errorf("InternalError: %+v", err)
}
}

// TerminateIfError Check if error is nil. Terminate if not.
func TerminateIfError(err error) {
if err != nil {
glog.Fatalf("%v", err)
klog.Fatalf("%v", err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions apiserver/pkg/util/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package util
import (
"time"

"github.com/golang/glog"
"k8s.io/klog/v2"
)

type TimeInterface interface {
Expand Down Expand Up @@ -44,7 +44,7 @@ func (f *FakeTime) Now() time.Time {
func ParseTimeOrFatal(value string) time.Time {
result, err := time.Parse(time.RFC3339, value)
if err != nil {
glog.Fatalf("Could not parse time: %+v", err)
klog.Fatalf("Could not parse time: %+v", err)
}
return result.UTC()
}
3 changes: 2 additions & 1 deletion ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.17

require (
github.com/go-logr/logr v1.2.0
github.com/go-logr/zapr v1.2.0
github.com/jarcoal/httpmock v1.2.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.17.0
Expand All @@ -13,6 +14,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.2
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/api v0.23.0
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
Expand All @@ -30,7 +32,6 @@ require (
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-logr/zapr v1.2.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
Expand Down
1 change: 1 addition & 0 deletions ray-operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
Expand Down
37 changes: 34 additions & 3 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"os"

"github.com/go-logr/zapr"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/ray-project/kuberay/ray-operator/controllers/ray"

"go.uber.org/zap/zapcore"
Expand All @@ -14,7 +18,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
k8szap "sigs.k8s.io/controller-runtime/pkg/log/zap"

rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
// +kubebuilder:scaffold:imports
Expand All @@ -41,6 +45,7 @@ func main() {
var probeAddr string
var reconcileConcurrency int
var watchNamespace string
var logFile string
flag.BoolVar(&version, "version", false, "Show the version information.")
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8082", "The address the probe endpoint binds to.")
Expand All @@ -56,7 +61,10 @@ func main() {
"Temporary feature flag - to be deleted after testing")
flag.BoolVar(&ray.ForcedClusterUpgrade, "forced-cluster-upgrade", false,
"Forced cluster upgrade flag")
opts := zap.Options{
flag.StringVar(&logFile, "log-file-path", "",
"sync log to local file")
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved

opts := k8szap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
}
Expand All @@ -69,7 +77,30 @@ func main() {
os.Exit(0)
}

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
if logFile != "" {
fileWriter := &lumberjack.Logger{
Filename: logFile,
MaxSize: 500, // megabytes
MaxBackups: 10, // files
MaxAge: 30, // days
}

pe := zap.NewProductionEncoderConfig()
pe.EncodeTime = zapcore.ISO8601TimeEncoder
consoleEncoder := zapcore.NewConsoleEncoder(pe)

k8sLogger := k8szap.NewRaw(k8szap.UseFlagOptions(&opts))
zapOpts := append(opts.ZapOpts, zap.AddCallerSkip(1))
combineLogger := zap.New(zapcore.NewTee(
k8sLogger.Core(),
zapcore.NewCore(consoleEncoder, zapcore.AddSync(fileWriter), zap.InfoLevel),
)).WithOptions(zapOpts...)
combineLoggerR := zapr.NewLogger(combineLogger)

ctrl.SetLogger(combineLoggerR)
} else {
ctrl.SetLogger(k8szap.New(k8szap.UseFlagOptions(&opts)))
}

setupLog.Info("the operator", "version:", os.Getenv("OPERATOR_VERSION"))
if ray.PrioritizeWorkersToDelete {
Expand Down