Skip to content

Commit

Permalink
Add server option for custom Elasticsearch HTTP client (#1246)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 2, 2021
1 parent 21afdea commit 9d9d8bf
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 31 deletions.
10 changes: 7 additions & 3 deletions common/elasticsearch/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ import (
elasticaws "github.com/olivere/elastic/aws/v4"
)

func newAWSElasticsearchHTTPClient(config AWSRequestSigningConfig) (*http.Client, error) {
func NewAwsHttpClient(config AWSRequestSigningConfig) (*http.Client, error) {
if !config.Enabled {
return nil, nil
}

if config.Region == "" {
config.Region = os.Getenv("AWS_REGION")
if config.Region == "" {
return nil, fmt.Errorf("unable to resolve aws region for obtaining aws es signing credentials")
return nil, fmt.Errorf("unable to resolve AWS region for obtaining AWS Elastic signing credentials")
}
}

Expand All @@ -66,7 +70,7 @@ func newAWSElasticsearchHTTPClient(config AWSRequestSigningConfig) (*http.Client

awsCredentials = awsSession.Config.Credentials
default:
return nil, fmt.Errorf("unknown aws credential provider specified: %+v. Accepted options are 'static', 'environment' or 'session'", config.CredentialProvider)
return nil, fmt.Errorf("unknown AWS credential provider specified: %+v. Accepted options are 'static', 'environment' or 'session'", config.CredentialProvider)
}

return elasticaws.NewV4SigningClient(awsCredentials, config.Region), nil
Expand Down
7 changes: 4 additions & 3 deletions common/elasticsearch/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ package elasticsearch

import (
"fmt"
"net/http"

"go.temporal.io/server/common/log"
)

func NewClient(config *Config, logger log.Logger) (Client, error) {
func NewClient(config *Config, httpClient *http.Client, logger log.Logger) (Client, error) {
switch config.Version {
case "v6", "":
return newClientV6(config, logger)
return newClientV6(config, httpClient, logger)
case "v7":
return newClientV7(config, logger)
return newClientV7(config, httpClient, logger)
default:
return nil, fmt.Errorf("not supported ElasticSearch version: %v", config.Version)
}
Expand Down
9 changes: 3 additions & 6 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package elasticsearch
import (
"context"
"encoding/json"
"net/http"
"strings"
"time"

Expand All @@ -46,7 +47,7 @@ type (
var _ Client = (*clientV6)(nil)

// newClientV6 create a ES client
func newClientV6(config *Config, logger log.Logger) (*clientV6, error) {
func newClientV6(config *Config, httpClient *http.Client, logger log.Logger) (*clientV6, error) {
options := []elastic6.ClientOptionFunc{
elastic6.SetURL(config.URL.String()),
elastic6.SetSniff(false),
Expand All @@ -64,11 +65,7 @@ func newClientV6(config *Config, logger log.Logger) (*clientV6, error) {

options = append(options, getLoggerOptionsV6(config.LogLevel, logger)...)

if config.AWSRequestSigning.Enabled {
httpClient, err := newAWSElasticsearchHTTPClient(config.AWSRequestSigning)
if err != nil {
return nil, err
}
if httpClient != nil {
options = append(options, elastic6.SetHttpClient(httpClient))
}

Expand Down
9 changes: 3 additions & 6 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package elasticsearch

import (
"context"
"net/http"
"strings"
"time"

Expand All @@ -44,7 +45,7 @@ type (
var _ Client = (*clientV7)(nil)

// newClientV7 create a ES client
func newClientV7(config *Config, logger log.Logger) (*clientV7, error) {
func newClientV7(config *Config, httpClient *http.Client, logger log.Logger) (*clientV7, error) {
options := []elastic.ClientOptionFunc{
elastic.SetURL(config.URL.String()),
elastic.SetSniff(false),
Expand All @@ -62,11 +63,7 @@ func newClientV7(config *Config, logger log.Logger) (*clientV7, error) {

options = append(options, getLoggerOptions(config.LogLevel, logger)...)

if config.AWSRequestSigning.Enabled {
httpClient, err := newAWSElasticsearchHTTPClient(config.AWSRequestSigning)
if err != nil {
return nil, err
}
if httpClient != nil {
options = append(options, elastic.SetHttpClient(httpClient))
}

Expand Down
2 changes: 1 addition & 1 deletion host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er
if options.WorkerConfig.EnableIndexer {
advancedVisibilityWritingMode = dynamicconfig.GetStringPropertyFn(common.AdvancedVisibilityWritingModeOn)
var err error
esClient, err = elasticsearch.NewClient(options.ESConfig, logger)
esClient, err = elasticsearch.NewClient(options.ESConfig, nil, logger)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions temporal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,24 @@ func (s *Server) getServiceParams(
return nil, fmt.Errorf("unable to find advanced visibility store in config for %q key", advancedVisStoreKey)
}

esClient, err := elasticsearch.NewClient(advancedVisStore.ElasticSearch, s.logger)
if s.so.elasticseachHttpClient == nil {
s.so.elasticseachHttpClient, err = elasticsearch.NewAwsHttpClient(advancedVisStore.ElasticSearch.AWSRequestSigning)
if err != nil {
return nil, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err)
}
}

esClient, err := elasticsearch.NewClient(advancedVisStore.ElasticSearch, s.so.elasticseachHttpClient, s.logger)
if err != nil {
return nil, fmt.Errorf("error creating elastic search client: %v", err)
return nil, fmt.Errorf("unable to create Elasticsearch client: %w", err)
}
params.ESConfig = advancedVisStore.ElasticSearch
params.ESClient = esClient

// verify index name
indexName, ok := advancedVisStore.ElasticSearch.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
return nil, errors.New("elastic search config missing visibility index")
return nil, errors.New("visibility index in missing in Elasticsearch config")
}
}

Expand Down
9 changes: 9 additions & 0 deletions temporal/server_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package temporal

import (
"net/http"

"github.com/uber-go/tally"

"go.temporal.io/server/common/authorization"
Expand Down Expand Up @@ -99,3 +101,10 @@ func WithPersistenceServiceResolver(r resolver.ServiceResolver) ServerOption {
s.persistenceServiceResolver = r
})
}

// Set custom persistence service resolver which will convert service name or address value from config to another a....
func WithElasticsearchHttpClient(c *http.Client) ServerOption {
return newApplyFuncContainer(func(s *serverOptions) {
s.elasticseachHttpClient = c
})
}
21 changes: 12 additions & 9 deletions temporal/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package temporal

import (
"fmt"
"net/http"

"github.com/uber-go/tally"

Expand All @@ -37,20 +38,22 @@ import (

type (
serverOptions struct {
config *config.Config
serviceNames []string

config *config.Config
configDir string
env string
zone string

interruptCh <-chan interface{}
blockingStart bool

authorizer authorization.Authorizer
tlsConfigProvider encryption.TLSConfigProvider
claimMapper authorization.ClaimMapper
metricsReporter tally.BaseStatsReporter
persistenceServiceResolver resolver.ServiceResolver
configDir string
env string
zone string

serviceNames []string

interruptCh <-chan interface{}
blockingStart bool
elasticseachHttpClient *http.Client
}
)

Expand Down

0 comments on commit 9d9d8bf

Please sign in to comment.