diff --git a/src/workerd/api/crypto.c++ b/src/workerd/api/crypto.c++ index 450faf62d30..b8115db4821 100644 --- a/src/workerd/api/crypto.c++ +++ b/src/workerd/api/crypto.c++ @@ -700,9 +700,8 @@ kj::Promise DigestStreamSink::write(const void* buffer, size_t size) { kj::Promise DigestStreamSink::write(kj::ArrayPtr> pieces) { for (auto& piece : pieces) { - write(piece.begin(), piece.size()); + co_await write(piece.begin(), piece.size()); } - return kj::READY_NOW; } kj::Promise DigestStreamSink::end() { diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index ef2b3230802..c439badeca2 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -218,8 +218,8 @@ kj::Promise> ServiceWorkerGlobalScope::request( if (jsStream->isDisturbed()) { lock.logUncaughtException( "Script consumed request body but didn't call respondWith(). Can't forward request."); - response.sendError(500, "Internal Server Error", ioContext.getHeaderTable()); - return addNoopDeferredProxy(kj::READY_NOW); + return addNoopDeferredProxy( + response.sendError(500, "Internal Server Error", ioContext.getHeaderTable())); } else { auto client = ioContext.getHttpClient( IoContext::NEXT_CLIENT_CHANNEL, false, diff --git a/src/workerd/io/actor-cache-test.c++ b/src/workerd/io/actor-cache-test.c++ index c60ce6285c7..e964be4c20e 100644 --- a/src/workerd/io/actor-cache-test.c++ +++ b/src/workerd/io/actor-cache-test.c++ @@ -974,9 +974,9 @@ KJ_TEST("ActorCache canceled deletes are coalesced") { auto& mockStorage = test.mockStorage; // A bunch of deletes where we immediately drop the returned promises. - expectUncached(test.delete_("foo")); - expectUncached(test.delete_({"bar"_kj, "baz"_kj})); - expectUncached(test.delete_("qux")); + (void)expectUncached(test.delete_("foo")); + (void)expectUncached(test.delete_({"bar"_kj, "baz"_kj})); + (void)expectUncached(test.delete_("qux")); // Keep one promise. auto promise = expectUncached(test.delete_("corge")); @@ -1530,9 +1530,9 @@ KJ_TEST("ActorCache get-multiple multiple blocks") { // At this point, "bar" and "baz" are considered cached. KJ_ASSERT(expectCached(test.get("bar")) == nullptr); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("baz"))) == "456"); - expectUncached(test.get("corge")); - expectUncached(test.get("foo")); - expectUncached(test.get("qux")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("foo")); + (void)expectUncached(test.get("qux")); stream.call("values", CAPNP(list = [(key = "foo", value = "789")])) .expectReturns(CAPNP(), ws); @@ -1542,7 +1542,7 @@ KJ_TEST("ActorCache get-multiple multiple blocks") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("baz"))) == "456"); KJ_ASSERT(expectCached(test.get("corge")) == nullptr); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "789"); - expectUncached(test.get("qux")); + (void)expectUncached(test.get("qux")); stream.call("end", CAPNP()).expectReturns(CAPNP(), ws); @@ -1712,7 +1712,7 @@ KJ_TEST("ActorCache list() with limit") { KJ_ASSERT(expectCached(test.get("fon")) == nullptr); // Stuff after the last key is not in cache. - expectUncached(test.get("fooa")); + (void)expectUncached(test.get("fooa")); // Listing the same range again, with the same limit or lower, is fully cached. KJ_ASSERT(expectCached(test.list("bar", "qux", 3)) == @@ -1859,9 +1859,9 @@ KJ_TEST("ActorCache list() multiple ranges") { KJ_ASSERT(expectCached(test.list("a", "c")) == kvs({{"a", "1"}, {"b", "2"}})); KJ_ASSERT(expectCached(test.list("x", "z")) == kvs({{"y", "9"}})); - expectUncached(test.get("w")); - expectUncached(test.get("d")); - expectUncached(test.get("c")); + (void)expectUncached(test.get("w")); + (void)expectUncached(test.get("d")); + (void)expectUncached(test.get("c")); } KJ_TEST("ActorCache list() with some already-cached keys in range") { @@ -2714,7 +2714,7 @@ KJ_TEST("ActorCache listReverse() with limit") { KJ_ASSERT(expectCached(test.get("fon")) == nullptr); // Stuff before the first key is not in cache. - expectUncached(test.get("baq")); + (void)expectUncached(test.get("baq")); // Listing the same range again, with the same limit or lower, is fully cached. KJ_ASSERT(expectCached(test.listReverse("bar", "qux", 3)) == @@ -2861,9 +2861,9 @@ KJ_TEST("ActorCache listReverse() multiple ranges") { KJ_ASSERT(expectCached(test.listReverse("a", "c")) == kvs({{"b", "2"}, {"a", "1"}})); KJ_ASSERT(expectCached(test.listReverse("x", "z")) == kvs({{"y", "9"}})); - expectUncached(test.get("w")); - expectUncached(test.get("d")); - expectUncached(test.get("c")); + (void)expectUncached(test.get("w")); + (void)expectUncached(test.get("d")); + (void)expectUncached(test.get("c")); } KJ_TEST("ActorCache listReverse() with some already-cached keys in range") { @@ -3524,7 +3524,7 @@ KJ_TEST("ActorCache LRU purge") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("bar"))) == "456"); // But foo was evicted. - expectUncached(test.get("foo")); + (void)expectUncached(test.get("foo")); } KJ_TEST("ActorCache LRU purge ordering") { @@ -3555,8 +3555,8 @@ KJ_TEST("ActorCache LRU purge ordering") { // Foo and qux live, bar and baz evicted. KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "123"); - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("qux"))) == "555"); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("xxx"))) == "aaa"); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("yyy"))) == "bbb"); @@ -3615,9 +3615,9 @@ KJ_TEST("ActorCache LRU purge larger") { test.gate.wait().wait(ws); } - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); - expectUncached(test.get("qux")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); + (void)expectUncached(test.get("qux")); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == kilobyte); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("grault"))) == kilobyte); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("garply"))) == kilobyte); @@ -3643,9 +3643,9 @@ KJ_TEST("ActorCache LRU purge") { // Nothing was cached, because nothing fit in the LRU. KJ_ASSERT(test.lru.currentSize() == 0); - expectUncached(test.get("foo")); - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); + (void)expectUncached(test.get("foo")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); } KJ_TEST("ActorCache evict on timeout") { @@ -3687,7 +3687,7 @@ KJ_TEST("ActorCache evict on timeout") { // Now foo should be evicted and bar and baz stale. // Verify foo is evicted. - expectUncached(test.get("foo")); + (void)expectUncached(test.get("foo")); // Touch bar. expectCached(test.get("bar")); @@ -3696,7 +3696,7 @@ KJ_TEST("ActorCache evict on timeout") { // Now baz should have been evicted, but bar is still here because we keep touching it. expectCached(test.get("bar")); - expectUncached(test.get("baz")); + (void)expectUncached(test.get("baz")); } KJ_TEST("ActorCache backpressure due to dirtyPressureThreshold") { @@ -3801,9 +3801,9 @@ KJ_TEST("ActorCache lru evict entry with known-empty gaps") { KJ_ASSERT(expectCached(test.list("foo", "qux")) == kvs({{"foo", "123"}})); KJ_ASSERT(expectCached(test.get("fooa")) == nullptr); - expectUncached(test.get("baza")); - expectUncached(test.get("corge")); - expectUncached(test.get("fo")); + (void)expectUncached(test.get("baza")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("fo")); } KJ_TEST("ActorCache lru evict gap entry with known-empty gaps") { @@ -3852,7 +3852,7 @@ KJ_TEST("ActorCache lru evict gap entry with known-empty gaps") { } // Okay, that gap is gone now. - expectUncached(test.get("foo+1")); + (void)expectUncached(test.get("foo+1")); } KJ_TEST("ActorCache lru evict entry with trailing known-empty gap (followed by END_GAP)") { @@ -3900,11 +3900,11 @@ KJ_TEST("ActorCache lru evict entry with trailing known-empty gap (followed by E KJ_ASSERT(expectCached(test.list("bar", "corge")) == kvs({{"bar", "456"}, {"baz", "789"}})); KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == "555"); - expectUncached(test.get("corgf")); - expectUncached(test.get("foo")); - expectUncached(test.get("quw")); - expectUncached(test.get("qux")); - expectUncached(test.get("quy")); + (void)expectUncached(test.get("corgf")); + (void)expectUncached(test.get("foo")); + (void)expectUncached(test.get("quw")); + (void)expectUncached(test.get("qux")); + (void)expectUncached(test.get("quy")); } KJ_TEST("ActorCache timeout entry with known-empty gaps") { @@ -3955,9 +3955,9 @@ KJ_TEST("ActorCache timeout entry with known-empty gaps") { KJ_ASSERT(expectCached(test.list("foo", "qux")) == kvs({{"foo", "123"}})); KJ_ASSERT(expectCached(test.get("fooa")) == nullptr); - expectUncached(test.get("baza")); - expectUncached(test.get("corge")); - expectUncached(test.get("fo")); + (void)expectUncached(test.get("baza")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("fo")); } @@ -4025,10 +4025,10 @@ KJ_TEST("ActorCache purge everything while listing") { kvs({{"bar", "456"}, {"baz", "789"}, {"corge", "555"}, {"foo", "123"}})); } - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); - expectUncached(test.get("corge")); - expectUncached(test.get("foo")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("foo")); } KJ_TEST("ActorCache purge everything while listing; has previous entry") { @@ -4229,12 +4229,12 @@ KJ_TEST("ActorCache skip cache") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "qux"); // The other things that were returned weren't cached. - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); // No gaps were cached as empty either. - expectUncached(test.get("corge")); - expectUncached(test.get("grault")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("grault")); // Again, but reverse list. { @@ -4257,12 +4257,12 @@ KJ_TEST("ActorCache skip cache") { KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "qux"); // The other things that were returned weren't cached. - expectUncached(test.get("bar")); - expectUncached(test.get("baz")); + (void)expectUncached(test.get("bar")); + (void)expectUncached(test.get("baz")); // No gaps were cached as empty either. - expectUncached(test.get("corge")); - expectUncached(test.get("grault")); + (void)expectUncached(test.get("corge")); + (void)expectUncached(test.get("grault")); } // ======================================================================================= diff --git a/src/workerd/io/hibernation-manager.c++ b/src/workerd/io/hibernation-manager.c++ index cc8da03f44b..e2662a0d1bc 100644 --- a/src/workerd/io/hibernation-manager.c++ +++ b/src/workerd/io/hibernation-manager.c++ @@ -207,7 +207,7 @@ kj::Promise HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) { // autoResponseTimestamp on the active websocket. (active)->setAutoResponseTimestamp(hib.autoResponseTimestamp); } - ws.send((reqResp)->getResponse().asArray()); + co_await ws.send((reqResp)->getResponse().asArray()); skip = true; // If we've sent an auto response message, we should not unhibernate or deliver the // received message to the actor diff --git a/src/workerd/io/io-gate-test.c++ b/src/workerd/io/io-gate-test.c++ index f4f1ecbaa18..c7648f323fe 100644 --- a/src/workerd/io/io-gate-test.c++ +++ b/src/workerd/io/io-gate-test.c++ @@ -135,7 +135,7 @@ KJ_TEST("InputGate nested critical sections") { } // Start cs2. - cs2->wait(); + cs2->wait().wait(ws); // Can't start new tasks in cs1 until cs2 finishes. auto cs1Wait = cs1->wait(); @@ -166,7 +166,7 @@ KJ_TEST("InputGate nested critical section outlives parent") { } // Start cs2. - cs2->wait(); + cs2->wait().wait(ws); // Mark cs1 done. (Note that, in a real program, this probably can't happen like this, because a // lock would be taken on cs1 before marking it done, and that lock would wait for cs2 to @@ -210,7 +210,7 @@ KJ_TEST("InputGate deeply nested critical sections") { } // Start cs2 - cs2->wait(); + cs2->wait().wait(ws); // Add some waiters to cs2, some of which are waiting to start more nested critical sections auto lock = cs2->wait().wait(ws); @@ -336,7 +336,7 @@ KJ_TEST("InputGate broken") { } // start cs2 - cs2->wait(); + cs2->wait().wait(ws); auto cs1Wait = cs1->wait(); KJ_EXPECT(!cs1Wait.poll(ws)); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 8e861a3b182..5064845f8d4 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2546,9 +2546,11 @@ kj::Promise Server::handleDrain(kj::Promise drainWhen) { co_await drainWhen; // Tell all HttpServers to drain. This causes them to disconnect any connections that don't // have a request in-flight. + auto drainPromises = kj::heapArrayBuilder>(httpServers.size()); for (auto& httpServer: httpServers) { - httpServer.httpServer.drain(); + drainPromises.add(httpServer.httpServer.drain()); } + co_await kj::joinPromisesFailFast(drainPromises.finish()); } kj::Promise Server::run(jsg::V8System& v8System, config::Config::Reader config,