From 7bb3905c0c6f5c629d3f9430a8fab2f4e4a2d04d Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Tue, 5 Jan 2021 20:07:21 +0800 Subject: [PATCH 1/2] add auth token flags for perf Signed-off-by: xiaolongran --- perf/perf-consumer.go | 5 ++--- perf/perf-producer.go | 1 + perf/pulsar-perf-go.go | 28 +++++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go index 9170ee3..30ea5d0 100644 --- a/perf/perf-consumer.go +++ b/perf/perf-consumer.go @@ -65,9 +65,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { b, _ = json.MarshalIndent(consumeArgs, "", " ") log.Info("Consumer config: ", string(b)) - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: clientArgs.ServiceURL, - }) + client, err := NewClient() if err != nil { log.Fatal(err) @@ -92,6 +90,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { // Print stats of the consume rate tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { diff --git a/perf/perf-producer.go b/perf/perf-producer.go index 7d43cf3..7f78628 100644 --- a/perf/perf-producer.go +++ b/perf/perf-producer.go @@ -136,6 +136,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) { // Print stats of the publish rate and latencies tick := time.NewTicker(10 * time.Second) + defer tick.Stop() q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0) messagesPublished := 0 diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index a4ecb2c..306c232 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -20,6 +20,7 @@ package main import ( "context" "fmt" + "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -41,7 +42,10 @@ var flagDebug bool var PrometheusPort int type ClientArgs struct { - ServiceURL string + ServiceURL string + TokenFile string + TLSTrustCertFile string + AuthToken string } var clientArgs ClientArgs @@ -50,6 +54,25 @@ func NewClient() (pulsar.Client, error) { clientOpts := pulsar.ClientOptions{ URL: clientArgs.ServiceURL, } + + if clientArgs.TokenFile != "" { + // read JWT from the file + tokenBytes, err := ioutil.ReadFile(clientArgs.TokenFile) + if err != nil { + log.WithError(err).Errorf("failed to read Pulsar JWT from a file %s", clientArgs.TokenFile) + os.Exit(1) + } + clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes)) + } + + if clientArgs.TLSTrustCertFile != "" { + clientOpts.TLSTrustCertsFilePath = clientArgs.TLSTrustCertFile + } + + if clientArgs.AuthToken != "" { + clientOpts.Authentication = pulsar.NewAuthenticationToken(clientArgs.AuthToken) + } + return pulsar.NewClient(clientOpts) } @@ -79,6 +102,9 @@ func main() { flags.BoolVar(&flagDebug, "debug", false, "enable debug output") flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u", "pulsar://localhost:6650", "The Pulsar service URL") + flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file") + flags.StringVar(&clientArgs.AuthToken, "token-string", "", "string to the Pulsar JWT") + flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file") rootCmd.AddCommand(newProducerCommand()) rootCmd.AddCommand(newConsumerCommand()) From 8be6d3391671a662e0d6f167c95618988eb228a7 Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Tue, 5 Jan 2021 20:14:09 +0800 Subject: [PATCH 2/2] fix code style Signed-off-by: xiaolongran --- perf/pulsar-perf-go.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index 306c232..a52eba6 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -45,7 +45,7 @@ type ClientArgs struct { ServiceURL string TokenFile string TLSTrustCertFile string - AuthToken string + AuthToken string } var clientArgs ClientArgs @@ -98,13 +98,15 @@ func main() { flags := rootCmd.PersistentFlags() flags.BoolVar(&FlagProfile, "profile", false, "enable profiling") - flags.IntVar(&PrometheusPort, "metrics", 8000, "Port to use to export metrics for Prometheus. Use -1 to disable.") + flags.IntVar(&PrometheusPort, "metrics", 8000, + "Port to use to export metrics for Prometheus. Use -1 to disable.") flags.BoolVar(&flagDebug, "debug", false, "enable debug output") flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u", "pulsar://localhost:6650", "The Pulsar service URL") flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file") flags.StringVar(&clientArgs.AuthToken, "token-string", "", "string to the Pulsar JWT") - flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file") + flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", + "file path to the trusted certificate file") rootCmd.AddCommand(newProducerCommand()) rootCmd.AddCommand(newConsumerCommand())