Skip to content

Commit

Permalink
Revert "EW-8447 Fix CPU profiling"
Browse files Browse the repository at this point in the history
This reverts commit 19c8ed5.
  • Loading branch information
harrishancock committed Aug 20, 2024
1 parent 187ef2a commit e8e9829
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 100 deletions.
52 changes: 8 additions & 44 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2339,11 +2339,8 @@ struct MessageQueue {

class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::WebSocket& webSocket)
: ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam, kj::WebSocket& webSocket)
: ioHandler(webSocket), state(kj::heap<State>(this, kj::mv(isolateParam))) {
ioHandler.connect(*this);
}

Expand Down Expand Up @@ -2596,12 +2593,11 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
WebSocketIoHandler(ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket)
: isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)),
incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)),
webSocket(webSocket) {
WebSocketIoHandler(kj::WebSocket& webSocket)
: webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
incomingQueueNotifier = XThreadNotifier::create();
outgoingQueueNotifier = XThreadNotifier::create();
}

Expand Down Expand Up @@ -2634,20 +2630,7 @@ private:
// Message pumping promise that should be evaluated on the InspectorService
// thread.
kj::Promise<void> messagePump() {
// Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints
// don't block inspector I/O), inspector messages must be actually dispatched on the Isolate
// thread. So, we run the dispatch loop on the Isolate thread.
//
// Note that the above comment is only really accurate in vanilla workerd. In the case of the
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.
auto dispatchLoopPromise = isolateThreadExecutor->executeAsync([this]() {
return dispatchLoop();
});
return receiveLoop()
.exclusiveJoin(kj::mv(dispatchLoopPromise))
.exclusiveJoin(transmitLoop());
return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
Expand Down Expand Up @@ -2688,7 +2671,6 @@ private:
outgoingQueueNotifier->notify();
}

// Must be called on the InspectorService thread.
kj::Promise<void> receiveLoop() {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
Expand All @@ -2711,7 +2693,6 @@ private:
}
}

// Must be called on the Isolate thread.
kj::Promise<void> dispatchLoop() {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
Expand All @@ -2721,7 +2702,6 @@ private:
}
}

// Must be called on the InspectorService thread.
kj::Promise<void> transmitLoop() {
for (;;) {
co_await outgoingQueueNotifier->awaitNotification();
Expand All @@ -2748,17 +2728,10 @@ private:
}
}

// We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This
// doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate
// before it exits, but it doesn't hurt.
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
// This XThreadNotifier must be created on the Isolate thread.
kj::Own<XThreadNotifier> incomingQueueNotifier;

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
kj::Own<XThreadNotifier> outgoingQueueNotifier;

kj::WebSocket& webSocket; // only accessed on the InspectorService thread.
Expand Down Expand Up @@ -2908,17 +2881,11 @@ kj::Promise<void> Worker::Isolate::attachInspector(
headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}");
auto webSocket = response.acceptWebSocket(headers);

// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
auto executorNotifierPair = ExecutorNotifierPair{};

return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket)
return attachInspector(timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
Expand All @@ -2939,10 +2906,7 @@ kj::Promise<void> Worker::Isolate::attachInspector(

lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
kj::atomicAddRef(*this),
kj::mv(isolateThreadExecutorNotifierPair),
webSocket);
auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(kj::atomicAddRef(*this), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

Expand Down
17 changes: 0 additions & 17 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,31 +288,14 @@ class Worker::Isolate: public kj::AtomicRefcounted {
uint getLockSuccessCount() const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
// Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility
// with internal Cloudflare systems.
//
// This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`.
// When linked against vanilla V8, this means that CPU profiling will only profile JavaScript
// running on the _calling thread_, which will most likely only be inspector console commands, and
// is not typically desired.
//
// For the above reason , this overload is curently only suitable for use by the internal Workers
// Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker`
// for this Isolate.
kj::Promise<void> attachInspector(
kj::Timer& timer,
kj::Duration timerOffset,
kj::HttpService::Response& response,
const kj::HttpHeaderTable& headerTable,
kj::HttpHeaderId controlHeaderId) const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
// `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor`
// must be associated with the same thread which executes the Worker's JavaScript.
kj::Promise<void> attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;
Expand Down
28 changes: 4 additions & 24 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1165,12 +1165,10 @@ private:
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
InspectorService(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
: isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)),
timer(timer),
: timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings {
.errorHandler = *this
Expand Down Expand Up @@ -1228,8 +1226,7 @@ public:
auto webSocket = response.acceptWebSocket(responseHeaders);
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(
isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket);
co_return co_await ref->attachInspector(timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
Expand Down Expand Up @@ -1347,7 +1344,6 @@ public:
}

private:
ExecutorNotifierPair isolateThreadExecutorNotifierPair;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
Expand Down Expand Up @@ -3441,30 +3437,14 @@ uint startInspector(kj::StringPtr inspectorAddress,
static constexpr uint DEFAULT_PORT = 9229;
kj::MutexGuarded<uint> inspectorPort(UNASSIGNED_PORT);

// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
// capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and
// pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket
// connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the
// kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages
// and feeds them to that dispatch loop via the XThreadNotifier we create here.
auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{};

// Start the InspectorService thread.
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable {
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar](){
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
auto inspectorService(
kj::heap<Server::InspectorService>(
kj::mv(isolateThreadExecutorNotifierPair),
io.provider->getTimer(),
headerTableBuilder,
registrar));
kj::heap<Server::InspectorService>(io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.
Expand Down
15 changes: 0 additions & 15 deletions src/workerd/util/xthreadnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,4 @@ class XThreadNotifier final: public kj::AtomicRefcounted {
kj::MutexGuarded<kj::PromiseCrossThreadFulfillerPair<void>> paf;
};


// Convenience struct for creating and passing around a kj::Executor and XThreadNotifier. The
// default constructor creates a pair of the objects which are both tied to the current thread.
struct ExecutorNotifierPair {
kj::Own<const kj::Executor> executor = kj::getCurrentThreadExecutor().addRef();
kj::Own<XThreadNotifier> notifier = XThreadNotifier::create();

ExecutorNotifierPair clone() {
return {
.executor = executor->addRef(),
.notifier = kj::atomicAddRef(*notifier),
};
}
};

} // namespace workerd

0 comments on commit e8e9829

Please sign in to comment.