Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add mutex to span attributes #3371

Merged
merged 6 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions agent/workers/datastores/awsxray.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func segToSpans(seg segment, traceID string, parent *traces.Span) ([]traces.Span
}

func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
span := traces.Span{
Parent: parent,
Name: *seg.Name,
Expand All @@ -206,9 +206,9 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
return span, err
}

attributes[traces.TracetestMetadataFieldParentID] = parentID.String()
attributes.Set(traces.TracetestMetadataFieldParentID, parentID.String())
} else if parent != nil {
attributes[traces.TracetestMetadataFieldParentID] = parent.ID.String()
attributes.Set(traces.TracetestMetadataFieldParentID, parent.ID.String())
}

// decode span id
Expand All @@ -229,7 +229,7 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
}

if seg.InProgress != nil {
attributes[AWSXRayInProgressAttribute] = strconv.FormatBool(*seg.InProgress)
attributes.Set(AWSXRayInProgressAttribute, strconv.FormatBool(*seg.InProgress))
}

attributes.SetPointerValue(conventions.AttributeEnduserID, seg.User)
Expand All @@ -241,7 +241,7 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
}

if seg.Traced != nil {
attributes[AWSXRayTracedAttribute] = strconv.FormatBool(*seg.Traced)
attributes.Set(AWSXRayTracedAttribute, strconv.FormatBool(*seg.Traced))
}

addAnnotations(seg.Annotations, attributes)
Expand Down Expand Up @@ -288,21 +288,21 @@ func addHTTP(seg *segment, attributes traces.Attributes) {
attributes.SetPointerValue(conventions.AttributeHTTPURL, req.URL)

if req.XForwardedFor != nil {
attributes[AWSXRayXForwardedForAttribute] = strconv.FormatBool(*req.XForwardedFor)
attributes.Set(AWSXRayXForwardedForAttribute, strconv.FormatBool(*req.XForwardedFor))
}
}

if resp := seg.HTTP.Response; resp != nil {
if resp.status != nil {
attributes[conventions.AttributeHTTPStatusCode] = fmt.Sprintf("%v", *resp.status)
attributes.Set(conventions.AttributeHTTPStatusCode, fmt.Sprintf("%v", *resp.status))
}

switch val := resp.contentLength.(type) {
case string:
attributes[conventions.AttributeHTTPResponseContentLength] = val
attributes.Set(conventions.AttributeHTTPResponseContentLength, val)
case float64:
lengthPointer := int64(val)
attributes[conventions.AttributeHTTPResponseContentLength] = fmt.Sprintf("%v", lengthPointer)
attributes.Set(conventions.AttributeHTTPResponseContentLength, fmt.Sprintf("%v", lengthPointer))
}
}
}
Expand All @@ -317,7 +317,7 @@ func addAWSToSpan(aws *aWSData, attrs traces.Attributes) {
attrs.SetPointerValue(AWSTableNameAttribute, aws.TableName)

if aws.Retries != nil {
attrs[AWSXrayRetriesAttribute] = fmt.Sprintf("%v", *aws.Retries)
attrs.Set(AWSXrayRetriesAttribute, fmt.Sprintf("%v", *aws.Retries))
}
}
}
Expand All @@ -333,8 +333,8 @@ func addSQLToSpan(sql *sQLData, attrs traces.Attributes) error {
return err
}

attrs[conventions.AttributeDBConnectionString] = dbURL
attrs[conventions.AttributeDBName] = dbName
attrs.Set(conventions.AttributeDBConnectionString, dbURL)
attrs.Set(conventions.AttributeDBName, dbName)
}
// not handling sql.ConnectionString for now because the X-Ray exporter
// does not support it
Expand All @@ -349,19 +349,19 @@ func addAnnotations(annos map[string]interface{}, attrs traces.Attributes) {
for k, v := range annos {
switch t := v.(type) {
case int:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case int32:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case int64:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case string:
attrs[k] = t
attrs.Set(k, t)
case bool:
attrs[k] = strconv.FormatBool(t)
attrs.Set(k, strconv.FormatBool(t))
case float32:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case float64:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
default:
}
}
Expand All @@ -374,7 +374,7 @@ func addMetadata(meta map[string]map[string]interface{}, attrs traces.Attributes
if err != nil {
return err
}
attrs[AWSXraySegmentMetadataAttributePrefix+k] = string(val)
attrs.Set(AWSXraySegmentMetadataAttributePrefix+k, string(val))
}
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions agent/workers/datastores/azureappinsights.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func parseEvent(row spanRow) (traces.SpanEvent, error) {

event.Timestamp = timestamp

attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
rawAttributes := row.Get("customDimensions").(string)
err = json.Unmarshal([]byte(rawAttributes), &attributes)
if err != nil {
Expand All @@ -255,7 +255,7 @@ func parseEvent(row spanRow) (traces.SpanEvent, error) {
}

func parseRowToSpan(row spanRow) (traces.Span, error) {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
span := traces.Span{
Attributes: attributes,
}
Expand Down Expand Up @@ -313,25 +313,25 @@ func parseSpanID(span *traces.Span, value any) error {
}

func parseAttributes(span *traces.Span, value any) error {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
rawAttributes := value.(string)
err := json.Unmarshal([]byte(rawAttributes), &attributes)
if err != nil {
return fmt.Errorf("failed to parse attributes: %w", err)
}

for key, value := range attributes {
span.Attributes[key] = value
for key, value := range attributes.Values() {
span.Attributes.Set(key, value)
}
return nil
}

func parseParentID(span *traces.Span, value any) error {
rawParentID, ok := value.(string)
if ok {
span.Attributes[traces.TracetestMetadataFieldParentID] = rawParentID
span.Attributes.Set(traces.TracetestMetadataFieldParentID, rawParentID)
} else {
span.Attributes[traces.TracetestMetadataFieldParentID] = ""
span.Attributes.Set(traces.TracetestMetadataFieldParentID, "")
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions agent/workers/datastores/elasticsearchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,19 @@ func convertElasticSearchSpanIntoSpan(input map[string]interface{}) traces.Span
endTime := startTime.Add(time.Microsecond * time.Duration(duration))

// Attributes
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()

for attrName, attrValue := range flatInput {
name := attrName
name = strings.ReplaceAll(name, "transaction.", "")
name = strings.ReplaceAll(name, "span.", "")
attributes[name] = fmt.Sprintf("%v", attrValue)
attributes.Set(name, fmt.Sprintf("%v", attrValue))
}

// ParentId
parentId := flatInput["parent.id"]
if parentId != nil {
attributes[traces.TracetestMetadataFieldParentID] = flatInput["parent.id"].(string)
attributes.Set(traces.TracetestMetadataFieldParentID, flatInput["parent.id"].(string))
}

return traces.Span{
Expand Down
8 changes: 4 additions & 4 deletions agent/workers/datastores/opensearchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func convertOpensearchSpanIntoSpan(input map[string]interface{}) traces.Span {
startTime, _ := time.Parse(time.RFC3339, input["startTime"].(string))
endTime, _ := time.Parse(time.RFC3339, input["endTime"].(string))

attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()

for attrName, attrValue := range input {
if !strings.HasPrefix(attrName, "span.attributes.") && !strings.HasPrefix(attrName, "resource.attributes.") {
Expand All @@ -158,11 +158,11 @@ func convertOpensearchSpanIntoSpan(input map[string]interface{}) traces.Span {
// Opensearch's data-prepper replaces "." with "@". We have to revert it. Example:
// "service.name" becomes "service@name"
name = strings.ReplaceAll(name, "@", ".")
attributes[name] = fmt.Sprintf("%v", attrValue)
attributes.Set(name, fmt.Sprintf("%v", attrValue))
}

attributes[traces.TracetestMetadataFieldKind] = input["kind"].(string)
attributes[traces.TracetestMetadataFieldKind] = input["parentSpanId"].(string)
attributes.Set(traces.TracetestMetadataFieldKind, input["kind"].(string))
attributes.Set(traces.TracetestMetadataFieldKind, input["parentSpanId"].(string))

return traces.Span{
ID: spanId,
Expand Down
12 changes: 6 additions & 6 deletions agent/workers/datastores/signalfxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,18 @@ func (db signalfxDB) getSegmentSpans(ctx context.Context, traceID string, timest
}

func convertSignalFXSpan(in signalFXSpan) traces.Span {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
for name, value := range in.Tags {
attributes[name] = value
attributes.Set(name, value)
}

for name, value := range in.ProcessTags {
attributes[name] = value
attributes.Set(name, value)
}

attributes[traces.TracetestMetadataFieldParentID] = in.ParentID
attributes[traces.TracetestMetadataFieldKind] = attributes["span.kind"]
delete(attributes, "span.kind")
attributes.Set(traces.TracetestMetadataFieldParentID, in.ParentID)
attributes.Set(traces.TracetestMetadataFieldKind, attributes.Get("span.kind"))
attributes.Delete("span.kind")

spanID, _ := trace.SpanIDFromHex(in.SpanID)
startTime, _ := time.Parse(time.RFC3339, in.StartTime)
Expand Down
4 changes: 2 additions & 2 deletions agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func convertProtoToDataStore(r *proto.DataStore) (*datastore.DataStore, error) {
func convertTraceInToProtoSpans(trace traces.Trace) []*proto.Span {
spans := make([]*proto.Span, 0, len(trace.Flat))
for _, span := range trace.Flat {
attributes := make([]*proto.KeyValuePair, 0, len(span.Attributes))
for name, value := range span.Attributes {
attributes := make([]*proto.KeyValuePair, 0, span.Attributes.Len())
for name, value := range span.Attributes.Values() {
attributes = append(attributes, &proto.KeyValuePair{
Key: name,
Value: value,
Expand Down
16 changes: 8 additions & 8 deletions server/assertions/selectors/selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,38 @@ var pokeshopTrace = traces.Trace{
ID: gen.TraceID(),
RootSpan: traces.Span{
ID: postImportSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.type": "http",
"http.status_code": "201",
},
}),
Name: "POST /import",
Children: []*traces.Span{
{
ID: insertPokemonDatabaseSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.type": "db",
"db.statement": "INSERT INTO pokemon (id) values (?)",
},
}),
Name: "Insert pokemon into database",
},
{
ID: getPokemonFromExternalAPISpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop-worker",
"tracetest.span.type": "http",
"http.status_code": "200",
},
}),
Name: "Get pokemon from external API",
Children: []*traces.Span{
{
ID: updatePokemonDatabaseSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop-worker",
"tracetest.span.type": "db",
"db.statement": "UPDATE pokemon (name = ?) WHERE id = ?",
},
}),
Name: "Update pokemon on database",
},
},
Expand Down
20 changes: 10 additions & 10 deletions server/executor/assetion_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.duration": "2000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -78,10 +78,10 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.duration": "2000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -124,11 +124,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "21000000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -169,11 +169,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "25187564", // 25ms
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -204,11 +204,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "35000000", // 35ms
},
}),
},
},
expectedAllPassed: false,
Expand Down
Loading
Loading