Fix various dropped promises #1222

Merged · 6 commits · Sep 25, 2023
Changes from all commits
3 changes: 1 addition & 2 deletions src/workerd/api/crypto.c++
@@ -700,9 +700,8 @@ kj::Promise<void> DigestStreamSink::write(const void* buffer, size_t size) {

kj::Promise<void> DigestStreamSink::write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) {
for (auto& piece : pieces) {
- write(piece.begin(), piece.size());
+ co_await write(piece.begin(), piece.size());
}
- return kj::READY_NOW;
}

kj::Promise<void> DigestStreamSink::end() {
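The pattern behind this fix is worth spelling out: in KJ, destroying a kj::Promise cancels the work it represents, so a fire-and-forget call to an asynchronous write() can cancel the write and silently swallow any error it would have raised. Below is a minimal, self-contained sketch of the before/after shape of the change; it is not workerd code, and ExampleSink/writePiece are hypothetical stand-ins for DigestStreamSink and its per-piece write.

#include <kj/async.h>
#include <kj/common.h>

struct ExampleSink {
  size_t bytesWritten = 0;

  // Hypothetical async per-piece write; a real sink might hit the network or a
  // hash context here and complete later.
  kj::Promise<void> writePiece(kj::ArrayPtr<const kj::byte> piece) {
    bytesWritten += piece.size();
    return kj::READY_NOW;
  }

  kj::Promise<void> writeAll(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) {
    for (auto& piece: pieces) {
      // Before the fix the call looked like `writePiece(piece);` with the
      // returned promise destroyed on the spot, canceling pending work and
      // discarding its errors. Awaiting sequences the pieces and propagates
      // failures to the caller.
      co_await writePiece(piece);
    }
    // A coroutine completes by falling off the end; no `return kj::READY_NOW;`.
  }
};

Because writeAll is a coroutine, its caller only resolves once every piece has been written, which is exactly what the co_await added to DigestStreamSink::write achieves.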
4 changes: 2 additions & 2 deletions src/workerd/api/global-scope.c++
@@ -218,8 +218,8 @@ kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(
if (jsStream->isDisturbed()) {
lock.logUncaughtException(
"Script consumed request body but didn't call respondWith(). Can't forward request.");
- response.sendError(500, "Internal Server Error", ioContext.getHeaderTable());
- return addNoopDeferredProxy(kj::READY_NOW);
+ return addNoopDeferredProxy(
+     response.sendError(500, "Internal Server Error", ioContext.getHeaderTable()));
} else {
auto client = ioContext.getHttpClient(
IoContext::NEXT_CLIENT_CHANNEL, false,
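Reduced to its essence, this hunk applies the other remedy for a dropped promise: instead of awaiting it, make it the function's return value. A hedged sketch under assumed names (sendErrorResponse stands in for response.sendError(...); the addNoopDeferredProxy wrapper is left out):

#include <kj/async.h>

// Hypothetical stand-in for an asynchronous error response; it resolves once
// the 500 has actually been written out.
kj::Promise<void> sendErrorResponse(int statusCode) { return kj::READY_NOW; }

kj::Promise<void> handleUnforwardableRequest() {
  // Before: `sendErrorResponse(500); return kj::READY_NOW;` could cancel the
  // error response before it was ever sent, because destroying its promise
  // cancels the underlying work.
  // After: the pending response itself is the returned promise.
  return sendErrorResponse(500);
}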
100 changes: 50 additions & 50 deletions src/workerd/io/actor-cache-test.c++
@@ -974,9 +974,9 @@ KJ_TEST("ActorCache canceled deletes are coalesced") {
auto& mockStorage = test.mockStorage;

// A bunch of deletes where we immediately drop the returned promises.
- expectUncached(test.delete_("foo"));
- expectUncached(test.delete_({"bar"_kj, "baz"_kj}));
- expectUncached(test.delete_("qux"));
+ (void)expectUncached(test.delete_("foo"));
+ (void)expectUncached(test.delete_({"bar"_kj, "baz"_kj}));
+ (void)expectUncached(test.delete_("qux"));

// Keep one promise.
auto promise = expectUncached(test.delete_("corge"));
@@ -1530,9 +1530,9 @@ KJ_TEST("ActorCache get-multiple multiple blocks") {
// At this point, "bar" and "baz" are considered cached.
KJ_ASSERT(expectCached(test.get("bar")) == nullptr);
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("baz"))) == "456");
expectUncached(test.get("corge"));
expectUncached(test.get("foo"));
expectUncached(test.get("qux"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("foo"));
(void)expectUncached(test.get("qux"));

stream.call("values", CAPNP(list = [(key = "foo", value = "789")]))
.expectReturns(CAPNP(), ws);
@@ -1542,7 +1542,7 @@ KJ_TEST("ActorCache get-multiple multiple blocks") {
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("baz"))) == "456");
KJ_ASSERT(expectCached(test.get("corge")) == nullptr);
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "789");
expectUncached(test.get("qux"));
(void)expectUncached(test.get("qux"));

stream.call("end", CAPNP()).expectReturns(CAPNP(), ws);

@@ -1712,7 +1712,7 @@ KJ_TEST("ActorCache list() with limit") {
KJ_ASSERT(expectCached(test.get("fon")) == nullptr);

// Stuff after the last key is not in cache.
expectUncached(test.get("fooa"));
(void)expectUncached(test.get("fooa"));

// Listing the same range again, with the same limit or lower, is fully cached.
KJ_ASSERT(expectCached(test.list("bar", "qux", 3)) ==
@@ -1859,9 +1859,9 @@ KJ_TEST("ActorCache list() multiple ranges") {
KJ_ASSERT(expectCached(test.list("a", "c")) == kvs({{"a", "1"}, {"b", "2"}}));
KJ_ASSERT(expectCached(test.list("x", "z")) == kvs({{"y", "9"}}));

expectUncached(test.get("w"));
expectUncached(test.get("d"));
expectUncached(test.get("c"));
(void)expectUncached(test.get("w"));
(void)expectUncached(test.get("d"));
(void)expectUncached(test.get("c"));
}

KJ_TEST("ActorCache list() with some already-cached keys in range") {
@@ -2714,7 +2714,7 @@ KJ_TEST("ActorCache listReverse() with limit") {
KJ_ASSERT(expectCached(test.get("fon")) == nullptr);

// Stuff before the first key is not in cache.
expectUncached(test.get("baq"));
(void)expectUncached(test.get("baq"));

// Listing the same range again, with the same limit or lower, is fully cached.
KJ_ASSERT(expectCached(test.listReverse("bar", "qux", 3)) ==
@@ -2861,9 +2861,9 @@ KJ_TEST("ActorCache listReverse() multiple ranges") {
KJ_ASSERT(expectCached(test.listReverse("a", "c")) == kvs({{"b", "2"}, {"a", "1"}}));
KJ_ASSERT(expectCached(test.listReverse("x", "z")) == kvs({{"y", "9"}}));

expectUncached(test.get("w"));
expectUncached(test.get("d"));
expectUncached(test.get("c"));
(void)expectUncached(test.get("w"));
(void)expectUncached(test.get("d"));
(void)expectUncached(test.get("c"));
}

KJ_TEST("ActorCache listReverse() with some already-cached keys in range") {
@@ -3524,7 +3524,7 @@ KJ_TEST("ActorCache LRU purge") {
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("bar"))) == "456");

// But foo was evicted.
expectUncached(test.get("foo"));
(void)expectUncached(test.get("foo"));
}

KJ_TEST("ActorCache LRU purge ordering") {
@@ -3555,8 +3555,8 @@ KJ_TEST("ActorCache LRU purge ordering") {

// Foo and qux live, bar and baz evicted.
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "123");
expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("qux"))) == "555");
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("xxx"))) == "aaa");
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("yyy"))) == "bbb");
@@ -3615,9 +3615,9 @@ KJ_TEST("ActorCache LRU purge larger") {
test.gate.wait().wait(ws);
}

expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
expectUncached(test.get("qux"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));
(void)expectUncached(test.get("qux"));
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == kilobyte);
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("grault"))) == kilobyte);
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("garply"))) == kilobyte);
@@ -3643,9 +3643,9 @@ KJ_TEST("ActorCache LRU purge") {

// Nothing was cached, because nothing fit in the LRU.
KJ_ASSERT(test.lru.currentSize() == 0);
expectUncached(test.get("foo"));
expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
(void)expectUncached(test.get("foo"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));
}

KJ_TEST("ActorCache evict on timeout") {
@@ -3687,7 +3687,7 @@ KJ_TEST("ActorCache evict on timeout") {
// Now foo should be evicted and bar and baz stale.

// Verify foo is evicted.
expectUncached(test.get("foo"));
(void)expectUncached(test.get("foo"));

// Touch bar.
expectCached(test.get("bar"));
@@ -3696,7 +3696,7 @@ KJ_TEST("ActorCache evict on timeout") {
// Now baz should have been evicted, but bar is still here because we keep touching it.

expectCached(test.get("bar"));
expectUncached(test.get("baz"));
(void)expectUncached(test.get("baz"));
}

KJ_TEST("ActorCache backpressure due to dirtyPressureThreshold") {
@@ -3801,9 +3801,9 @@ KJ_TEST("ActorCache lru evict entry with known-empty gaps") {
KJ_ASSERT(expectCached(test.list("foo", "qux")) == kvs({{"foo", "123"}}));
KJ_ASSERT(expectCached(test.get("fooa")) == nullptr);

expectUncached(test.get("baza"));
expectUncached(test.get("corge"));
expectUncached(test.get("fo"));
(void)expectUncached(test.get("baza"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("fo"));
}

KJ_TEST("ActorCache lru evict gap entry with known-empty gaps") {
@@ -3852,7 +3852,7 @@ KJ_TEST("ActorCache lru evict gap entry with known-empty gaps") {
}

// Okay, that gap is gone now.
expectUncached(test.get("foo+1"));
(void)expectUncached(test.get("foo+1"));
}

KJ_TEST("ActorCache lru evict entry with trailing known-empty gap (followed by END_GAP)") {
@@ -3900,11 +3900,11 @@ KJ_TEST("ActorCache lru evict entry with trailing known-empty gap (followed by END_GAP)") {
KJ_ASSERT(expectCached(test.list("bar", "corge")) == kvs({{"bar", "456"}, {"baz", "789"}}));
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("corge"))) == "555");

expectUncached(test.get("corgf"));
expectUncached(test.get("foo"));
expectUncached(test.get("quw"));
expectUncached(test.get("qux"));
expectUncached(test.get("quy"));
(void)expectUncached(test.get("corgf"));
(void)expectUncached(test.get("foo"));
(void)expectUncached(test.get("quw"));
(void)expectUncached(test.get("qux"));
(void)expectUncached(test.get("quy"));
}

KJ_TEST("ActorCache timeout entry with known-empty gaps") {
@@ -3955,9 +3955,9 @@ KJ_TEST("ActorCache timeout entry with known-empty gaps") {
KJ_ASSERT(expectCached(test.list("foo", "qux")) == kvs({{"foo", "123"}}));
KJ_ASSERT(expectCached(test.get("fooa")) == nullptr);

expectUncached(test.get("baza"));
expectUncached(test.get("corge"));
expectUncached(test.get("fo"));
(void)expectUncached(test.get("baza"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("fo"));
}


@@ -4025,10 +4025,10 @@ KJ_TEST("ActorCache purge everything while listing") {
kvs({{"bar", "456"}, {"baz", "789"}, {"corge", "555"}, {"foo", "123"}}));
}

expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
expectUncached(test.get("corge"));
expectUncached(test.get("foo"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("foo"));
}

KJ_TEST("ActorCache purge everything while listing; has previous entry") {
@@ -4229,12 +4229,12 @@ KJ_TEST("ActorCache skip cache") {
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "qux");

// The other things that were returned weren't cached.
expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));

// No gaps were cached as empty either.
expectUncached(test.get("corge"));
expectUncached(test.get("grault"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("grault"));

// Again, but reverse list.
{
@@ -4257,12 +4257,12 @@ KJ_TEST("ActorCache skip cache") {
KJ_ASSERT(KJ_ASSERT_NONNULL(expectCached(test.get("foo"))) == "qux");

// The other things that were returned weren't cached.
expectUncached(test.get("bar"));
expectUncached(test.get("baz"));
(void)expectUncached(test.get("bar"));
(void)expectUncached(test.get("baz"));

// No gaps were cached as empty either.
expectUncached(test.get("corge"));
expectUncached(test.get("grault"));
(void)expectUncached(test.get("corge"));
(void)expectUncached(test.get("grault"));
}

// =======================================================================================
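All of the changes in this test file are the same mechanical edit, so one sketch covers them: the helper's result is now discarded explicitly with a (void) cast. Assuming the helper (or the promise type it returns) is annotated so that silently ignoring the result triggers a compiler warning, the cast documents that dropping the promise, and with it canceling the in-flight operation, is exactly what these tests intend. The helper name below is hypothetical.

#include <kj/async.h>

// Hypothetical helper, annotated so callers cannot silently ignore its result.
[[nodiscard]] kj::Promise<void> issueUncachedGet() {
  return kj::READY_NOW;
}

void exerciseCache() {
  // issueUncachedGet();     // would warn: discarded result of a [[nodiscard]] call
  (void)issueUncachedGet();  // explicit: we intend to drop (and thereby cancel) it
}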
2 changes: 1 addition & 1 deletion src/workerd/io/hibernation-manager.c++
@@ -207,7 +207,7 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
// autoResponseTimestamp on the active websocket.
(active)->setAutoResponseTimestamp(hib.autoResponseTimestamp);
}
- ws.send((reqResp)->getResponse().asArray());
+ co_await ws.send((reqResp)->getResponse().asArray());
Review comment (Contributor):
@jqmmes what would you like to do here? Have it co_await or continue discarding until your fix is in?

Reply (Contributor):
It's OK to merge it as is. I can update it later, once my PR is ready.

skip = true;
// If we've sent an auto response message, we should not unhibernate or deliver the
// received message to the actor
8 changes: 4 additions & 4 deletions src/workerd/io/io-gate-test.c++
@@ -135,7 +135,7 @@ KJ_TEST("InputGate nested critical sections") {
}

// Start cs2.
- cs2->wait();
+ cs2->wait().wait(ws);

// Can't start new tasks in cs1 until cs2 finishes.
auto cs1Wait = cs1->wait();
@@ -166,7 +166,7 @@ KJ_TEST("InputGate nested critical section outlives parent") {
}

// Start cs2.
- cs2->wait();
+ cs2->wait().wait(ws);

// Mark cs1 done. (Note that, in a real program, this probably can't happen like this, because a
// lock would be taken on cs1 before marking it done, and that lock would wait for cs2 to
@@ -210,7 +210,7 @@ KJ_TEST("InputGate deeply nested critical sections") {
}

// Start cs2
- cs2->wait();
+ cs2->wait().wait(ws);

// Add some waiters to cs2, some of which are waiting to start more nested critical sections
auto lock = cs2->wait().wait(ws);
@@ -336,7 +336,7 @@ KJ_TEST("InputGate broken") {
}

// start cs2
- cs2->wait();
+ cs2->wait().wait(ws);

auto cs1Wait = cs1->wait();
KJ_EXPECT(!cs1Wait.poll(ws));
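The common thread in these test fixes: cs2->wait() returns a promise, and the old code dropped it, so the "Start cs2" step never actually ran (a dropped KJ promise is canceled). In tests, kj::Promise::wait(kj::WaitScope&) drives the event loop until the promise resolves. A minimal sketch of that idiom, using kj::evalLater as a stand-in for the gate:

#include <kj/async.h>
#include <kj/test.h>

KJ_TEST("waiting on a promise actually runs it") {
  kj::EventLoop loop;
  kj::WaitScope ws(loop);

  bool ran = false;
  auto promise = kj::evalLater([&]() { ran = true; });

  // Creating the promise does not run the lambda, and destroying it unwaited
  // would cancel the work outright.
  promise.wait(ws);  // drive the event loop to completion, like cs2->wait().wait(ws)
  KJ_EXPECT(ran);
}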
4 changes: 3 additions & 1 deletion src/workerd/server/server.c++
@@ -2546,9 +2546,11 @@ kj::Promise<void> Server::handleDrain(kj::Promise<void> drainWhen) {
co_await drainWhen;
// Tell all HttpServers to drain. This causes them to disconnect any connections that don't
// have a request in-flight.
+ auto drainPromises = kj::heapArrayBuilder<kj::Promise<void>>(httpServers.size());
for (auto& httpServer: httpServers) {
- httpServer.httpServer.drain();
+ drainPromises.add(httpServer.httpServer.drain());
}
+ co_await kj::joinPromisesFailFast(drainPromises.finish());
}

kj::Promise<void> Server::run(jsg::V8System& v8System, config::Config::Reader config,
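The drain fix follows the collect-and-join pattern: keep one promise per server and only complete once all of them have resolved. kj::joinPromisesFailFast differs from kj::joinPromises in that it propagates the first failure immediately rather than waiting for every branch. A self-contained sketch of the same shape, where drainOne is a hypothetical stand-in for httpServer.drain():

#include <kj/array.h>
#include <kj/async.h>

// Hypothetical per-server drain; a real drain resolves once idle connections
// are closed and in-flight requests have finished.
kj::Promise<void> drainOne(size_t serverIndex) { return kj::READY_NOW; }

kj::Promise<void> drainAll(size_t serverCount) {
  auto drainPromises = kj::heapArrayBuilder<kj::Promise<void>>(serverCount);
  for (size_t i = 0; i < serverCount; ++i) {
    // Keep each promise; dropping it, as the old loop did, cancels the drain.
    drainPromises.add(drainOne(i));
  }
  // Resolve only once every server has drained; fail as soon as any one fails.
  co_await kj::joinPromisesFailFast(drainPromises.finish());
}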