Skip to content

Commit

Permalink
Add Jaeger Thrift HTTP exporter (gabime#926)
Browse files Browse the repository at this point in the history
  • Loading branch information
seemk authored Jul 28, 2021
1 parent a1db1ca commit 914df66
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Increment the:
## [Unreleased]

* [BUILD] Allow to use local GSL
* [EXPORTER] Jaeger Exporter - Add Thrift HTTP exporter ([#926](https://github.com/open-telemetry/opentelemetry-cpp/pull/926))

## [1.0.0-rc3] 2021-07-12

Expand Down
2 changes: 1 addition & 1 deletion examples/jaeger/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int main(int argc, char *argv[])
{
if (argc == 2)
{
opts.server_addr = argv[1];
opts.endpoint = argv[1];
}
// Removing this line will leave the default noop TracerProvider in place.
InitTracer();
Expand Down
11 changes: 8 additions & 3 deletions exporters/jaeger/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ set(JAEGER_THRIFT_GENCPP_SOURCES
thrift-gen/zipkincore_types.cpp)

set(JAEGER_EXPORTER_SOURCES
src/jaeger_exporter.cc src/thrift_sender.cc src/udp_transport.cc
src/recordable.cc src/TUDPTransport.cc)
src/jaeger_exporter.cc
src/thrift_sender.cc
src/udp_transport.cc
src/recordable.cc
src/TUDPTransport.cc
src/http_transport.cc
src/THttpTransport.cc)

add_library(opentelemetry_exporter_jaeger_trace ${JAEGER_EXPORTER_SOURCES}
${JAEGER_THRIFT_GENCPP_SOURCES})
Expand All @@ -26,7 +31,7 @@ target_include_directories(

target_link_libraries(
opentelemetry_exporter_jaeger_trace
PUBLIC opentelemetry_resources
PUBLIC opentelemetry_resources http_client_curl
PRIVATE thrift::thrift)

if(MSVC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <opentelemetry/ext/http/client/http_client.h>
#include <opentelemetry/sdk/trace/exporter.h>

OPENTELEMETRY_BEGIN_NAMESPACE
Expand All @@ -25,10 +26,11 @@ class ThriftSender;
*/
struct JaegerExporterOptions
{
// The endpoint to export to.
std::string server_addr = "localhost";
uint16_t server_port = 6831;
TransportFormat transport_format = TransportFormat::kThriftUdpCompact;
std::string endpoint = "localhost";
uint16_t server_port = 6831;
// Only applicable when using kThriftHttp transport.
ext::http::client::Headers headers;
};

namespace trace_sdk = opentelemetry::sdk::trace;
Expand Down
61 changes: 61 additions & 0 deletions exporters/jaeger/src/THttpTransport.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "THttpTransport.h"
#include "opentelemetry/ext/http/client/http_client_factory.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace jaeger
{

THttpTransport::THttpTransport(std::string endpoint, ext::http::client::Headers extra_headers)
: endpoint(std::move(endpoint)),
headers(std::move(extra_headers)),
client(ext::http::client::HttpClientFactory::CreateSync())
{
headers.insert({{"Content-Type", "application/vnd.apache.thrift.binary"}});
}

THttpTransport::~THttpTransport() {}

bool THttpTransport::isOpen() const
{
return true;
}

uint32_t THttpTransport::read(uint8_t *buf, uint32_t len)
{
(void)buf;
(void)len;
return 0;
}

void THttpTransport::write(const uint8_t *buf, uint32_t len)
{
request_buffer.insert(request_buffer.end(), buf, buf + len);
}

bool THttpTransport::sendSpans()
{
auto result = client->Post(endpoint, request_buffer, headers);
request_buffer.clear();

// TODO: Add logging once global log handling is available.
if (!result)
{
return false;
}

if (result.GetResponse().GetStatusCode() >= 400)
{
return false;
}

return true;
}

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
40 changes: 40 additions & 0 deletions exporters/jaeger/src/THttpTransport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <opentelemetry/ext/http/client/http_client.h>
#include <opentelemetry/version.h>

#include <thrift/transport/TVirtualTransport.h>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace jaeger
{

class THttpTransport : public apache::thrift::transport::TVirtualTransport<THttpTransport>
{
public:
THttpTransport(std::string endpoint, ext::http::client::Headers extra_headers);
~THttpTransport() override;

bool isOpen() const override;

uint32_t read(uint8_t *buf, uint32_t len);

void write(const uint8_t *buf, uint32_t len);

bool sendSpans();

private:
std::string endpoint;
ext::http::client::Headers headers;
std::shared_ptr<ext::http::client::HttpClientSync> client;
std::vector<uint8_t> request_buffer;
};

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
37 changes: 37 additions & 0 deletions exporters/jaeger/src/http_transport.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "http_transport.h"

#include <thrift/protocol/TBinaryProtocol.h>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace jaeger
{

using TBinaryProtocol = apache::thrift::protocol::TBinaryProtocol;
using TTransport = apache::thrift::transport::TTransport;

HttpTransport::HttpTransport(std::string endpoint, ext::http::client::Headers headers)
{
endpoint_transport_ = std::make_shared<THttpTransport>(std::move(endpoint), std::move(headers));
protocol_ = std::shared_ptr<TProtocol>(new TBinaryProtocol(endpoint_transport_));
}

int HttpTransport::EmitBatch(const thrift::Batch &batch)
{
batch.write(protocol_.get());

if (!endpoint_transport_->sendSpans())
{
return 0;
}

return static_cast<int>(batch.spans.size());
}

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
39 changes: 39 additions & 0 deletions exporters/jaeger/src/http_transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include "THttpTransport.h"
#include "transport.h"

#include <thrift/protocol/TProtocol.h>
#include <thrift/transport/TTransport.h>
#include <memory>
#include <string>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace jaeger
{

using TProtocol = apache::thrift::protocol::TProtocol;

class HttpTransport : public Transport
{
public:
HttpTransport(std::string endpoint, ext::http::client::Headers headers);

int EmitBatch(const thrift::Batch &batch) override;

uint32_t MaxPacketSize() const override
{
// Default to 4 MiB POST body size.
return 1 << 22;
}

private:
std::shared_ptr<THttpTransport> endpoint_transport_;
std::shared_ptr<TProtocol> protocol_;
};

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
16 changes: 12 additions & 4 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <opentelemetry/exporters/jaeger/jaeger_exporter.h>
#include <opentelemetry/exporters/jaeger/recordable.h>

#include "http_transport.h"
#include "thrift_sender.h"
#include "udp_transport.h"

Expand Down Expand Up @@ -64,14 +65,21 @@ void JaegerExporter::InitializeEndpoint()
{
// TODO: do we need support any authentication mechanism?
auto transport = std::unique_ptr<Transport>(
static_cast<Transport *>(new UDPTransport(options_.server_addr, options_.server_port)));
static_cast<Transport *>(new UDPTransport(options_.endpoint, options_.server_port)));
sender_ = std::unique_ptr<ThriftSender>(new ThriftSender(std::move(transport)));
return;
}
else

if (options_.transport_format == TransportFormat::kThriftHttp)
{
// The transport format is not implemented.
assert(false);
auto transport =
std::unique_ptr<HttpTransport>(new HttpTransport(options_.endpoint, options_.headers));
sender_ = std::unique_ptr<ThriftSender>(new ThriftSender(std::move(transport)));
return;
}

// The transport format is not implemented.
assert(false);
}

} // namespace jaeger
Expand Down
4 changes: 2 additions & 2 deletions exporters/jaeger/src/thrift_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ int ThriftSender::Flush()
batch.__set_process(process_);
batch.__set_spans(span_buffer_);

transport_->EmitBatch(batch);
int spans_flushed = transport_->EmitBatch(batch);

ResetBuffers();

return static_cast<int>(batch.spans.size());
return spans_flushed;
}

void ThriftSender::Close()
Expand Down
4 changes: 2 additions & 2 deletions exporters/jaeger/src/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class Transport
Transport() = default;
virtual ~Transport() = default;

virtual void EmitBatch(const thrift::Batch &batch) = 0;
virtual uint32_t MaxPacketSize() const = 0;
virtual int EmitBatch(const thrift::Batch &batch) = 0;
virtual uint32_t MaxPacketSize() const = 0;
};

} // namespace jaeger
Expand Down
4 changes: 3 additions & 1 deletion exporters/jaeger/src/udp_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@ void UDPTransport::CleanSocket()
#endif
}

void UDPTransport::EmitBatch(const thrift::Batch &batch)
int UDPTransport::EmitBatch(const thrift::Batch &batch)
{
try
{
agent_->emitBatch(batch);
}
catch (...)
{}

return static_cast<int>(batch.spans.size());
}

} // namespace jaeger
Expand Down
3 changes: 1 addition & 2 deletions exporters/jaeger/src/udp_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ using TBinaryProtocol = apache::thrift::protocol::TBinaryProtocol;
using TCompactProtocol = apache::thrift::protocol::TCompactProtocol;
using TBufferedTransport = apache::thrift::transport::TBufferedTransport;
using TProtocol = apache::thrift::protocol::TProtocol;
using TSocket = apache::thrift::transport::TSocket;
using TTransport = apache::thrift::transport::TTransport;

class UDPTransport : public Transport
Expand All @@ -38,7 +37,7 @@ class UDPTransport : public Transport
UDPTransport(const std::string &addr, uint16_t port);
virtual ~UDPTransport();

void EmitBatch(const thrift::Batch &batch) override;
int EmitBatch(const thrift::Batch &batch) override;

uint32_t MaxPacketSize() const override { return max_packet_size_; }

Expand Down

0 comments on commit 914df66

Please sign in to comment.