Skip to content

Commit

Permalink
Update the MemoryCache impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Mar 1, 2024
1 parent 8dfa936 commit 1ee1e0f
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 86 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
159 changes: 109 additions & 50 deletions src/workerd/api/memory-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <workerd/jsg/ser.h>
#include <workerd/io/io-context.h>
#include <workerd/api/util.h>
#include <workerd/util/weak-refs.h>

namespace workerd::api {

Expand Down Expand Up @@ -34,6 +35,21 @@ static bool hasExpired(const kj::Maybe<double>& expiration, bool allowOutsideIoC
return false;
}

// Constructs a cache instance. The optional `provider` is remembered so the cache
// can deregister itself on destruction (see the destructor). `id` is copied into
// this instance rather than referenced from the provider's map, since the map
// entry may be removed before this instance is fully destroyed.
SharedMemoryCache::SharedMemoryCache(
    kj::Maybe<const MemoryCacheProvider&> provider,
    kj::StringPtr id,
    kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
    : data(),
      provider(provider),
      id(kj::str(id)),
      additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler) {}

// Deregisters this cache from its provider (if any) so the provider's map does
// not keep a dangling raw pointer. noexcept(false) because removeInstance can
// raise via KJ assertions.
SharedMemoryCache::~SharedMemoryCache() noexcept(false) {
  KJ_IF_SOME(p, provider) {
    p.removeInstance(*this);
  }
}

void SharedMemoryCache::suggest(const Limits& limits) {
auto data = this->data.lockExclusive();
bool isKnownLimit = data->suggestedLimits.find(limits) != data->suggestedLimits.end();
Expand Down Expand Up @@ -198,15 +214,41 @@ void SharedMemoryCache::removeIfExistsWhileLocked(ThreadUnsafeData& data, const
}
}

// Attempts to acquire a strong reference to this atomically refcounted cache.
// Returns kj::none if the refcount has already reached zero (the instance is
// concurrently being destroyed), in which case the caller must not use it.
kj::Maybe<kj::Own<SharedMemoryCache>> SharedMemoryCache::tryAddRef() {
  return kj::atomicAddRefWeak(*this);
}

// Factory for new cache instances. Returns the initial strong reference to an
// atomically refcounted SharedMemoryCache.
kj::Own<SharedMemoryCache> SharedMemoryCache::create(
    kj::Maybe<const MemoryCacheProvider&> provider,
    kj::StringPtr id,
    kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler) {
  return kj::atomicRefcounted<SharedMemoryCache>(provider, id, handler);
}

// A Use holds a strong reference to the cache and registers its limits
// suggestion with it for the lifetime of the Use (withdrawn in ~Use).
SharedMemoryCache::Use::Use(kj::Own<SharedMemoryCache> cache, const Limits& limits)
    : cache(kj::mv(cache)), limits(limits) {
  this->cache->suggest(limits);
}

// Move constructor: takes over the moved-from Use's strong cache reference and
// its already-registered limits suggestion.
//
// Note: we deliberately do NOT call suggest() here. The suggestion was already
// made exactly once when the original Use was constructed, and the moved-from
// Use's destructor skips unsuggest() (its `cache` Own is null after the move).
// Calling suggest() again would register the limits a second time while only
// one unsuggest() ever runs, permanently leaking one suggestion per move.
SharedMemoryCache::Use::Use(Use&& other): cache(kj::mv(other.cache)), limits(other.limits) {}

// Withdraws this Use's limits suggestion. If this Use was moved from, `cache`
// is null and the (single) suggestion is now owned by the move target, so we
// must not unsuggest here.
SharedMemoryCache::Use::~Use() noexcept(false) {
  if (cache.get() != nullptr) {
    cache->unsuggest(limits);
  }
}

// Looks up `key` in the cache without invoking any fallback. Returns kj::none
// when the key is absent (or expired, per getWhileLocked's rules).
kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(const kj::String& key) {
  // `cache` is now a kj::Own<SharedMemoryCache>, so member access uses `->`.
  // (The stale duplicated `cache.` statements from before the refactor are
  // removed — they no longer compile and would shadow `data` anyway.)
  auto data = cache->data.lockExclusive();
  return cache->getWhileLocked(*data, key);
}

kj::OneOf<kj::Own<CacheValue>, kj::Promise<SharedMemoryCache::Use::GetWithFallbackOutcome>>
SharedMemoryCache::Use::getWithFallback(const kj::String& key) {
auto data = cache.data.lockExclusive();
KJ_IF_SOME(existingValue, cache.getWhileLocked(*data, key)) {
auto data = cache->data.lockExclusive();
KJ_IF_SOME(existingValue, cache->getWhileLocked(*data, key)) {
return kj::mv(existingValue);
} else KJ_IF_SOME(existingInProgress, data->inProgress.find(key)) {
// We return a Promise, but we keep the fulfiller. We might fulfill it
Expand Down Expand Up @@ -250,8 +292,8 @@ SharedMemoryCache::Use::FallbackDoneCallback SharedMemoryCache::Use::prepareFall
// The fallback succeeded. Store the value in the cache and propagate it to
// all waiting requests, even if it has expired already.
status.hasSettled = true;
auto data = cache.data.lockExclusive();
cache.putWhileLocked(
auto data = cache->data.lockExclusive();
cache->putWhileLocked(
*data, kj::str(inProgress.key), kj::atomicAddRef(*result.value), result.expiration);
for (auto& waiter: inProgress.waiting) {
waiter.fulfiller->fulfill(kj::atomicAddRef(*result.value));
Expand All @@ -273,7 +315,7 @@ void SharedMemoryCache::Use::handleFallbackFailure(InProgress& inProgress) {
// If there is another queued fallback, retrieve it and remove it from the
// queue. Otherwise, just delete the queue entirely.
{
auto data = cache.data.lockExclusive();
auto data = cache->data.lockExclusive();
auto next = inProgress.waiting.begin();
if (next != inProgress.waiting.end()) {
nextFulfiller = kj::mv(next->fulfiller);
Expand Down Expand Up @@ -389,54 +431,71 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,

// ======================================================================================

namespace {
// Data structure that maps unique cache identifiers to cache instances.
// This allows separate isolates to access the same in-memory caches.
class MemoryCacheMap: public MemoryCacheProvider {
public:
MemoryCacheMap(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none)
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}
KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheMap);

// Gets an existing SharedMemoryCache instance or creates a new one if no
// cache with the given id exists.
SharedMemoryCache& getInstance(kj::StringPtr cacheId, uint32_t ownerId) const override;

private:
using HashMap = kj::HashMap<kj::String, kj::Own<SharedMemoryCache>>;

kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler;

// All existing in-memory caches.
kj::MutexGuarded<HashMap> caches;
// TODO(later): consider using a kj::Table with a HashIndex that uses
// SharedMemoryCache::uuid() instead.
};

SharedMemoryCache& MemoryCacheMap::getInstance(
kj::StringPtr cacheId, uint32_t ownerId) const {
auto lock = caches.lockExclusive();
auto id = kj::str(cacheId, "::", ownerId);
return *lock->findOrCreate(id, [this, &id]() {
// Constructs a provider. The optional resize handler is stored on the provider
// and handed out by reference to every SharedMemoryCache it creates, so the
// provider must outlive all of those caches (enforced in the destructor).
MemoryCacheProvider::MemoryCacheProvider(
    kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
        additionalResizeMemoryLimitHandler)
    : additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}

// Verifies at teardown that no SharedMemoryCache still points at this provider;
// each cache removes itself from `caches` in its destructor, so a non-empty map
// here means some cache will outlive us and dereference freed memory.
MemoryCacheProvider::~MemoryCacheProvider() noexcept(false) {
  // TODO(cleanup): Later, assuming progress is made on kj::Ptr<T>, we ought to be able
  // to remove this. For now we just need to make sure that the MemoryCacheProvider instance
  // outlives any SharedMemoryCache instances that are referencing it.
  KJ_REQUIRE(caches.lockShared()->size() == 0,
      "There are still active SharedMemoryCache instances. Use-after-free errors are likely.");
}

// Returns a strong reference to the shared cache identified by `cacheId`,
// creating it (and entering it into the map) if necessary. When `cacheId` is
// kj::none, returns a fresh anonymous cache that is not tracked in the map and
// therefore cannot be shared or looked up later.
//
// NOTE(review): this span previously interleaved stale pre-refactor lines (the
// old MemoryCacheMap::getInstance body returning a HashMap::Entry) with the new
// implementation; only the new implementation is kept here.
kj::Own<SharedMemoryCache> MemoryCacheProvider::getInstance(
    kj::Maybe<kj::StringPtr> cacheId) const {

  const auto makeCache = [this](kj::Maybe<const MemoryCacheProvider&> provider, kj::StringPtr id) {
    // The cache doesn't exist in the map. Let's create it.
    auto handler = additionalResizeMemoryLimitHandler.map([](
        const SharedMemoryCache::AdditionalResizeMemoryLimitHandler& handler)
        -> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
      // The const_cast is intentional: the handler is owned by this provider,
      // which is guaranteed to outlive every cache it creates.
      return const_cast<SharedMemoryCache::AdditionalResizeMemoryLimitHandler&>(handler);
    });
    return SharedMemoryCache::create(provider, id, handler);
  };

  KJ_IF_SOME(cid, cacheId) {
    auto lock = caches.lockExclusive();

    // First, let's see if the cache already exists. If it does, we'll just return
    // a strong reference to it.
    KJ_IF_SOME(found, lock->find(cid)) {
      KJ_IF_SOME(ref, found->tryAddRef()) {
        return kj::mv(ref);
      } else {
        // We found an entry in the map, but atomicAddRefWeak failed (the cache is
        // concurrently being destroyed). We have to replace the map entry with a
        // new cache instance.
        auto cache = makeCache(kj::Maybe<const MemoryCacheProvider&>(*this), cid);
        lock->upsert(kj::str(cid), cache.get());
        return kj::mv(cache);
      }
    }

    // The cache doesn't exist, let's create it and add it to the map.
    auto cache = makeCache(kj::Maybe<const MemoryCacheProvider&>(*this), cid);
    lock->insert(kj::str(cid), cache.get());
    return kj::mv(cache);
  }

  // Since we don't have a cache id, we'll just create a new cache and return it.
  return makeCache(kj::none, nullptr);
}
} // namespace

kj::Own<MemoryCacheProvider> MemoryCacheProvider::createDefault(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler> additionalResizeMemoryLimitHandler) {
return kj::heap<MemoryCacheMap>(kj::mv(additionalResizeMemoryLimitHandler));
void MemoryCacheProvider::removeInstance(SharedMemoryCache& instance) const {
// This is fun. We have to make sure that the instance to be removed is actually
// what we expect it to be.
auto lock = caches.lockExclusive();
KJ_IF_SOME(found, lock->find(instance.getId())) {
if (found != &instance) {
// Not the instance we expected it to be. Cache instance was likely replaced
// by a new instance with the same id. Do nothing.
return;
}
}
KJ_ASSERT(lock->erase(instance.getId()));
}

} // namespace workerd::api
111 changes: 86 additions & 25 deletions src/workerd/api/memory-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,26 @@

namespace workerd::api {

// The MemoryCache mechanism is an in-process, memory-resident data cache that
// can be configured for workers. A single cache instance can be unique to an
// individual worker or shared across multiple workers / isolates.
//
// Instances are configured as bindings on the worker (set up in the workers
// configuration) and accessible via the environment bindings passed into the
// worker handler functions:
//
// async fetch(req, env) {
// await env.MY_CACHE.read('key', () => {
// // Called if the 'key' does not exist in the cache
// return 'new value';
// });
// }
//
// The cache is only capable of storing values that are v8 serializable (so
// JS primitives other than Symbol, ordinary JavaScript objects but not class
// instances, etc). Objects that represent i/o (like streams or promises) are
// explicitly not supported.

struct CacheValue: kj::AtomicRefcounted {
CacheValue(kj::Array<kj::byte>&& bytes): bytes(kj::mv(bytes)) {}

Expand Down Expand Up @@ -55,14 +75,21 @@ struct CacheValueProduceResult {
JSG_STRUCT(value, expiration);
};

class MemoryCacheProvider;

// An in-memory cache that can be accessed by any number of workers/isolates
// within the same process.
class SharedMemoryCache {
// TODO(soon): We plan to explore replacing this implementation with a memcached-based
// implementation in the near future. The memcached-based impl would likely be
// fairly different from this implementation so quite a few of the details here
// are expected to change.
class SharedMemoryCache : public kj::AtomicRefcounted {
private:
struct InProgress;
struct ThreadUnsafeData;

public:
struct ThreadUnsafeData;

struct Limits {
// The maximum number of keys that may exist within the cache at the same
// time. The cache size grows at least linearly in the number of entries.
Expand Down Expand Up @@ -116,16 +143,21 @@ class SharedMemoryCache {

using AdditionalResizeMemoryLimitHandler = kj::Function<void(ThreadUnsafeData&)>;

SharedMemoryCache(const kj::StringPtr& uuid,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
: data(),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler),
uuid_(kj::str(uuid)),
instanceId_(randomUUID(kj::none)) {}
SharedMemoryCache(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);

inline kj::StringPtr uuid() { return uuid_; }
~SharedMemoryCache() noexcept(false);

inline kj::StringPtr instanceId() { return instanceId_; }
kj::StringPtr getId() const { return id; }

static kj::Own<SharedMemoryCache> create(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);

kj::Maybe<kj::Own<SharedMemoryCache>> tryAddRef();

public:
// RAII class that attaches itself to a cache, suggests cache limits to the
Expand All @@ -134,11 +166,9 @@ class SharedMemoryCache {
public:
KJ_DISALLOW_COPY(Use);

Use(SharedMemoryCache& cache, const Limits& limits): cache(cache), limits(limits) {
cache.suggest(limits);
}
Use(Use&& other): cache(other.cache), limits(other.limits) { cache.suggest(limits); }
~Use() noexcept(false) { cache.unsuggest(limits); }
Use(kj::Own<SharedMemoryCache> cache, const Limits& limits);
Use(Use&& other);
~Use() noexcept(false);

// Returns a cached value for the given key if one exists (and has not
// expired). If no such value exists, nothing is returned, regardless of any
Expand Down Expand Up @@ -176,7 +206,7 @@ class SharedMemoryCache {
// erased.
void handleFallbackFailure(InProgress& inProgress);

SharedMemoryCache& cache;
kj::Own<SharedMemoryCache> cache;
Limits limits;
};

Expand Down Expand Up @@ -348,6 +378,7 @@ class SharedMemoryCache {
}
};

public:
struct ThreadUnsafeData {
KJ_DISALLOW_COPY_AND_MOVE(ThreadUnsafeData);

Expand Down Expand Up @@ -393,17 +424,28 @@ class SharedMemoryCache {
kj::Table<kj::Own<InProgress>, kj::HashIndex<InProgress::KeyCallbacks>> inProgress;
};

private:

// To ensure thread-safety, all mutable data is guarded by a mutex. Each cache
// operation requires an exclusive lock. Even read-only operations need to
// update the liveliness of cache entries, which currently requires a lock.
kj::MutexGuarded<ThreadUnsafeData> data;
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler;

// A unique identifier associated with this cache.
const kj::String uuid_;
// The MemoryCacheProvider instance needs to be guaranteed to outlive the SharedMemoryCache
// instance. When the SharedMemoryCache is destroyed, it will remove itself from the provider.
// TODO(cleanup): Eventually, assuming/once the kj::Ptr<T> work progresses, it would be safer
// to replace this with a kj::Ptr<MemoryCacheProvider>
kj::Maybe<const MemoryCacheProvider&> provider;

// It's a bit unfortunate that we need to keep a copy of the id here as well as in the map
// in the MemoryCacheProvider, however, it's entirely possible (at least theoretically) that
// the map entry in the MemoryCacheProvider could be removed before the SharedMemoryCache is
// fully destroyed, leaving a dangling reference. To be safe, we keep a copy.
kj::String id;

// Uniquely identifies this instance of this cache.
const kj::String instanceId_;
// Same as above, the MemoryCacheProvider owns the actual handler here. Since that is guaranteed
// to outlive this SharedMemoryCache instance, so is the handler.
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler;
};

// JavaScript class that allows accessing an in-memory cache.
Expand Down Expand Up @@ -432,13 +474,32 @@ class MemoryCache: public jsg::Object {
// It is responsible for owning the SharedMemoryCache instances and providing them to the
// bindings as needed. The default implementation (created and returned by createDefault())
// uses a simple in-memory map to store the SharedMemoryCache instances.
// TODO(later): It may be worth considering some kind of metrics observer for the provider
// that can be passed along to the individual cache instances so we can monitor just how much
// the in memory cache is being used.
class MemoryCacheProvider {
  // NOTE(review): the diff for this class interleaved the removed declarations
  // (virtual destructor, virtual getInstance(cacheId, ownerId), static
  // createDefault) with the new ones; only the new interface is kept here.
public:
  MemoryCacheProvider(
      kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
          additionalResizeMemoryLimitHandler = kj::none);
  KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheProvider);
  ~MemoryCacheProvider() noexcept(false);

  // Returns a strong reference to the cache with the given id, creating it if
  // necessary. When cacheId is kj::none, returns a new, anonymous cache that
  // is not tracked in the map.
  kj::Own<SharedMemoryCache> getInstance(
      kj::Maybe<kj::StringPtr> cacheId = kj::none) const;

  // Called by ~SharedMemoryCache to remove its (bare-pointer) entry from the map.
  void removeInstance(SharedMemoryCache& instance) const;

private:
  kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
      additionalResizeMemoryLimitHandler;

  // All existing in-memory *shared* caches. This table will not include caches created
  // that do not have an id (and therefore cannot be shared).
  // TODO(cleanup): Later, assuming progress is made on kj::Ptr<T>, it would be nice
  // to avoid the use of the bare pointer to SharedMemoryCache* here. When the SharedMemoryCache
  // is destroyed, it will remove itself from this cache by calling removeInstance.
  kj::MutexGuarded<kj::HashMap<kj::String, SharedMemoryCache*>> caches;
};

// clang-format off
Expand Down
Loading

0 comments on commit 1ee1e0f

Please sign in to comment.