Skip to content

Commit

Permalink
Merge branch 'master' into http-compression
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored Nov 17, 2017
2 parents ce9f725 + acfbf29 commit f157f97
Show file tree
Hide file tree
Showing 31 changed files with 2,140 additions and 47 deletions.
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
PROJECT_ROOT=github.com/jaegertracing/jaeger
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e ./examples/... -e ./scripts/...)
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e swagger-gen... -e ./examples/... -e ./scripts/...)

# all .go files that don't exist in hidden directories
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen \
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen -e swagger-gen \
-e ".*/\..*" \
-e ".*/_.*" \
-e ".*/mocks.*")
Expand Down Expand Up @@ -34,6 +34,11 @@ THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift"
THRIFT_GEN=$(shell which thrift-gen)
THRIFT_GEN_DIR=thrift-gen

SWAGGER_VER=0.12.0
SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_VER)
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/${PROJECT_ROOT}" -w /go/src/${PROJECT_ROOT} $(SWAGGER_IMAGE)
SWAGGER_GEN_DIR=swagger-gen

PASS=$(shell printf "\033[32mPASS\033[0m")
FAIL=$(shell printf "\033[31mFAIL\033[0m")
COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/''
Expand Down Expand Up @@ -219,6 +224,11 @@ idl-submodule:
thrift-image:
$(THRIFT) -version

.PHONY: generate-zipkin-swagger
generate-zipkin-swagger: idl-submodule
$(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go

.PHONY: docs
docs: $(MKDOCS_VIRTUAL_ENV)
bash -c 'source $(MKDOCS_VIRTUAL_ENV)/bin/activate; mkdocs serve'
Expand Down
28 changes: 28 additions & 0 deletions cmd/collector/app/zipkin/fixtures/zipkin_01.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[
{
"traceId":"1",
"id":"2",
"parentId": "1",
"name":"foo",
"kind": "CLIENT",
"debug": true,
"shared": true,
"timestamp": 1,
"duration": 10,
"localEndpoint":{
"serviceName":"foo",
"ipv4":"10.43.17.42"
},
"remoteEndpoint":{
"serviceName":"bar",
"ipv4":"10.43.17.43"
},
"annotations": [{
"value": "foo",
"timestamp": 1
}],
"tags": {
"foo": "bar"
}
}
]
88 changes: 81 additions & 7 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,54 @@ package zipkin
import (
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-openapi/loads"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/gorilla/mux"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/swagger-gen/models"
"github.com/jaegertracing/jaeger/swagger-gen/restapi"
"github.com/jaegertracing/jaeger/swagger-gen/restapi/operations"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// APIHandler handles all HTTP calls to the collector
type APIHandler struct {
zipkinSpansHandler app.ZipkinSpansHandler
zipkinV2Formats strfmt.Registry
}

// NewAPIHandler returns a new APIHandler
func NewAPIHandler(
zipkinSpansHandler app.ZipkinSpansHandler,
) *APIHandler {
swaggerSpec, _ := loads.Analyzed(restapi.SwaggerJSON, "")
return &APIHandler{
zipkinSpansHandler: zipkinSpansHandler,
zipkinV2Formats: operations.NewZipkinAPI(swaggerSpec).Formats(),
}
}

// RegisterRoutes registers Zipkin routes
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/spans", aH.saveSpans).Methods(http.MethodPost)
router.HandleFunc("/api/v2/spans", aH.saveSpansV2).Methods(http.MethodPost)
}

func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
bRead := r.Body
defer r.Body.Close()

if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
gz, err := gzip.NewReader(r.Body)
gz, err := gunzip(bRead)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
Expand Down Expand Up @@ -84,15 +94,79 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
return
}

if len(tSpans) > 0 {
ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
if err := aH.saveThriftSpans(tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusAccepted)
}

func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
bRead := r.Body
defer r.Body.Close()
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
gz, err := gunzip(bRead)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
defer gz.Close()
bRead = gz
}

w.WriteHeader(http.StatusAccepted)
bodyBytes, err := ioutil.ReadAll(bRead)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError)
return
}

contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
return
}

var spans models.ListOfSpans
if err = swag.ReadJSON(bodyBytes, &spans); err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
if err = spans.Validate(aH.zipkinV2Formats); err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}

tSpans, err := spansV2ToThrift(spans)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}

if err := aH.saveThriftSpans(tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}

w.WriteHeader(operations.PostSpansAcceptedCode)
}

func gunzip(r io.ReadCloser) (*gzip.Reader, error) {
gz, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
return gz, nil
}

func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
if len(tSpans) > 0 {
ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
return err
}
}
return nil
}

func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
Expand Down
58 changes: 47 additions & 11 deletions cmd/collector/app/zipkin/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,13 @@ func waitForSpans(t *testing.T, handler *mockZipkinHandler, expecting int) {
}

func TestThriftFormat(t *testing.T) {
server, handler := initializeTestServer(nil)
server, _ := initializeTestServer(nil)
defer server.Close()

bodyBytes := zipkinSerialize([]*zipkincore.Span{{}})
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, bodyBytes, createHeader("application/x-thrift"))
assert.NoError(t, err)
assert.EqualValues(t, http.StatusAccepted, statusCode)
assert.EqualValues(t, "", resBodyStr)

handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead")
statusCode, resBodyStr, err = postBytes(server.URL+`/api/v1/spans`, bodyBytes, createHeader("application/x-thrift"))
assert.NoError(t, err)
assert.EqualValues(t, http.StatusInternalServerError, statusCode)
assert.EqualValues(t, "Cannot submit Zipkin batch: Bad times ahead\n", resBodyStr)
}

func TestJsonFormat(t *testing.T) {
Expand Down Expand Up @@ -228,9 +221,52 @@ func TestCannotReadBodyFromRequest(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{})
assert.NoError(t, err)
rw := dummyResponseWriter{}
handler.saveSpans(&rw, req)
assert.EqualValues(t, http.StatusInternalServerError, rw.myStatusCode)
assert.EqualValues(t, "Unable to process request body: Simulated error reading body\n", rw.myBody)

tests := []struct {
handler func(w http.ResponseWriter, r *http.Request)
}{
{handler: handler.saveSpans},
{handler: handler.saveSpansV2},
}
for _, test := range tests {
test.handler(&rw, req)
assert.EqualValues(t, http.StatusInternalServerError, rw.myStatusCode)
assert.EqualValues(t, "Unable to process request body: Simulated error reading body\n", rw.myBody)
}
}

func TestSaveSpansV2(t *testing.T) {
server, handler := initializeTestServer(nil)
defer server.Close()
tests := []struct {
body []byte
resBody string
code int
headers map[string]string
}{
{body: []byte("[]"), code: http.StatusAccepted},
{body: gzipEncode([]byte("[]")), code: http.StatusAccepted, headers: map[string]string{"Content-Encoding": "gzip"}},
{body: []byte("[]"), code: http.StatusBadRequest, headers: map[string]string{"Content-Type": "text/html"}, resBody: "Unsupported Content-Type\n"},
{body: []byte("[]"), code: http.StatusBadRequest, headers: map[string]string{"Content-Encoding": "gzip"}, resBody: "Unable to process request body: unexpected EOF\n"},
{body: []byte("not good"), code: http.StatusBadRequest, resBody: "Unable to process request body: invalid character 'o' in literal null (expecting 'u')\n"},
{body: []byte("[{}]"), code: http.StatusBadRequest, resBody: "Unable to process request body: validation failure list:\nid in body is required\ntraceId in body is required\n"},
{body: []byte(`[{"id":"1111111111111111", "traceId":"1111111111111111", "localEndpoint": {"ipv4": "A"}}]`), code: http.StatusBadRequest, resBody: "Unable to process request body: wrong ipv4\n"},
}
for _, test := range tests {
h := createHeader("application/json")
for k, v := range test.headers {
h.Set(k, v)
}
statusCode, resBody, err := postBytes(server.URL+`/api/v2/spans`, test.body, h)
require.NoError(t, err)
assert.EqualValues(t, test.code, statusCode)
assert.EqualValues(t, test.resBody, resBody)
}
handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead")
statusCode, resBody, err := postBytes(server.URL+`/api/v2/spans`, []byte(`[{"id":"1111111111111111", "traceId":"1111111111111111"}]`), createHeader("application/json"))
require.NoError(t, err)
assert.EqualValues(t, http.StatusInternalServerError, statusCode)
assert.EqualValues(t, "Cannot submit Zipkin batch: Bad times ahead\n", resBody)
}

type errReader struct{}
Expand Down
26 changes: 16 additions & 10 deletions cmd/collector/app/zipkin/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,29 +153,35 @@ func cutLongID(id string) string {
}

func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) {
ipv4, err := parseIpv4(e.IPv4)
return eToThrift(e.IPv4, e.IPv6, e.Port, e.ServiceName)
}

func eToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) {
ipv4, err := parseIpv4(ip4)
if err != nil {
return nil, err
}
port := e.Port
if port >= (1 << 15) {
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
port = port - (1 << 16)
}

ipv6, err := parseIpv6(e.IPv6)
port := port(p)
ipv6, err := parseIpv6(string(ip6))
if err != nil {
return nil, err
}

return &zipkincore.Endpoint{
ServiceName: e.ServiceName,
ServiceName: service,
Port: int16(port),
Ipv4: ipv4,
Ipv6: ipv6,
}, nil
}

func port(p int32) int32 {
if p >= (1 << 15) {
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
p = p - (1 << 16)
}
return p
}

func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
endpoint, err := endpointToThrift(a.Endpoint)
if err != nil {
Expand Down
Loading

0 comments on commit f157f97

Please sign in to comment.