Skip to content

Commit

Permalink
Improve ES logging (#1228)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Jan 26, 2021
1 parent 30aed76 commit 089f746
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 119 deletions.
29 changes: 25 additions & 4 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package elasticsearch
import (
"context"
"encoding/json"
"strings"
"time"

elastic6 "github.com/olivere/elastic"
Expand All @@ -51,10 +52,6 @@ func newClientV6(config *Config, logger log.Logger) (*clientV6, error) {
elastic6.SetSniff(false),
elastic6.SetBasicAuth(config.Username, config.Password),

elastic6.SetErrorLog(newErrorLogger(logger)),
// Uncomment next line to enable info logging also.
// elastic6.SetInfoLog(newInfoLogger(logger)),

// Disable health check so we don't block client creation (and thus temporal server startup)
// if the ES instance happens to be down.
elastic6.SetHealthcheck(false),
Expand All @@ -65,6 +62,8 @@ func newClientV6(config *Config, logger log.Logger) (*clientV6, error) {
elastic6.SetDecoder(&elastic6.NumberDecoder{}),
}

options = append(options, getLoggerOptionsV6(config.LogLevel, logger)...)

if config.AWSRequestSigning.Enabled {
httpClient, err := newAWSElasticsearchHTTPClient(config.AWSRequestSigning)
if err != nil {
Expand Down Expand Up @@ -584,3 +583,25 @@ func convertV6IndicesGetSettingsResponseToV7(response *elastic6.IndicesGetSettin
Settings: response.Settings,
}
}

func getLoggerOptionsV6(logLevel string, logger log.Logger) []elastic6.ClientOptionFunc {
switch {
case strings.EqualFold(logLevel, "trace"):
return []elastic6.ClientOptionFunc{
elastic6.SetErrorLog(newErrorLogger(logger)),
elastic6.SetInfoLog(newInfoLogger(logger)),
elastic6.SetTraceLog(newInfoLogger(logger)),
}
case strings.EqualFold(logLevel, "info"):
return []elastic6.ClientOptionFunc{
elastic6.SetErrorLog(newErrorLogger(logger)),
elastic6.SetInfoLog(newInfoLogger(logger)),
}
case strings.EqualFold(logLevel, "error"), logLevel == "": // Default is to log errors only.
return []elastic6.ClientOptionFunc{
elastic6.SetErrorLog(newErrorLogger(logger)),
}
default:
return nil
}
}
29 changes: 25 additions & 4 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package elasticsearch

import (
"context"
"strings"
"time"

"github.com/olivere/elastic/v7"
Expand All @@ -49,10 +50,6 @@ func newClientV7(config *Config, logger log.Logger) (*clientV7, error) {
elastic.SetSniff(false),
elastic.SetBasicAuth(config.Username, config.Password),

elastic.SetErrorLog(newErrorLogger(logger)),
// Uncomment next line to enable info logging also.
elastic.SetInfoLog(newInfoLogger(logger)),

// Disable health check so we don't block client creation (and thus temporal server startup)
// if the ES instance happens to be down.
elastic.SetHealthcheck(false),
Expand All @@ -63,6 +60,8 @@ func newClientV7(config *Config, logger log.Logger) (*clientV7, error) {
elastic.SetDecoder(&elastic.NumberDecoder{}),
}

options = append(options, getLoggerOptions(config.LogLevel, logger)...)

if config.AWSRequestSigning.Enabled {
httpClient, err := newAWSElasticsearchHTTPClient(config.AWSRequestSigning)
if err != nil {
Expand Down Expand Up @@ -240,3 +239,25 @@ func (c *clientV7) IndexPutSettings(ctx context.Context, indexName string, bodyS
func (c *clientV7) IndexGetSettings(ctx context.Context, indexName string) (map[string]*elastic.IndicesGetSettingsResponse, error) {
return c.esClient.IndexGetSettings(indexName).Do(ctx)
}

func getLoggerOptions(logLevel string, logger log.Logger) []elastic.ClientOptionFunc {
switch {
case strings.EqualFold(logLevel, "trace"):
return []elastic.ClientOptionFunc{
elastic.SetErrorLog(newErrorLogger(logger)),
elastic.SetInfoLog(newInfoLogger(logger)),
elastic.SetTraceLog(newInfoLogger(logger)),
}
case strings.EqualFold(logLevel, "info"):
return []elastic.ClientOptionFunc{
elastic.SetErrorLog(newErrorLogger(logger)),
elastic.SetInfoLog(newInfoLogger(logger)),
}
case strings.EqualFold(logLevel, "error"), logLevel == "": // Default is to log errors only.
return []elastic.ClientOptionFunc{
elastic.SetErrorLog(newErrorLogger(logger)),
}
default:
return nil
}
}
1 change: 1 addition & 0 deletions common/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
Username string `yaml:"username"`
Password string `yaml:"password"`
Indices map[string]string `yaml:"indices"` //nolint:govet
LogLevel string `yaml:"logLevel"`
AWSRequestSigning AWSRequestSigningConfig `yaml:"aws-request-signing"`
}

Expand Down
78 changes: 10 additions & 68 deletions common/persistence/elasticsearch/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -166,11 +165,11 @@ func (p *esProcessorImpl) Add(request *elasticsearch.BulkableRequest, visibility
ackChWithStopwatch := newAckChanWithStopwatch(ackCh, &sw)
_, isDup, _ := p.mapToAckChan.PutOrDo(visibilityTaskKey, ackChWithStopwatch, func(key interface{}, value interface{}) error {
ackChWithStopwatchExisting, ok := value.(*ackChanWithStopwatch)
if !ok { // must be bug in code and bad deployment
p.logger.Fatal(fmt.Sprintf("Message has wrong type %T (%T expected).", value, &ackChanWithStopwatch{}), tag.Value(key))
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckChan has item of a wrong type %T (%T expected).", value, &ackChanWithStopwatch{}), tag.Value(key))
}

p.logger.Info("Adding duplicate visibility task to ES bulk processor.", tag.Value(key))
p.logger.Warn("Adding duplicate ES request for visibility task key.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))

// Nack existing visibility task.
ackChWithStopwatchExisting.addToProcessStopwatch.Stop()
Expand Down Expand Up @@ -218,22 +217,16 @@ func (p *esProcessorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRe
case isResponseSuccess(resp.Status):
p.sendToAckChan(visibilityTaskKey, true)
case !isResponseRetryable(resp.Status):
wid, rid, namespaceID := p.getDocIDs(request)
p.logger.Error("ES request failed.",
tag.ESResponseStatus(resp.Status),
tag.ESResponseError(getErrorMsgFromESResp(resp)),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowNamespaceID(namespaceID))
tag.ESRequest(request.String()))
p.sendToAckChan(visibilityTaskKey, false)
default: // bulk processor will retry
wid, rid, namespaceID := p.getDocIDs(request)
p.logger.Info("ES request retried.",
p.logger.Warn("ES request retried.",
tag.ESResponseStatus(resp.Status),
tag.ESResponseError(getErrorMsgFromESResp(resp)),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowNamespaceID(namespaceID))
tag.ESRequest(request.String()))
p.metricsClient.IncCounter(metrics.ElasticSearchVisibility, metrics.ESBulkProcessorRetries)
}
}
Expand All @@ -244,8 +237,8 @@ func (p *esProcessorImpl) sendToAckChan(visibilityTaskKey string, ack bool) {
// Use RemoveIf here to prevent race condition with de-dup logic in Add method.
_ = p.mapToAckChan.RemoveIf(visibilityTaskKey, func(key interface{}, value interface{}) bool {
ackChWithStopwatch, ok := value.(*ackChanWithStopwatch)
if !ok { // must be bug in code and bad deployment
p.logger.Fatal(fmt.Sprintf("Message has wrong type %T (%T expected).", value, &ackChanWithStopwatch{}), tag.ESKey(visibilityTaskKey))
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckChan has item of a wrong type %T (%T expected).", value, &ackChanWithStopwatch{}), tag.ESKey(visibilityTaskKey))
}

ackChWithStopwatch.addToProcessStopwatch.Stop()
Expand Down Expand Up @@ -304,56 +297,6 @@ func (p *esProcessorImpl) getVisibilityTaskKey(request elastic.BulkableRequest)
return key
}

func (p *esProcessorImpl) getDocIDs(request elastic.BulkableRequest) (workflowID string, runID string, namespaceID string) {
// TODO (alex): This need to be combined with getVisibilityTaskKey
req, err := request.Source()
if err != nil {
p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String()))
p.metricsClient.IncCounter(metrics.ElasticSearchVisibility, metrics.ESBulkProcessorCorruptedData)
return
}

if len(req) == 2 { // index or update requests
var body map[string]interface{}
if err := json.Unmarshal([]byte(req[1]), &body); err != nil {
p.logger.Error("Unmarshal index request body err.", tag.Error(err))
p.metricsClient.IncCounter(metrics.ElasticSearchVisibility, metrics.ESBulkProcessorCorruptedData)
return
}

wID, _ := body[definition.WorkflowID]
workflowID, _ = wID.(string)

rID, _ := body[definition.RunID]
runID, _ = rID.(string)

nID, _ := body[definition.NamespaceID]
namespaceID, _ = nID.(string)
} else { // delete requests
var body map[string]map[string]interface{}
if err := json.Unmarshal([]byte(req[0]), &body); err != nil {
p.logger.Error("Unmarshal delete request body err.", tag.Error(err))
p.metricsClient.IncCounter(metrics.ElasticSearchVisibility, metrics.ESBulkProcessorCorruptedData)
return
}

opMap, ok := body["delete"]
if !ok {
return
}
id, _ := opMap["_id"]
docID, _ := id.(string)
wrIDs := strings.Split(docID, delimiter)
if len(wrIDs) > 0 {
workflowID = wrIDs[0]
}
if len(wrIDs) > 1 {
runID = wrIDs[1]
}
}
return
}

// 409 - Version Conflict
// 404 - Not Found
func isResponseSuccess(status int) bool {
Expand All @@ -379,11 +322,10 @@ func isResponseRetryable(status int) bool {
}

func getErrorMsgFromESResp(resp *elastic.BulkResponseItem) string {
var errMsg string
if resp.Error != nil {
errMsg = resp.Error.Reason
return resp.Error.Reason
}
return errMsg
return ""
}

func newAckChanWithStopwatch(ackCh chan<- bool, stopwatch *tally.Stopwatch) *ackChanWithStopwatch {
Expand Down
43 changes: 0 additions & 43 deletions common/persistence/elasticsearch/esProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,49 +350,6 @@ func (s *esProcessorSuite) TestHashFn() {
s.NotEqual(uint32(0), s.esProcessor.hashFn("test"))
}

// func (s *esProcessorSuite) getEncodedMsg(wid string, rid string, namespaceID string) []byte {
// indexMsg := &indexerspb.Message{
// NamespaceId: namespaceID,
// WorkflowId: wid,
// RunId: rid,
// }
// payload, err := s.esProcessor.msgEncoder.Encode(indexMsg)
// s.NoError(err)
// return payload
// }
//
func (s *esProcessorSuite) TestGetDocIDs() {
testKey := "test-key"
testWid := "test-workflowID"
testRid := "test-runID"
testNamespaceid := "test-namespaceID"

request := elastic.NewBulkIndexRequest().
Doc(map[string]interface{}{
definition.VisibilityTaskKey: testKey,
definition.NamespaceID: testNamespaceid,
definition.WorkflowID: testWid,
definition.RunID: testRid,
})

wid, rid, namespaceID := s.esProcessor.getDocIDs(request)
s.Equal(testWid, wid)
s.Equal(testRid, rid)
s.Equal(testNamespaceid, namespaceID)
}

func (s *esProcessorSuite) TestGetDocIDs_Error() {
testKey := "test-key"
request := elastic.NewBulkIndexRequest().
Doc(map[string]interface{}{
definition.VisibilityTaskKey: testKey,
})
wid, rid, namespaceID := s.esProcessor.getDocIDs(request)
s.Equal("", wid)
s.Equal("", rid)
s.Equal("", namespaceID)
}

func (s *esProcessorSuite) TestGetVisibilityTaskKey() {
request := elastic.NewBulkIndexRequest()
s.PanicsWithValue("VisibilityTaskKey not found", func() { s.esProcessor.getVisibilityTaskKey(request) })
Expand Down
1 change: 1 addition & 0 deletions config/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ persistence:
es-visibility:
elasticsearch:
version: "v7"
logLevel: "error"
url:
scheme: "http"
host: "127.0.0.1:9200"
Expand Down
1 change: 1 addition & 0 deletions config/development_mysql-es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ persistence:
es-visibility:
elasticsearch:
version: "v7"
logLevel: "error"
url:
scheme: "http"
host: "127.0.0.1:9200"
Expand Down

0 comments on commit 089f746

Please sign in to comment.