Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event publishing to kne_cli and controller server #393

Merged
merged 18 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
/go.work.sum
**/super-linter.log
kne_cli/kne_cli
controller/server/server
3 changes: 3 additions & 0 deletions cmd/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func deployFn(cmd *cobra.Command, args []string) error {
return err
}
d.Progress = viper.GetBool("progress")
d.ReportUsage = viper.GetBool("report_usage")
d.ReportUsageProjectID = viper.GetString("report_usage_project_id")
d.ReportUsageTopicID = viper.GetString("report_usage_topic_id")
if err := d.Deploy(cmd.Context(), viper.GetString("kubecfg")); err != nil {
return err
}
Expand Down
15 changes: 14 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ environment.`,
root.SetOut(os.Stdout)
cfgFile := root.PersistentFlags().String("config_file", defaultCfgFile(), "Path to KNE config file")
root.PersistentFlags().String("kubecfg", defaultKubeCfg(), "kubeconfig file")
root.PersistentFlags().Bool("report_usage", false, "Whether to reporting anonymous usage metrics")
root.PersistentFlags().String("report_usage_project_id", "", "Project to report anonymous usage metrics to")
root.PersistentFlags().String("report_usage_topic_id", "", "Topic to report anonymous usage metrics to")
root.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
if *cfgFile == "" {
return nil
Expand All @@ -52,6 +55,7 @@ environment.`,
}
}
viper.BindPFlags(cmd.Flags())
viper.SetDefault("report_usage", false)
return nil
}
root.AddCommand(newCreateCmd())
Expand Down Expand Up @@ -139,7 +143,16 @@ func createFn(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("%s: %w", cmd.Use, err)
}
tm, err := topo.New(topopb, topo.WithKubecfg(viper.GetString("kubecfg")), topo.WithBasePath(bp))
opts := []topo.Option{
topo.WithKubecfg(viper.GetString("kubecfg")),
topo.WithBasePath(bp),
topo.WithUsageReporting(
viper.GetBool("report_usage"),
viper.GetString("report_usage_project_id"),
viper.GetString("report_usage_topic_id"),
),
}
tm, err := topo.New(topopb, opts...)
if err != nil {
return fmt.Errorf("%s: %w", cmd.Use, err)
}
Expand Down
15 changes: 13 additions & 2 deletions controller/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ var (
defaultCEOSLabOperator = ""
defaultLemmingOperator = ""
// Flags.
port = flag.Int("port", 50051, "Controller server port")
port = flag.Int("port", 50051, "Controller server port")
reportUsage = flag.Bool("report_usage", false, "Whether to reporting anonymous usage metrics")
reportUsageProjectID = flag.String("report_usage_project_id", "", "Project to report anonymous usage metrics to")
reportUsageTopicID = flag.String("report_usage_topic_id", "", "Topic to report anonymous usage metrics to")
)

func init() {
Expand Down Expand Up @@ -267,6 +270,10 @@ func newDeployment(req *cpb.CreateClusterRequest) (*deploy.Deployment, error) {
return nil, fmt.Errorf("controller type not supported: %T", t)
}
}
d.Progress = true
d.ReportUsage = *reportUsage
d.ReportUsageProjectID = *reportUsageProjectID
d.ReportUsageTopicID = *reportUsageTopicID
return d, nil
}

Expand Down Expand Up @@ -369,7 +376,11 @@ func (s *server) CreateTopology(ctx context.Context, req *cpb.CreateTopologyRequ
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "kubecfg %q does not exist: %v", path, err)
}
tm, err := topo.New(topoPb, topo.WithKubecfg(kcfg))
opts := []topo.Option{
topo.WithKubecfg(kcfg),
topo.WithUsageReporting(*reportUsage, *reportUsageProjectID, *reportUsageTopicID),
}
tm, err := topo.New(topoPb, opts...)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create topology manager: %v", err)
}
Expand Down
10 changes: 10 additions & 0 deletions controller/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func TestNewDeployment(t *testing.T) {
CNI: &deploy.MeshnetSpec{
Manifest: testFile.Name(),
},
Progress: true,
},
}, {
desc: "request spec - with ixiatg controller",
Expand Down Expand Up @@ -182,6 +183,7 @@ func TestNewDeployment(t *testing.T) {
ConfigMap: testFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with srlinux controller",
Expand Down Expand Up @@ -244,6 +246,7 @@ func TestNewDeployment(t *testing.T) {
Operator: testFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with ceoslab controller",
Expand Down Expand Up @@ -306,6 +309,7 @@ func TestNewDeployment(t *testing.T) {
Operator: testFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with lemming controller",
Expand Down Expand Up @@ -368,6 +372,7 @@ func TestNewDeployment(t *testing.T) {
Operator: testFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with multiple controllers empty filepath",
Expand Down Expand Up @@ -466,6 +471,7 @@ func TestNewDeployment(t *testing.T) {
Operator: defTestFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with multiple controllers",
Expand Down Expand Up @@ -578,6 +584,7 @@ func TestNewDeployment(t *testing.T) {
Operator: testFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - with multiple controllers data",
Expand Down Expand Up @@ -690,6 +697,7 @@ func TestNewDeployment(t *testing.T) {
OperatorData: testData,
},
},
Progress: true,
},
}, {
desc: "request spec - without ixiatg config map",
Expand Down Expand Up @@ -753,6 +761,7 @@ func TestNewDeployment(t *testing.T) {
ConfigMap: defTestFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - default ixiatg config map dne",
Expand Down Expand Up @@ -842,6 +851,7 @@ func TestNewDeployment(t *testing.T) {
Operator: defTestFile.Name(),
},
},
Progress: true,
},
}, {
desc: "request spec - default manifest paths dne",
Expand Down
67 changes: 66 additions & 1 deletion deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
kexec "github.com/openconfig/kne/exec"
"github.com/openconfig/kne/load"
logshim "github.com/openconfig/kne/logshim"
"github.com/openconfig/kne/metrics"
"github.com/openconfig/kne/pods"
epb "github.com/openconfig/kne/proto/event"
metallbv1 "go.universe.tf/metallb/api/v1beta1"
"golang.org/x/oauth2/google"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -123,6 +125,20 @@ type Deployment struct {
// If Progress is true then deployment status updates will be sent to
// standard output.
Progress bool

// If ReportUsage is true then anonymous usage metrics will be
// published using Cloud PubSub.
ReportUsage bool
// ReportUsageProjectID is the ID of the GCP project the usage
// metrics should be written to. This field is not used if
// ReportUsage is unset. An empty string will result in the
// default project being used.
ReportUsageProjectID string
// ReportUsageTopicID is the ID of the GCP PubSub topic the usage
// metrics should be written to. This field is not used if
// ReportUsage is unset. An empty string will result in the
// default topic being used.
ReportUsageTopicID string
}

func (d *Deployment) String() string {
Expand All @@ -146,7 +162,56 @@ type kubeVersion struct {
ServerVersion *kversion.Info `json:"serverVersion,omitempty" yaml:"serverVersion,omitempty"`
}

// event turns the deployment into a cluster event protobuf.
func (d *Deployment) event() *epb.Cluster {
c := &epb.Cluster{}
switch d.Cluster.(type) {
case *ExternalSpec:
c.Cluster = epb.Cluster_CLUSTER_TYPE_EXTERNAL
case *KindSpec:
c.Cluster = epb.Cluster_CLUSTER_TYPE_KIND
}
switch d.Ingress.(type) {
case *MetalLBSpec:
c.Ingress = epb.Cluster_INGRESS_TYPE_METALLB
}
switch d.CNI.(type) {
case *MeshnetSpec:
c.Cni = epb.Cluster_CNI_TYPE_MESHNET
}
for _, cntrl := range d.Controllers {
switch cntrl.(type) {
bormanp marked this conversation as resolved.
Show resolved Hide resolved
case *CEOSLabSpec:
c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_CEOSLAB)
case *IxiaTGSpec:
c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_IXIATG)
case *SRLinuxSpec:
c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_SRLINUX)
case *LemmingSpec:
c.Controllers = append(c.Controllers, epb.Cluster_CONTROLLER_TYPE_LEMMING)
}
}
return c
}

func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) {
if d.ReportUsage {
r, err := metrics.NewReporter(ctx, d.ReportUsageProjectID, d.ReportUsageTopicID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put the entire thing in a method so you can say:

// reportUsage returns a function to defer by the caller.
//
//      if d.ReportUsage {
//              finish = d.reportUsage(ctx)
//              defer finish(rerr)
//      }
func (d *Deployment) reportUsage(ctx context.Context) func(error) {
        r, err := metrics.NewReporter(ctx, d.ReportUsageProjectID, d.ReportUsageTopicID)
        if err != nil {
                log.Warningf("Unable to create metrics reporter: %v", err)
                return func(_ error) {}
        }
        if id, err := r.ReportDeployClusterStart(ctx, d.event()); err != nil {
                log.Warningf("Unable to report cluster deployment start event: %v", err)
                return func(_ error) { r.Close() }
        }
        return func(rerr error) {
                if err := r.ReportDeployClusterEnd(ctx, id, rerr); err != nil {
                        log.Warningf("Unable to report cluster deployment end event: %v", err)
                }
        }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
log.Warningf("Unable to create metrics reporter: %v", err)
} else {
defer r.Close()
if id, err := r.ReportDeployClusterStart(ctx, d.event()); err != nil {
log.Warningf("Unable to report cluster deployment start event: %v", err)
} else {
defer func() {
if err := r.ReportDeployClusterEnd(ctx, id, rerr); err != nil {
log.Warningf("Unable to report cluster deployment end event: %v", err)
}
}()
}
}
}
if err := d.checkDependencies(); err != nil {
return err
}
Expand Down Expand Up @@ -195,7 +260,7 @@ func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) {
if kClientVersion.Less(kServerVersion) {
log.Warning("Kube client and server versions are not within expected range.")
}
log.V(1).Info("Found k8s versions:\n", output)
log.V(1).Info("Found k8s versions:\n", string(output))

ctx, cancel := context.WithCancel(ctx)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ require (
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
69 changes: 68 additions & 1 deletion topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/kr/pretty"
topologyclientv1 "github.com/networkop/meshnet-cni/api/clientset/v1beta1"
topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1"
"github.com/openconfig/kne/metrics"
cpb "github.com/openconfig/kne/proto/controller"
epb "github.com/openconfig/kne/proto/event"
tpb "github.com/openconfig/kne/proto/topo"
"github.com/openconfig/kne/topo/node"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -64,6 +66,19 @@ type Manager struct {
tClient topologyclientv1.Interface
rCfg *rest.Config
basePath string

// If reportUsage is set, report anonymous usage metrics.
reportUsage bool
// reportUsageProjectID is the ID of the GCP project the usage
// metrics should be written to. This field is not used if
// ReportUsage is unset. An empty string will result in the
// default project being used.
reportUsageProjectID string
// reportUsageTopicID is the ID of the GCP PubSub topic the usage
// metrics should be written to. This field is not used if
// ReportUsage is unset. An empty string will result in the
// default topic being used.
reportUsageTopicID string
}

type Option func(m *Manager)
Expand Down Expand Up @@ -98,6 +113,14 @@ func WithBasePath(s string) Option {
}
}

func WithUsageReporting(b bool, project, topic string) Option {
return func(m *Manager) {
m.reportUsage = b
m.reportUsageProjectID = project
m.reportUsageTopicID = topic
}
}

// New creates a new Manager based on the provided topology. The cluster config
// passed from the WithClusterConfig option overrides the determined in-cluster
// config. If neither of these configurations can be used then the kubecfg passed
Expand Down Expand Up @@ -146,9 +169,53 @@ func New(topo *tpb.Topology, opts ...Option) (*Manager, error) {
return m, nil
}

// event creates a topology event protobuf from the topo.
func (m *Manager) event() *epb.Topology {
t := &epb.Topology{
LinkCount: int64(len(m.topo.Links)),
}
for _, node := range m.topo.Nodes {
t.Nodes = append(t.Nodes, &epb.Node{
Vendor: node.Vendor,
Model: node.Model,
})
}
return t
}

var (
// Stubs for testing.
newMetricsReporter = func(ctx context.Context, project, topic string) (metricsReporter, error) {
return metrics.NewReporter(ctx, project, topic)
}
)

type metricsReporter interface {
ReportCreateTopologyStart(context.Context, *epb.Topology) (string, error)
ReportCreateTopologyEnd(context.Context, string, error) error
Close() error
}

// Create creates the topology in the cluster.
func (m *Manager) Create(ctx context.Context, timeout time.Duration) error {
func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error) {
log.V(1).Infof("Topology:\n%v", prototext.Format(m.topo))
if m.reportUsage {
bormanp marked this conversation as resolved.
Show resolved Hide resolved
r, err := newMetricsReporter(ctx, m.reportUsageProjectID, m.reportUsageTopicID)
if err != nil {
log.Warningf("Unable to create metrics reporter: %v", err)
} else {
defer r.Close()
if id, err := r.ReportCreateTopologyStart(ctx, m.event()); err != nil {
log.Warningf("Unable to report topology creation start event: %v", err)
} else {
defer func() {
if err := r.ReportCreateTopologyEnd(ctx, id, rerr); err != nil {
log.Warningf("Unable to report topology creation end event: %v", err)
}
}()
}
}
}
if err := m.push(ctx); err != nil {
return err
}
Expand Down
Loading