Skip to content

Commit

Permalink
Add jwt to R2 rpc, introduce R2CrossAccount
Browse files Browse the repository at this point in the history
To allow greater flexibility in how R2 RPC operations can be performed,
introducing the jwt as optional attribute. Also refactored the way
R2 paths are built and passed to the doR2HTTP<verb>Request functions,
since we expect to support greater variety of paths.

Introduced concept of R2CrossAccount, which will leverage the new
underlying flexibility to perform cross account operations with proper
auth.
  • Loading branch information
OilyLime committed May 1, 2023
1 parent be93707 commit 7f461b2
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 28 deletions.
23 changes: 15 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@

namespace workerd::api::public_beta {
jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName),
R2Bucket::friend_tag_t{});
KJ_IF_MAYBE(a, this->adminAccount) {
KJ_IF_MAYBE(j, this->jwt) {
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName),
kj::mv(*a), kj::mv(*j), R2Bucket::friend_tag_t{});
} else {
KJ_FAIL_ASSERT("adminAccount without corresponding jwt");
}
}
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName), R2Bucket::friend_tag_t{});
}

jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(jsg::Lock& js, kj::String name,
Expand All @@ -33,9 +40,9 @@ jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(jsg::Lock& js, kj::String name,
createBucketBuilder.setBucket(name);

auto requestJson = json.encode(requestBuilder);

auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr,
kj::mv(requestJson), nullptr);
kj::mv(requestJson), kj::ArrayPtr<kj::StringPtr>(), kj::mv(jsonWebToken));

return context.awaitIo(kj::mv(promise),
[this, subrequestChannel = subrequestChannel, name = kj::mv(name), &errorType]
Expand Down Expand Up @@ -72,8 +79,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
}

auto requestJson = json.encode(requestBuilder);

auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), nullptr);
auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), nullptr, kj::mv(jsonWebToken));

return context.awaitIo(js, kj::mv(promise),
[this, &retrievedBucketType, &errorType](jsg::Lock& js, R2Result r2Result) mutable {
Expand Down Expand Up @@ -126,9 +133,9 @@ jsg::Promise<void> R2Admin::delete_(jsg::Lock& js, kj::String name,
deleteBucketBuilder.setBucket(name);

auto requestJson = json.encode(requestBuilder);

auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr,
kj::mv(requestJson), nullptr);
kj::mv(requestJson), kj::ArrayPtr<kj::StringPtr>(), kj::mv(jsonWebToken));

return context.awaitIo(kj::mv(promise), [&errorType](R2Result r2Result) mutable {
r2Result.throwIfError("deleteBucket", errorType);
Expand Down
22 changes: 22 additions & 0 deletions src/workerd/api/r2-admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@
#include <workerd/jsg/jsg.h>
#include <workerd/api/http.h>

namespace edgeworker::api {
class R2CrossAccount;
}

namespace workerd::api::public_beta {

class R2Admin: public jsg::Object {
//

struct friend_tag_t {};

// A capability to an R2 Admin interface.

struct FeatureFlags: public R2Bucket::FeatureFlags {
Expand All @@ -23,6 +31,16 @@ class R2Admin: public jsg::Object {
// `subrequestChannel` is what to pass to IoContext::getHttpClient() to get an HttpClient
// representing this namespace.

R2Admin(FeatureFlags featureFlags,
uint subrequestChannel,
kj::String account,
kj::String jwt,
friend_tag_t)
: featureFlags(featureFlags),
subrequestChannel(subrequestChannel),
adminAccount(kj::mv(account)),
jwt(kj::mv(jwt)) {}

struct ListOptions {
jsg::Optional<int> limit;
jsg::Optional<kj::String> cursor;
Expand Down Expand Up @@ -80,6 +98,10 @@ class R2Admin: public jsg::Object {
private:
R2Bucket::FeatureFlags featureFlags;
uint subrequestChannel;
kj::Maybe<kj::String> adminAccount;
kj::Maybe<kj::String> jwt;

friend class edgeworker::api::R2CrossAccount;
};

#define EW_R2_PUBLIC_BETA_ADMIN_ISOLATE_TYPES \
Expand Down
64 changes: 57 additions & 7 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,12 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(
auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(kj::mv(promise), [&errorType](R2Result r2Result) {
return parseObjectMetadata<HeadResult>("head", r2Result, errorType);
Expand Down Expand Up @@ -335,7 +340,12 @@ R2Bucket::get(jsg::Lock& js, kj::String name, jsg::Optional<GetOptions> options,
auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(kj::mv(promise), [&context, &errorType](R2Result r2Result)
-> kj::OneOf<kj::Maybe<jsg::Ref<GetResult>>, jsg::Ref<HeadResult>> {
Expand Down Expand Up @@ -521,11 +531,23 @@ R2Bucket::put(jsg::Lock& js, kj::String name, kj::Maybe<R2PutValue> value,
}

auto requestJson = json.encode(requestBuilder);
auto account = adminAccount.map([](auto&& s) { return kj::str(s); });
auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(a, account) {
path.add(*a);
// When account is nonnull for a bucket operation the bucket must also be nonnull
KJ_ASSERT_NONNULL(bucket);
}
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}

cancelReader.cancel();
auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPPutRequest(js, kj::mv(client), kj::mv(value), nullptr,
kj::mv(requestJson), kj::mv(bucket));
kj::mv(requestJson), path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(js, kj::mv(promise),
[sentHttpMetadata = kj::mv(sentHttpMetadata),
Expand Down Expand Up @@ -608,10 +630,22 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
}

auto requestJson = json.encode(requestBuilder);
auto account = adminAccount.map([](auto&& s) { return kj::str(s); });
auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(a, account) {
path.add(*a);
// When account is nonnull for a bucket operation the bucket must also be nonnull
KJ_ASSERT_NONNULL(bucket);
}
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}

auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(js, kj::mv(promise),
[&errorType, key=kj::mv(key), this] (jsg::Lock& js, R2Result r2Result) mutable {
Expand Down Expand Up @@ -661,11 +695,22 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js, kj::OneOf<kj::String, kj::Ar
}

auto requestJson = json.encode(requestBuilder);

auto account = adminAccount.map([](auto&& s) { return kj::str(s); });
auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(a, account) {
path.add(*a);
// When account is nonnull for a bucket operation the bucket must also be nonnull
KJ_ASSERT_NONNULL(bucket);
}
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}

auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(js, kj::mv(promise), [&errorType](jsg::Lock& js, R2Result r) {
if (r.objectNotFound()) {
Expand Down Expand Up @@ -764,7 +809,12 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(
auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto jsonWebToken = jwt.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path.asPtr(), kj::mv(jsonWebToken));

return context.awaitIo(kj::mv(promise),
[expectedOptionalFields = expectedOptionalFields.releaseAsArray(), &errorType]
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class R2Bucket: public jsg::Object {
explicit R2Bucket(FeatureFlags featureFlags, uint clientIndex, kj::String bucket, friend_tag_t)
: featureFlags(featureFlags), clientIndex(clientIndex), adminBucket(kj::mv(bucket)) {}

explicit R2Bucket(FeatureFlags featureFlags, uint clientIndex, kj::String bucket, kj::String account, kj::String jwt, friend_tag_t)
: featureFlags(featureFlags), clientIndex(clientIndex), adminBucket(kj::mv(bucket)), adminAccount(kj::mv(account)), jwt(kj::mv(jwt)) {}

struct Range {
jsg::Optional<double> offset;
jsg::Optional<double> length;
Expand Down Expand Up @@ -394,6 +397,8 @@ class R2Bucket: public jsg::Object {
FeatureFlags featureFlags;
uint clientIndex;
kj::Maybe<kj::String> adminBucket;
kj::Maybe<kj::String> adminAccount;
kj::Maybe<kj::String> jwt;

friend class R2Admin;
friend class R2MultipartUpload;
Expand Down
20 changes: 15 additions & 5 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(
auto requestJson = json.encode(requestBuilder);
auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto promise = doR2HTTPPutRequest(js, kj::mv(client), kj::mv(value), nullptr,
kj::mv(requestJson), kj::mv(bucket));
kj::mv(requestJson), path.asPtr(), nullptr);

return context.awaitIo(js, kj::mv(promise),
[&errorType, partNumber]
Expand Down Expand Up @@ -99,9 +103,12 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(
auto requestJson = json.encode(requestBuilder);

auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path.asPtr(), nullptr);

return context.awaitIo(js, kj::mv(promise),
[&errorType]
Expand Down Expand Up @@ -135,9 +142,12 @@ jsg::Promise<void> R2MultipartUpload::abort(jsg::Lock& js, const jsg::TypeHandle
auto requestJson = json.encode(requestBuilder);

auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::Vector<kj::StringPtr> path;
KJ_IF_MAYBE(b, bucket) {
path.add(*b);
}
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path.asPtr(), nullptr);

return context.awaitIo(js, kj::mv(promise), [&errorType](jsg::Lock& js, R2Result r) {
if (r.objectNotFound()) {
Expand Down
18 changes: 12 additions & 6 deletions src/workerd/api/r2-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ void R2Result::throwIfError(kj::StringPtr action,
}

kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,
kj::String metadataPayload, kj::Maybe<kj::String> path) {
kj::String metadataPayload, kj::ArrayPtr<kj::StringPtr> path, kj::Maybe<kj::String> jwt) {
auto& context = IoContext::current();
kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
KJ_IF_MAYBE(p, path) {
url.path.add(kj::mv(*p));
for (const auto &p : path) {
url.path.add(kj::str(p));
}

auto& headerIds = context.getHeaderIds();

auto requestHeaders = kj::HttpHeaders(context.getHeaderTable());
requestHeaders.set(headerIds.cfBlobRequest, kj::mv(metadataPayload));
KJ_IF_MAYBE(j, jwt) {
requestHeaders.set(headerIds.authorization, kj::str("Bearer ", *j));
}
return client->request(
kj::HttpMethod::GET, url.toString(kj::Url::Context::HTTP_PROXY_REQUEST),
requestHeaders)
Expand Down Expand Up @@ -130,16 +133,16 @@ kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,

kj::Promise<R2Result> doR2HTTPPutRequest(jsg::Lock& js, kj::Own<kj::HttpClient> client,
kj::Maybe<R2PutValue> supportedBody, kj::Maybe<uint64_t> streamSize, kj::String metadataPayload,
kj::Maybe<kj::String> path) {
kj::ArrayPtr<kj::StringPtr> path, kj::Maybe<kj::String> jwt) {
// NOTE: A lot of code here is duplicated with kv.c++. Maybe it can be refactored to be more
// reusable?
auto& context = IoContext::current();
auto headers = kj::HttpHeaders(context.getHeaderTable());
kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
KJ_IF_MAYBE(p, path) {
url.path.add(kj::mv(*p));
for (const auto &p : path) {
url.path.add(kj::str(p));
}

kj::Maybe<uint64_t> expectedBodySize;
Expand Down Expand Up @@ -177,6 +180,9 @@ kj::Promise<R2Result> doR2HTTPPutRequest(jsg::Lock& js, kj::Own<kj::HttpClient>
}

headers.set(context.getHeaderIds().cfBlobMetadataSize, kj::str(metadataPayload.size()));
KJ_IF_MAYBE(j, jwt) {
headers.set(context.getHeaderIds().authorization, kj::str("Bearer ", *j));
}

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

Expand Down
6 changes: 4 additions & 2 deletions src/workerd/api/r2-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ struct R2Result {
kj::Promise<R2Result> doR2HTTPGetRequest(
kj::Own<kj::HttpClient> client,
kj::String metadataPayload,
kj::Maybe<kj::String> path);
kj::ArrayPtr<kj::StringPtr> path,
kj::Maybe<kj::String> jwt);

kj::Promise<R2Result> doR2HTTPPutRequest(
jsg::Lock& js,
Expand All @@ -82,6 +83,7 @@ kj::Promise<R2Result> doR2HTTPPutRequest(
kj::Maybe<uint64_t> streamSize,
// Deprecated. For internal beta API only.
kj::String metadataPayload,
kj::Maybe<kj::String> path);
kj::ArrayPtr<kj::StringPtr> path,
kj::Maybe<kj::String> jwt);

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ ThreadContext::HeaderIdBundle::HeaderIdBundle(kj::HttpHeaderTable::Builder& buil
cfR2ErrorHeader(builder.add("CF-R2-Error")),
cfBlobMetadataSize(builder.add("CF-R2-Metadata-Size")),
cfBlobRequest(builder.add("CF-R2-Request")),
authorization(builder.add("Authorization")),
secWebSocketProtocol(builder.add("Sec-WebSocket-Protocol")) {}

ThreadContext::ThreadContext(
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ThreadContext {
const kj::HttpHeaderId cfR2ErrorHeader; // used by R2 binding implementation
const kj::HttpHeaderId cfBlobMetadataSize; // used by R2 binding implementation
const kj::HttpHeaderId cfBlobRequest; // used by R2 binding implementation
const kj::HttpHeaderId authorization; // used by R2 binding implementation
const kj::HttpHeaderId secWebSocketProtocol;
};

Expand Down

0 comments on commit 7f461b2

Please sign in to comment.