Skip to content

Commit

Permalink
Merge pull request #961 from cloudflare/jspspike/scheduled-event
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbbot authored Aug 3, 2023
2 parents 2fbefba + a63a409 commit f0ea846
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 6 deletions.
34 changes: 34 additions & 0 deletions src/workerd/api/http-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from "node:assert";

let scheduledLastCtrl;

export default {
async scheduled(ctrl, env, ctx) {
scheduledLastCtrl = ctrl;
if (ctrl.cron === "* * * * 30") ctrl.noRetry();
},

async test(ctrl, env, ctx) {
// Call `scheduled()` with no options
{
const result = await env.SERVICE.scheduled();
assert.strictEqual(result.outcome, "ok");
assert(!result.noRetry);
assert(Math.abs(Date.now() - scheduledLastCtrl.scheduledTime) < 3_000);
assert.strictEqual(scheduledLastCtrl.cron, "");
}

// Call `scheduled()` with options, and noRetry()
{
const result = await env.SERVICE.scheduled({ scheduledTime: 1000, cron: "* * * * 30" });
assert.strictEqual(result.outcome, "ok");
assert(result.noRetry);
assert.strictEqual(scheduledLastCtrl.scheduledTime, 1000);
assert.strictEqual(scheduledLastCtrl.cron, "* * * * 30");
}
}
}
18 changes: 18 additions & 0 deletions src/workerd/api/http-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "http-test",
worker = (
modules = [
( name = "worker", esModule = embed "http-test.js" )
],
bindings = [
( name = "SERVICE", service = "http-test" )
],
compatibilityDate = "2023-08-01",
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
32 changes: 29 additions & 3 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,7 @@ jsg::Promise<void> Fetcher::delete_(jsg::Lock& js, kj::String url) {
return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit)));
}

jsg::Promise<QueueResponse> Fetcher::queue(
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages) {
auto& ioContext = IoContext::current();
auto worker = getClient(ioContext, nullptr, "queue"_kjc);
Expand All @@ -2042,8 +2042,8 @@ jsg::Promise<QueueResponse> Fetcher::queue(
auto eventRef = kj::addRef(*event); // attempt to work around windows-specific null pointer deref.
return ioContext.awaitIo(js, worker->customEvent(kj::mv(eventRef)).attach(kj::mv(worker)),
[event=kj::mv(event)](jsg::Lock& js, WorkerInterface::CustomEvent::Result result) {
return QueueResponse{
.outcome=static_cast<uint16_t>(result.outcome),
return Fetcher::QueueResult{
.outcome=kj::str(result.outcome),
.retryAll=event->getRetryAll(),
.ackAll=event->getAckAll(),
.explicitRetries=event->getExplicitRetries(),
Expand All @@ -2052,6 +2052,32 @@ jsg::Promise<QueueResponse> Fetcher::queue(
});
}

jsg::Promise<Fetcher::ScheduledResult> Fetcher::scheduled(
jsg::Lock& js, jsg::Optional<ScheduledOptions> options) {
auto& ioContext = IoContext::current();
auto worker = getClient(ioContext, nullptr, "scheduled"_kjc);

auto scheduledTime = ioContext.now();
auto cron = kj::String();
KJ_IF_MAYBE(o, options) {
KJ_IF_MAYBE(t, o->scheduledTime) {
scheduledTime = *t;
}
KJ_IF_MAYBE(c, o->cron) {
cron = kj::mv(*c);
}
}

return ioContext.awaitIo(js, worker->runScheduled(scheduledTime, cron)
.attach(kj::mv(worker), kj::mv(cron)),
[](jsg::Lock& js, WorkerInterface::ScheduledResult result) {
return Fetcher::ScheduledResult{
.outcome=kj::str(result.outcome),
.noRetry=!result.retry,
};
});
}

kj::Own<WorkerInterface> Fetcher::getClient(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName) {
KJ_SWITCH_ONEOF(channelOrClientFactory) {
Expand Down
32 changes: 31 additions & 1 deletion src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,15 +468,42 @@ class Fetcher: public jsg::Object {
});
};

jsg::Promise<QueueResponse> queue(
struct QueueResult {
kj::String outcome;
bool retryAll;
bool ackAll;
kj::Array<kj::String> explicitRetries;
kj::Array<kj::String> explicitAcks;

JSG_STRUCT(outcome, retryAll, ackAll, explicitRetries, explicitAcks);
};

jsg::Promise<QueueResult> queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages);

struct ScheduledOptions {
jsg::Optional<kj::Date> scheduledTime;
jsg::Optional<kj::String> cron;

JSG_STRUCT(scheduledTime, cron);
};

struct ScheduledResult {
kj::String outcome;
bool noRetry;

JSG_STRUCT(outcome, noRetry);
};

jsg::Promise<ScheduledResult> scheduled(jsg::Lock& js, jsg::Optional<ScheduledOptions> options);

JSG_RESOURCE_TYPE(Fetcher, CompatibilityFlags::Reader flags) {
JSG_METHOD(fetch);
JSG_METHOD(connect);

if (flags.getServiceBindingExtraHandlers()) {
JSG_METHOD(queue);
JSG_METHOD(scheduled);
}

JSG_METHOD(get);
Expand Down Expand Up @@ -1075,6 +1102,9 @@ kj::String makeRandomBoundaryCharacters();
api::Request::InitializerDict, \
api::Fetcher, \
api::Fetcher::PutOptions, \
api::Fetcher::ScheduledOptions, \
api::Fetcher::ScheduledResult, \
api::Fetcher::QueueResult, \
api::Fetcher::ServiceBindingQueueMessage

// The list of http.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export default {
{ id: "#2", timestamp, body: { c: { d: 10 } } },
{ id: "#3", timestamp, body: timestamp },
]);
assert.strictEqual(response.outcome, /* OK */ 1);
assert.strictEqual(response.outcome, "ok");
assert(!response.retryAll);
assert(response.ackAll);
assert.deepStrictEqual(response.explicitRetries, ["#2"]);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ KJ_TEST("Server: call queue handler on service binding") {
test.server.allowExperimental();
test.start();
auto conn = test.connect("test-addr");
conn.httpGet200("/", "queue outcome: 1, ackAll: true");
conn.httpGet200("/", "queue outcome: ok, ackAll: true");
}

KJ_TEST("Server: Durable Objects (in memory)") {
Expand Down

0 comments on commit f0ea846

Please sign in to comment.