Skip to content

Commit

Permalink
Add initial lime tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Sep 13, 2024
1 parent cf8c335 commit 081f031
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 66 deletions.
4 changes: 2 additions & 2 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public:
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& ioChannelFactory) {
[&](SpanBuilder& span, LimeSpanBuilder&, IoChannelFactory& ioChannelFactory) {
if (span.isObserved()) {
span.setTag("actor_id"_kjc, kj::str(actorId));
}
Expand Down Expand Up @@ -65,7 +65,7 @@ public:
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& ioChannelFactory) {
[&](SpanBuilder& span, LimeSpanBuilder&, IoChannelFactory& ioChannelFactory) {
if (span.isObserved()) {
span.setTag("actor_id"_kjc, id->toString());
}
Expand Down
10 changes: 9 additions & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ jsg::V8Ref<v8::Object> getTraceLogMessage(jsg::Lock& js, const Trace::Log& log)
}

kj::Array<jsg::Ref<TraceLog>> getTraceLogs(jsg::Lock& js, const Trace& trace) {
return KJ_MAP(x, trace.logs) -> jsg::Ref<TraceLog> { return jsg::alloc<TraceLog>(js, trace, x); };
auto builder = kj::heapArrayBuilder<jsg::Ref<TraceLog>>(trace.logs.size() + trace.spans.size());
for (auto i: kj::indices(trace.logs)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.logs[i]));
}
// Add spans represented as logs to the logs object.
for (auto i: kj::indices(trace.spans)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.spans[i]));
}
return builder.finish();
}

kj::Array<jsg::Ref<TraceDiagnosticChannelEvent>> getTraceDiagnosticChannelEvents(
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ wd_cc_library(
name = "trace",
srcs = ["trace.c++"],
hdrs = ["trace.h"],
implementation_deps = [
"//src/workerd/util:thread-scopes",
],
visibility = ["//visibility:public"],
deps = [
":worker-interface_capnp",
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class IoChannelFactory {

// Specifies the parent span for the subrequest for tracing purposes.
SpanParent parentSpan = nullptr;
LimeSpanParent limeParentSpan = nullptr;

// Serialized JSON value to pass in ew_compat field of control header to FL. If this subrequest
// does not go directly to FL, this value is ignored. Flags marked with `$neededByFl` in
Expand Down
39 changes: 31 additions & 8 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -779,14 +779,19 @@ kj::Date IoContext::now() {
}

kj::Own<WorkerInterface> IoContext::getSubrequestNoChecks(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, LimeSpanBuilder&, IoChannelFactory&)>
func,
SubrequestOptions options) {
SpanBuilder span = nullptr;
LimeSpanBuilder limeSpan = nullptr;

KJ_IF_SOME(n, options.operationName) {
span = makeTraceSpan(kj::mv(n));
// TODO(cleanup): Avoid cloning the string here if possible.
span = makeTraceSpan(kj::ConstString(kj::str(n)));
limeSpan = makeLimeTraceSpan(kj::ConstString(kj::mv(n)));
}

auto ret = func(span, getIoChannelFactory());
auto ret = func(span, limeSpan, getIoChannelFactory());

if (options.wrapMetrics) {
auto& metrics = getMetrics();
Expand All @@ -798,12 +803,16 @@ kj::Own<WorkerInterface> IoContext::getSubrequestNoChecks(
if (span.isObserved()) {
ret = ret.attach(kj::mv(span));
}
if (limeSpan.isObserved()) {
ret = ret.attach(kj::mv(limeSpan));
}

return kj::mv(ret);
}

kj::Own<WorkerInterface> IoContext::getSubrequest(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, LimeSpanBuilder&, IoChannelFactory&)>
func,
SubrequestOptions options) {
limitEnforcer->newSubrequest(options.inHouse);
return getSubrequestNoChecks(kj::mv(func), kj::mv(options));
Expand All @@ -812,8 +821,9 @@ kj::Own<WorkerInterface> IoContext::getSubrequest(
kj::Own<WorkerInterface> IoContext::getSubrequestChannel(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson, kj::ConstString operationName) {
return getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(channel, isInHouse, kj::mv(cfBlobJson), span, channelFactory);
[&](SpanBuilder& span, LimeSpanBuilder& limeSpan, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), span, limeSpan, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
Expand All @@ -827,8 +837,9 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelNoChecks(uint channel,
kj::Maybe<kj::String> cfBlobJson,
kj::Maybe<kj::ConstString> operationName) {
return getSubrequestNoChecks(
[&](SpanBuilder& span, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(channel, isInHouse, kj::mv(cfBlobJson), span, channelFactory);
[&](SpanBuilder& span, LimeSpanBuilder& limeSpan, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), span, limeSpan, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
Expand All @@ -841,10 +852,12 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelImpl(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
SpanBuilder& span,
LimeSpanBuilder& limeSpan,
IoChannelFactory& channelFactory) {
IoChannelFactory::SubrequestMetadata metadata{
.cfBlobJson = kj::mv(cfBlobJson),
.parentSpan = span,
.limeParentSpan = limeSpan,
.featureFlagsForFl = worker->getIsolate().getFeatureFlagsForFl(),
};

Expand Down Expand Up @@ -909,10 +922,20 @@ SpanParent IoContext::getCurrentTraceSpan() {
return getMetrics().getSpan();
}

// Returns the Lime span parent reported by this context's request metrics object.
LimeSpanParent IoContext::getCurrentLimeTraceSpan() {
  // TODO(o11y): Add support for retrieving span from storage scope lock for more accurate span
  // context, as with Jaeger spans.
  auto& metrics = getMetrics();
  return metrics.getLimeSpan();
}

// Creates a span builder named `operationName` as a child of the span returned by
// getCurrentTraceSpan(). Ownership of the name is transferred to the new child.
SpanBuilder IoContext::makeTraceSpan(kj::ConstString operationName) {
return getCurrentTraceSpan().newChild(kj::mv(operationName));
}

// Lime counterpart of makeTraceSpan(): creates a Lime span builder named `operationName` as a
// child of the span returned by getCurrentLimeTraceSpan(). Ownership of the name is transferred
// to the new child.
LimeSpanBuilder IoContext::makeLimeTraceSpan(kj::ConstString operationName) {
return getCurrentLimeTraceSpan().newLimeChild(kj::mv(operationName));
}

void IoContext::taskFailed(kj::Exception&& exception) {
if (waitUntilStatusValue == EventOutcome::OK) {
KJ_IF_SOME(status, limitEnforcer->getLimitsExceeded()) {
Expand Down
9 changes: 7 additions & 2 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,15 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
};

kj::Own<WorkerInterface> getSubrequestNoChecks(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, LimeSpanBuilder&, IoChannelFactory&)>
func,
SubrequestOptions options);

// If creating a new subrequest is permitted, calls the given factory function synchronously to
// create one.
kj::Own<WorkerInterface> getSubrequest(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, LimeSpanBuilder&, IoChannelFactory&)>
func,
SubrequestOptions options);

// Get WorkerInterface objects to use for subrequests.
Expand Down Expand Up @@ -746,11 +748,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
// Returns the current span being recorded. If called while the JS lock is held, uses the trace
// information from the current async context, if available.
SpanParent getCurrentTraceSpan();
LimeSpanParent getCurrentLimeTraceSpan();

// Returns a builder for recording tracing spans (or a no-op builder if tracing is inactive).
// If called while the JS lock is held, uses the trace information from the current async
// context, if available.
SpanBuilder makeTraceSpan(kj::ConstString operationName);
LimeSpanBuilder makeLimeTraceSpan(kj::ConstString operationName);

// Implement per-IoContext rate limiting for Cache.put(). Pass the body of a Cache API PUT
// request and get a possibly wrapped stream back.
Expand Down Expand Up @@ -860,6 +864,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
SpanBuilder& span,
LimeSpanBuilder& limeSpan,
IoChannelFactory& channelFactory);

friend class IoContext_IncomingRequest;
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class RequestObserver: public kj::Refcounted {
virtual SpanParent getSpan() {
return nullptr;
}
// Returns the Lime span parent associated with this request. The default implementation returns
// a LimeSpanParent built from nullptr; subclasses may override to supply a real span.
virtual LimeSpanParent getLimeSpan() {
return nullptr;
}

virtual void addedContextTask() {}
virtual void finishedContextTask() {}
Expand Down
89 changes: 84 additions & 5 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// https://opensource.org/licenses/Apache-2.0

#include <workerd/io/trace.h>
#include "kj/time.h"
#include "workerd/util/thread-scopes.h"
#include <capnp/message.h>
#include <capnp/schema.h>
#include <kj/debug.h>
Expand All @@ -14,6 +16,10 @@ namespace workerd {
// want this number to be big enough to be useful for tracing, but small enough to make it hard to
// DoS the C++ heap -- keeping in mind we can record a trace per handler run during a request.
static constexpr size_t MAX_TRACE_BYTES = 128 * 1024;
// Limit spans to at most 512. It could be difficult to fit e.g. 1024 spans within MAX_TRACE_BYTES
// unless most of the included spans do not include tags. If use cases arise where this amount is
// insufficient, merge smaller spans together or drop smaller spans.
static constexpr size_t MAX_LIME_SPANS = 512;

namespace {

Expand Down Expand Up @@ -270,10 +276,14 @@ Trace::~Trace() noexcept(false) {}

void Trace::copyTo(rpc::Trace::Builder builder) {
{
auto list = builder.initLogs(logs.size());
auto list = builder.initLogs(logs.size() + spans.size());
for (auto i: kj::indices(logs)) {
logs[i].copyTo(list[i]);
}
// Add spans represented as logs to the logs object.
for (auto i: kj::indices(spans)) {
spans[i].copyTo(list[i + logs.size()]);
}
}

{
Expand Down Expand Up @@ -485,10 +495,21 @@ SpanBuilder& SpanBuilder::operator=(SpanBuilder&& other) {
return *this;
}

// Move assignment. Calls end() on this builder before taking over the observer and span of
// `other`, then returns *this.
LimeSpanBuilder& LimeSpanBuilder::operator=(LimeSpanBuilder&& other) {
// Must run before the members below are overwritten, since end() operates on the current
// observer and span.
end();
observer = kj::mv(other.observer);
span = kj::mv(other.span);
return *this;
}

// Calls end() when the builder is destroyed. Declared noexcept(false), so exceptions raised by
// end() may propagate out of the destructor.
SpanBuilder::~SpanBuilder() noexcept(false) {
end();
}

// Calls end() when the builder is destroyed. Declared noexcept(false), so exceptions raised by
// end() may propagate out of the destructor.
LimeSpanBuilder::~LimeSpanBuilder() noexcept(false) {
end();
}

void SpanBuilder::end() {
KJ_IF_SOME(o, observer) {
KJ_IF_SOME(s, span) {
Expand All @@ -499,13 +520,23 @@ void SpanBuilder::end() {
}
}

void SpanBuilder::setOperationName(kj::ConstString operationName) {
void LimeSpanBuilder::end() {
KJ_IF_SOME(o, observer) {
KJ_IF_SOME(s, span) {
s.endTime = kj::systemPreciseCalendarClock().now();
o->report(s);
span = kj::none;
}
}
}

// Replaces the operation name of the span held by this builder. Does nothing when no span is
// held.
void SpanBuilderBase::setOperationName(kj::ConstString operationName) {
  KJ_IF_SOME(activeSpan, span) {
    activeSpan.operationName = kj::mv(operationName);
  }
}

void SpanBuilder::setTag(kj::ConstString key, TagValue value) {
void SpanBuilderBase::setTag(kj::ConstString key, TagValue value) {
KJ_IF_SOME(s, span) {
auto keyPtr = key.asPtr();
s.tags.upsert(
Expand All @@ -521,7 +552,7 @@ void SpanBuilder::setTag(kj::ConstString key, TagValue value) {
}
}

void SpanBuilder::addLog(kj::Date timestamp, kj::ConstString key, TagValue value) {
void SpanBuilderBase::addLog(kj::Date timestamp, kj::ConstString key, TagValue value) {
KJ_IF_SOME(s, span) {
if (s.logs.size() >= Span::MAX_LOGS) {
++s.droppedLogs;
Expand Down Expand Up @@ -578,7 +609,7 @@ WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel)
trace(kj::refcounted<Trace>(
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none)) {}

void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message) {
void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) {
if (trace->exceededLogLimit) {
return;
}
Expand All @@ -596,9 +627,57 @@ void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message
return;
}
trace->bytesUsed = newSize;
if (isSpan) {
trace->spans.add(timestamp, logLevel, kj::mv(message));
trace->numSpans++;
return;
}
trace->logs.add(timestamp, logLevel, kj::mv(message));
}

void WorkerTracer::addSpan(const Span& span, kj::String spanContext) {
// This is where we'll actually encode the span for now.
// Drop any spans beyond MAX_LIME_SPANS.
if (trace->numSpans >= MAX_LIME_SPANS) {
return;
}
if (isPredictableModeForTest()) {
// Do not emit span duration information in predictable mode.
log(span.endTime, LogLevel::LOG, kj::str("[\"span: ", span.operationName, "\"]"), true);
} else {
// Time since Unix epoch in seconds, with millisecond precision
double epochSecondsStart = (span.startTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
double epochSecondsEnd = (span.endTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
auto message = kj::str("[\"span: ", span.operationName, " ", kj::mv(spanContext), " ",
epochSecondsStart, " ", epochSecondsEnd, "\"]");
log(span.endTime, LogLevel::LOG, kj::mv(message), true);
}

// TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e
// String) to avoid having to handle each type explicitly here.
for (const Span::TagMap::Entry& tag: span.tags) {
auto value = [&]() {
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
}
KJ_CASE_ONEOF(val, int64_t) {
return kj::str(val);
}
KJ_CASE_ONEOF(val, double) {
return kj::str(val);
}
KJ_CASE_ONEOF(val, bool) {
return kj::str(val);
}
}
KJ_UNREACHABLE;
}();
kj::String message = kj::str("[\"tag: "_kj, tag.key, " => "_kj, value, "\"]");
log(span.endTime, LogLevel::LOG, kj::mv(message), true);
}
}

void WorkerTracer::addException(
kj::Date timestamp, kj::String name, kj::String message, kj::Maybe<kj::String> stack) {
if (trace->exceededExceptionLimit) {
Expand Down
Loading

0 comments on commit 081f031

Please sign in to comment.