From e8e98293bc02f91d132b1a4c818bf40274497a4f Mon Sep 17 00:00:00 2001 From: Harris Hancock Date: Tue, 20 Aug 2024 17:59:39 +0100 Subject: [PATCH] Revert "EW-8447 Fix CPU profiling" This reverts commit 19c8ed568d0ef3d4a60f5094aab46c7103978eee. --- src/workerd/io/worker.c++ | 52 +++++------------------------- src/workerd/io/worker.h | 17 ---------- src/workerd/server/server.c++ | 28 +++------------- src/workerd/util/xthreadnotifier.h | 15 --------- 4 files changed, 12 insertions(+), 100 deletions(-) diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 7c2a6f6821e..8779699daa4 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -2339,11 +2339,8 @@ struct MessageQueue { class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: - InspectorChannelImpl(kj::Own isolateParam, - ExecutorNotifierPair isolateThreadExecutorNotifierPair, - kj::WebSocket& webSocket) - : ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket), - state(kj::heap(this, kj::mv(isolateParam))) { + InspectorChannelImpl(kj::Own isolateParam, kj::WebSocket& webSocket) + : ioHandler(webSocket), state(kj::heap(this, kj::mv(isolateParam))) { ioHandler.connect(*this); } @@ -2596,12 +2593,11 @@ private: // the InspectorChannelImpl and the InspectorClient. class WebSocketIoHandler final { public: - WebSocketIoHandler(ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket) - : isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)), - incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)), - webSocket(webSocket) { + WebSocketIoHandler(kj::WebSocket& webSocket) + : webSocket(webSocket) { // Assume we are being instantiated on the InspectorService thread, the thread that will do // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. + incomingQueueNotifier = XThreadNotifier::create(); outgoingQueueNotifier = XThreadNotifier::create(); } @@ -2634,20 +2630,7 @@ private: // Message pumping promise that should be evaluated on the InspectorService // thread. kj::Promise messagePump() { - // Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints - // don't block inspector I/O), inspector messages must be actually dispatched on the Isolate - // thread. So, we run the dispatch loop on the Isolate thread. - // - // Note that the above comment is only really accurate in vanilla workerd. In the case of the - // internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the - // current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current - // thread's executor just posts the task to the event loop, and everything works as expected. - auto dispatchLoopPromise = isolateThreadExecutor->executeAsync([this]() { - return dispatchLoop(); - }); - return receiveLoop() - .exclusiveJoin(kj::mv(dispatchLoopPromise)) - .exclusiveJoin(transmitLoop()); + return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop()); } void send(kj::String message) { @@ -2688,7 +2671,6 @@ private: outgoingQueueNotifier->notify(); } - // Must be called on the InspectorService thread. kj::Promise receiveLoop() { for (;;) { auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE); @@ -2711,7 +2693,6 @@ private: } } - // Must be called on the Isolate thread. kj::Promise dispatchLoop() { for (;;) { co_await incomingQueueNotifier->awaitNotification(); @@ -2721,7 +2702,6 @@ private: } } - // Must be called on the InspectorService thread. kj::Promise transmitLoop() { for (;;) { co_await outgoingQueueNotifier->awaitNotification(); @@ -2748,17 +2728,10 @@ private: } } - // We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This - // doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate - // before it exits, but it doesn't hurt. - kj::Own isolateThreadExecutor; - kj::MutexGuarded incomingQueue; - // This XThreadNotifier must be created on the Isolate thread. kj::Own incomingQueueNotifier; kj::MutexGuarded outgoingQueue; - // This XThreadNotifier must be created on the InspectorService thread. kj::Own outgoingQueueNotifier; kj::WebSocket& webSocket; // only accessed on the InspectorService thread. @@ -2908,17 +2881,11 @@ kj::Promise Worker::Isolate::attachInspector( headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}"); auto webSocket = response.acceptWebSocket(headers); - // This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has - // no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be - // dispatched on the calling thread. - auto executorNotifierPair = ExecutorNotifierPair{}; - - return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket) + return attachInspector(timer, timerOffset, *webSocket) .attach(kj::mv(webSocket)); } kj::Promise Worker::Isolate::attachInspector( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const { @@ -2939,10 +2906,7 @@ kj::Promise Worker::Isolate::attachInspector( lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); - auto channel = kj::heap( - kj::atomicAddRef(*this), - kj::mv(isolateThreadExecutorNotifierPair), - webSocket); + auto channel = kj::heap(kj::atomicAddRef(*this), webSocket); lockedSelf.currentInspectorSession = *channel; lockedSelf.impl->inspectorClient.setChannel(*channel); diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index e0db7d4d717..cb437916892 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -288,17 +288,6 @@ class Worker::Isolate: public kj::AtomicRefcounted { uint getLockSuccessCount() const; // Accepts a connection to the V8 inspector and handles requests until the client disconnects. - // Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility - // with internal Cloudflare systems. - // - // This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`. - // When linked against vanilla V8, this means that CPU profiling will only profile JavaScript - // running on the _calling thread_, which will most likely only be inspector console commands, and - // is not typically desired. - // - // For the above reason , this overload is curently only suitable for use by the internal Workers - // Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker` - // for this Isolate. kj::Promise attachInspector( kj::Timer& timer, kj::Duration timerOffset, @@ -306,13 +295,7 @@ class Worker::Isolate: public kj::AtomicRefcounted { const kj::HttpHeaderTable& headerTable, kj::HttpHeaderId controlHeaderId) const; - // Accepts a connection to the V8 inspector and handles requests until the client disconnects. - // - // This overload will dispatch all inspector messages on the `kj::Executor` passed in via - // `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor` - // must be associated with the same thread which executes the Worker's JavaScript. kj::Promise attachInspector( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const; diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 99df5bbc2d5..da0d9188022 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1165,12 +1165,10 @@ private: class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler { public: InspectorService( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::Timer& timer, kj::HttpHeaderTable::Builder& headerTableBuilder, InspectorServiceIsolateRegistrar& registrar) - : isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)), - timer(timer), + : timer(timer), headerTable(headerTableBuilder.getFutureTable()), server(timer, headerTable, *this, kj::HttpServerSettings { .errorHandler = *this @@ -1228,8 +1226,7 @@ public: auto webSocket = response.acceptWebSocket(responseHeaders); kj::Duration timerOffset = 0 * kj::MILLISECONDS; try { - co_return co_await ref->attachInspector( - isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket); + co_return co_await ref->attachInspector(timer, timerOffset, *webSocket); } catch (...) { auto exception = kj::getCaughtExceptionAsKj(); if (exception.getType() == kj::Exception::Type::DISCONNECTED) { @@ -1347,7 +1344,6 @@ public: } private: - ExecutorNotifierPair isolateThreadExecutorNotifierPair; kj::Timer& timer; kj::HttpHeaderTable& headerTable; kj::HashMap> isolates; @@ -3441,30 +3437,14 @@ uint startInspector(kj::StringPtr inspectorAddress, static constexpr uint DEFAULT_PORT = 9229; kj::MutexGuarded inspectorPort(UNASSIGNED_PORT); - // `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and - // stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which - // means we need to dispatch inspector messages on this thread. To help make that happen, we - // capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and - // pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket - // connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the - // kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages - // and feeds them to that dispatch loop via the XThreadNotifier we create here. - auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{}; - - // Start the InspectorService thread. - kj::Thread thread([inspectorAddress, &inspectorPort, ®istrar, - isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable { + kj::Thread thread([inspectorAddress, &inspectorPort, ®istrar](){ kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder headerTableBuilder; // Create the special inspector service. auto inspectorService( - kj::heap( - kj::mv(isolateThreadExecutorNotifierPair), - io.provider->getTimer(), - headerTableBuilder, - registrar)); + kj::heap(io.provider->getTimer(), headerTableBuilder, registrar)); auto ownHeaderTable = headerTableBuilder.build(); // Configure and start the inspector socket. diff --git a/src/workerd/util/xthreadnotifier.h b/src/workerd/util/xthreadnotifier.h index 171b5d52b70..1149059d881 100644 --- a/src/workerd/util/xthreadnotifier.h +++ b/src/workerd/util/xthreadnotifier.h @@ -42,19 +42,4 @@ class XThreadNotifier final: public kj::AtomicRefcounted { kj::MutexGuarded> paf; }; - -// Convenience struct for creating and passing around a kj::Executor and XThreadNotifier. The -// default constructor creates a pair of the objects which are both tied to the current thread. -struct ExecutorNotifierPair { - kj::Own executor = kj::getCurrentThreadExecutor().addRef(); - kj::Own notifier = XThreadNotifier::create(); - - ExecutorNotifierPair clone() { - return { - .executor = executor->addRef(), - .notifier = kj::atomicAddRef(*notifier), - }; - } -}; - } // namespace workerd