Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

analyzer: Fixes for running analyzer in diverse local environments #150

Merged
merged 3 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/streamStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"errors"
"net/http"

"github.com/livepeer/livepeer-data/health"
Expand All @@ -16,6 +17,11 @@ const (

func streamStatus(healthcore *health.Core) middleware {
return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) {
if healthcore == nil {
respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable"))
return
}

streamID := apiParam(r, streamIDParam)
if streamID == "" {
next.ServeHTTP(rw, r)
Expand Down
41 changes: 31 additions & 10 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ type BuildFlags struct {
type cliFlags struct {
mistJson bool

rabbitmqUri string
amqpUri string
// stream health

enableStreamHealth bool
rabbitmqUri string
amqpUri string

golivepeerExchange string
shardPrefixesFlag string
Expand All @@ -41,6 +44,8 @@ type cliFlags struct {
streamingOpts health.StreamingOptions
memoryRecordsTtl time.Duration

// data analytics

viewsOpts views.ClientOptions
usageOpts usage.ClientOptions
}
Expand All @@ -51,6 +56,7 @@ func parseFlags(version string) cliFlags {

fs.BoolVar(&cli.mistJson, "j", false, "Print application info as json")

fs.BoolVar(&cli.enableStreamHealth, "enable-stream-health", true, "Whether to enable the stream health services and API")
fs.StringVar(&cli.rabbitmqUri, "rabbitmq-uri", "amqp://guest:guest@localhost:5672/livepeer", "URI for RabbitMQ server to consume from. Can be specified as a default AMQP URI which will be converted to stream protocol.")
fs.StringVar(&cli.amqpUri, "amqp-uri", "", "Explicit AMQP URI in case of non-default protocols/ports (optional). Must point to the same cluster as rabbitmqUri")

Expand Down Expand Up @@ -120,6 +126,9 @@ func parseFlags(version string) cliFlags {
os.Exit(0)
}

cli.usageOpts.BigQueryCredentialsJSON = cli.viewsOpts.BigQueryCredentialsJSON
cli.usageOpts.Livepeer = cli.viewsOpts.Livepeer

return cli
}

Expand All @@ -130,6 +139,21 @@ func Run(build BuildFlags) {
glog.Infof("Stream health care system starting up... version=%q", build.Version)
ctx := contextUntilSignal(context.Background(), syscall.SIGINT, syscall.SIGTERM)

healthcore := provisionStreamHealthcore(ctx, cli)
views, usage := provisionDataAnalytics(cli)

glog.Info("Starting server...")
err := api.ListenAndServe(ctx, cli.serverOpts, healthcore, views, usage)
if err != nil {
glog.Fatalf("Error starting api server. err=%q", err)
}
}

func provisionStreamHealthcore(ctx context.Context, cli cliFlags) *health.Core {
if !cli.enableStreamHealth {
return nil
}

streamUri := cli.rabbitmqUri
if cli.amqpUri == "" && strings.HasPrefix(streamUri, "amqp") {
streamUri, cli.amqpUri = "", streamUri
Expand All @@ -150,24 +174,21 @@ func Run(build BuildFlags) {
glog.Fatalf("Error starting health core. err=%q", err)
}

return healthcore
}

func provisionDataAnalytics(cli cliFlags) (*views.Client, *usage.Client) {
views, err := views.NewClient(cli.viewsOpts)
if err != nil {
glog.Fatalf("Error creating views client. err=%q", err)
}

cli.usageOpts.BigQueryCredentialsJSON = cli.viewsOpts.BigQueryCredentialsJSON
cli.usageOpts.Livepeer = cli.viewsOpts.Livepeer

usage, err := usage.NewClient(cli.usageOpts)
if err != nil {
glog.Fatalf("Error creating usage client. err=%q", err)
}

glog.Info("Starting server...")
err = api.ListenAndServe(ctx, cli.serverOpts, healthcore, views, usage)
if err != nil {
glog.Fatalf("Error starting api server. err=%q", err)
}
return views, usage
}

func contextUntilSignal(parent context.Context, sigs ...os.Signal) context.Context {
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.2"
services:
rabbitmq:
image: rabbitmq:3.9-rc-management
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- 5552:5552
Expand Down