Skip to content

Commit

Permalink
Update the MemoryCache impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Feb 21, 2024
1 parent 83e7e77 commit fd68ef1
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 54 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
106 changes: 83 additions & 23 deletions src/workerd/api/memory-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <workerd/jsg/ser.h>
#include <workerd/io/io-context.h>
#include <workerd/api/util.h>
#include <workerd/util/weak-refs.h>

namespace workerd::api {

Expand Down Expand Up @@ -34,6 +35,15 @@ static bool hasExpired(const kj::Maybe<double>& expiration, bool allowOutsideIoC
return false;
}

SharedMemoryCache::SharedMemoryCache(
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
: data(),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler),
self(kj::refcounted<WeakRef<SharedMemoryCache>>(
kj::Badge<SharedMemoryCache> {}, *this)) {}

SharedMemoryCache::~SharedMemoryCache() noexcept(false) { self->invalidate(); }

void SharedMemoryCache::suggest(const Limits& limits) {
auto data = this->data.lockExclusive();
bool isKnownLimit = data->suggestedLimits.find(limits) != data->suggestedLimits.end();
Expand Down Expand Up @@ -198,15 +208,38 @@ void SharedMemoryCache::removeIfExistsWhileLocked(ThreadUnsafeData& data, const
}
}

kj::Own<WeakRef<SharedMemoryCache>> SharedMemoryCache::addWeakRef() {
return self->addRef();
}

kj::Own<SharedMemoryCache> SharedMemoryCache::addRef() {
return kj::atomicAddRef(*this);
}

SharedMemoryCache::Use::Use(kj::Own<SharedMemoryCache> cache, const Limits& limits)
: cache(kj::mv(cache)), limits(limits) {
this->cache->suggest(limits);
}

SharedMemoryCache::Use::Use(Use&& other): cache(kj::mv(other.cache)), limits(other.limits) {
this->cache->suggest(limits);
}

SharedMemoryCache::Use::~Use() noexcept(false) {
if (cache.get() != nullptr) {
cache->unsuggest(limits);
}
}

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(const kj::String& key) {
auto data = cache.data.lockExclusive();
return cache.getWhileLocked(*data, key);
auto data = cache->data.lockExclusive();
return cache->getWhileLocked(*data, key);
}

kj::OneOf<kj::Own<CacheValue>, kj::Promise<SharedMemoryCache::Use::GetWithFallbackOutcome>>
SharedMemoryCache::Use::getWithFallback(const kj::String& key) {
auto data = cache.data.lockExclusive();
KJ_IF_SOME(existingValue, cache.getWhileLocked(*data, key)) {
auto data = cache->data.lockExclusive();
KJ_IF_SOME(existingValue, cache->getWhileLocked(*data, key)) {
return kj::mv(existingValue);
} else KJ_IF_SOME(existingInProgress, data->inProgress.find(key)) {
// We return a Promise, but we keep the fulfiller. We might fulfill it
Expand Down Expand Up @@ -250,8 +283,8 @@ SharedMemoryCache::Use::FallbackDoneCallback SharedMemoryCache::Use::prepareFall
// The fallback succeeded. Store the value in the cache and propagate it to
// all waiting requests, even if it has expired already.
status.hasSettled = true;
auto data = cache.data.lockExclusive();
cache.putWhileLocked(
auto data = cache->data.lockExclusive();
cache->putWhileLocked(
*data, kj::str(inProgress.key), kj::atomicAddRef(*result.value), result.expiration);
for (auto& waiter: inProgress.waiting) {
waiter.fulfiller->fulfill(kj::atomicAddRef(*result.value));
Expand All @@ -273,7 +306,7 @@ void SharedMemoryCache::Use::handleFallbackFailure(InProgress& inProgress) {
// If there is another queued fallback, retrieve it and remove it from the
// queue. Otherwise, just delete the queue entirely.
{
auto data = cache.data.lockExclusive();
auto data = cache->data.lockExclusive();
auto next = inProgress.waiting.begin();
if (next != inProgress.waiting.end()) {
nextFulfiller = kj::mv(next->fulfiller);
Expand Down Expand Up @@ -402,35 +435,62 @@ public:

// Gets an existing SharedMemoryCache instance or creates a new one if no
// cache with the given id exists.
SharedMemoryCache& getInstance(kj::StringPtr cacheId, uint32_t ownerId) const override;
kj::Own<SharedMemoryCache> getInstance(
kj::Maybe<kj::StringPtr> cacheId) const override;

private:
using HashMap = kj::HashMap<kj::String, kj::Own<SharedMemoryCache>>;
// In this implementation, we maintain a mapping of weak pointers to atomic
// refcounted SharedMemoryCache instances. We use weak refs here to prevent
// the presence of the reference in this map from keeping the cache instance
// alive. It will be the the individual SharedMemoryCache::Use instances that
// will hold the owning references to the cache instance.
using HashMap = kj::HashMap<kj::String, kj::Own<WeakRef<SharedMemoryCache>>>;

kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler;

// All existing in-memory caches.
kj::MutexGuarded<HashMap> caches;
// TODO(later): consider using a kj::Table with a HashIndex that uses
// SharedMemoryCache::uuid() instead.
};

SharedMemoryCache& MemoryCacheMap::getInstance(
kj::StringPtr cacheId, uint32_t ownerId) const {
auto lock = caches.lockExclusive();
auto id = kj::str(cacheId, "::", ownerId);
return *lock->findOrCreate(id, [this, &id]() {
kj::Own<SharedMemoryCache> MemoryCacheMap::getInstance(
kj::Maybe<kj::StringPtr> cacheId) const {

const auto makeCache = [this] {
// The cache doesn't exist in the map. Let's create it.
auto handler = additionalResizeMemoryLimitHandler.map([](
const SharedMemoryCache::AdditionalResizeMemoryLimitHandler& handler)
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
const SharedMemoryCache::AdditionalResizeMemoryLimitHandler& handler)
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
return const_cast<SharedMemoryCache::AdditionalResizeMemoryLimitHandler&>(handler);
});
return HashMap::Entry{
kj::str(id),
kj::heap<SharedMemoryCache>(id, handler)
};
});
return kj::atomicRefcounted<SharedMemoryCache>(handler);
};

KJ_IF_SOME(cid, cacheId) {
auto lock = caches.lockExclusive();

// Since our map contains weak refs, let's go through and purge any
// that are no longer valid.
lock->eraseAll([](auto& key, auto& value) {
return !value->isValid();
});

// First, let's see if the cache already exists. If it does, we'll just return
// a strong reference to it.
KJ_IF_SOME(weak, lock->find(cid)) {
// Since we purged invalid refs above, we can assert here that if we found one,
// it must still be good.
return KJ_ASSERT_NONNULL(weak->tryGet()).addRef();
}

// The cache doesn't exist, let's create it and add it to the map
auto cache = makeCache();
lock->insert(kj::str(cid), cache->addWeakRef());
return kj::mv(cache);
}

// Since we don't have a cache id, we'll just create a new cache and return it.
return makeCache();
}
} // namespace

Expand Down
49 changes: 27 additions & 22 deletions src/workerd/api/memory-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <workerd/jsg/jsg.h>
#include <workerd/jsg/function.h>
#include <workerd/util/uuid.h>
#include <workerd/util/weak-refs.h>
#include <kj/hash.h>
#include <kj/map.h>
#include <kj/mutex.h>
Expand Down Expand Up @@ -57,7 +58,11 @@ struct CacheValueProduceResult {

// An in-memory cache that can be accessed by any number of workers/isolates
// within the same process.
class SharedMemoryCache {
// TODO(soon): We expect to replace this implementation with a memcached-based
// implementation in the near future. The memcached-based impl will likely be
// fairly different from this implementation so quite a few of the details here
// are expected to change.
class SharedMemoryCache : public kj::AtomicRefcounted {
private:
struct InProgress;
struct ThreadUnsafeData;
Expand Down Expand Up @@ -116,16 +121,13 @@ class SharedMemoryCache {

using AdditionalResizeMemoryLimitHandler = kj::Function<void(ThreadUnsafeData&)>;

SharedMemoryCache(const kj::StringPtr& uuid,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
: data(),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler),
uuid_(kj::str(uuid)),
instanceId_(randomUUID(kj::none)) {}
SharedMemoryCache(
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);

inline kj::StringPtr uuid() { return uuid_; }
~SharedMemoryCache() noexcept(false);

inline kj::StringPtr instanceId() { return instanceId_; }
kj::Own<WeakRef<SharedMemoryCache>> addWeakRef();
kj::Own<SharedMemoryCache> addRef();

public:
// RAII class that attaches itself to a cache, suggests cache limits to the
Expand All @@ -134,11 +136,9 @@ class SharedMemoryCache {
public:
KJ_DISALLOW_COPY(Use);

Use(SharedMemoryCache& cache, const Limits& limits): cache(cache), limits(limits) {
cache.suggest(limits);
}
Use(Use&& other): cache(other.cache), limits(other.limits) { cache.suggest(limits); }
~Use() noexcept(false) { cache.unsuggest(limits); }
Use(kj::Own<SharedMemoryCache> cache, const Limits& limits);
Use(Use&& other);
~Use() noexcept(false);

// Returns a cached value for the given key if one exists (and has not
// expired). If no such value exists, nothing is returned, regardless of any
Expand Down Expand Up @@ -176,7 +176,7 @@ class SharedMemoryCache {
// erased.
void handleFallbackFailure(InProgress& inProgress);

SharedMemoryCache& cache;
kj::Own<SharedMemoryCache> cache;
Limits limits;
};

Expand Down Expand Up @@ -399,11 +399,7 @@ class SharedMemoryCache {
kj::MutexGuarded<ThreadUnsafeData> data;
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler;

// A unique identifier associated with this cache.
const kj::String uuid_;

// Uniquely identifies this instance of this cache.
const kj::String instanceId_;
kj::Own<WeakRef<SharedMemoryCache>> self;
};

// JavaScript class that allows accessing an in-memory cache.
Expand Down Expand Up @@ -432,13 +428,22 @@ class MemoryCache: public jsg::Object {
// It is responsible for owning the SharedMemoryCache instances and providing them to the
// bindings as needed. The default implementation (created and returned by createDefault())
// uses a simple in-memory map to store the SharedMemoryCache instances.
//
// This class is virtualized for a number of reasons, the most important of which is that
// we may want to replace the default implementation in production with one that includes
// more metrics and monitoring, or that uses a different cache implementation. Ultimately,
// however, if we do end up having additional implementations, it will be the SharedMemoryCache
// class that will need to be updated.
class MemoryCacheProvider {
public:
virtual ~MemoryCacheProvider() noexcept(false) = default;
virtual SharedMemoryCache& getInstance(kj::StringPtr cacheId, uint32_t ownerId) const = 0;

virtual kj::Own<SharedMemoryCache> getInstance(
kj::Maybe<kj::StringPtr> cacheId) const = 0;

static kj::Own<MemoryCacheProvider> createDefault(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler> additionalResizeMemoryLimitHandler = kj::none);
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none);
};

// clang-format off
Expand Down
15 changes: 8 additions & 7 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,14 @@ static v8::Local<v8::Value> createBindingValue(
}

KJ_CASE_ONEOF(cache, Global::MemoryCache) {
api::SharedMemoryCache::Limits limits = {.maxKeys = cache.maxKeys,
.maxValueSize = cache.maxValueSize,
.maxTotalValueSize = cache.maxTotalValueSize};
api::SharedMemoryCache& sharedCache =
memoryCacheProvider.getInstance(cache.cacheId, ownerId);
api::SharedMemoryCache::Use cacheUse(sharedCache, limits);
value = lock.wrap(context, jsg::alloc<api::MemoryCache>(kj::mv(cacheUse)));
value = lock.wrap(context, jsg::alloc<api::MemoryCache>(
api::SharedMemoryCache::Use(
memoryCacheProvider.getInstance(cache.cacheId),
{
.maxKeys = cache.maxKeys,
.maxValueSize = cache.maxValueSize,
.maxTotalValueSize = cache.maxTotalValueSize,
})));
}

KJ_CASE_ONEOF(ns, Global::EphemeralActorNamespace) {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/server/workerd-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ class WorkerdApi final: public Worker::Api {
};

struct MemoryCache {
kj::String cacheId;
kj::Maybe<kj::String> cacheId = kj::none;
uint32_t maxKeys;
uint32_t maxValueSize;
uint64_t maxTotalValueSize;

MemoryCache clone() const {
return MemoryCache {
.cacheId = kj::str(cacheId),
.cacheId = cacheId.map([](auto& id) { return kj::str(id);}),
.maxKeys = maxKeys,
.maxValueSize = maxValueSize,
.maxTotalValueSize = maxTotalValueSize,
Expand Down

0 comments on commit fd68ef1

Please sign in to comment.