Skip to content

Commit

Permalink
Update the MemoryCache impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Mar 6, 2024
1 parent d5570b7 commit e20b243
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 109 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
188 changes: 126 additions & 62 deletions src/workerd/api/memory-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <workerd/jsg/ser.h>
#include <workerd/io/io-context.h>
#include <workerd/api/util.h>
#include <workerd/util/weak-refs.h>

namespace workerd::api {

Expand Down Expand Up @@ -34,7 +35,22 @@ static bool hasExpired(const kj::Maybe<double>& expiration, bool allowOutsideIoC
return false;
}

void SharedMemoryCache::suggest(const Limits& limits) {
SharedMemoryCache::SharedMemoryCache(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
: data(),
provider(provider),
id(kj::str(id)),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler) {}

SharedMemoryCache::~SharedMemoryCache() noexcept(false) {
KJ_IF_SOME(p, provider) {
p.removeInstance(*this);
}
}

void SharedMemoryCache::suggest(const Limits& limits) const {
auto data = this->data.lockExclusive();
bool isKnownLimit = data->suggestedLimits.find(limits) != data->suggestedLimits.end();
data->suggestedLimits.insert(limits);
Expand All @@ -43,15 +59,15 @@ void SharedMemoryCache::suggest(const Limits& limits) {
}
}

void SharedMemoryCache::unsuggest(const Limits& limits) {
void SharedMemoryCache::unsuggest(const Limits& limits) const {
auto data = this->data.lockExclusive();
auto loc = data->suggestedLimits.find(limits);
KJ_ASSERT(loc != data->suggestedLimits.end());
data->suggestedLimits.erase(loc);
resize(*data);
}

void SharedMemoryCache::resize(ThreadUnsafeData& data) {
void SharedMemoryCache::resize(ThreadUnsafeData& data) const {
data.effectiveLimits = Limits::min();
for (const auto& limits: data.suggestedLimits) {
data.effectiveLimits = Limits::max(data.effectiveLimits, limits.normalize());
Expand Down Expand Up @@ -87,7 +103,7 @@ void SharedMemoryCache::resize(ThreadUnsafeData& data) {
}

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::getWhileLocked(
ThreadUnsafeData& data, const kj::String& key) {
ThreadUnsafeData& data, const kj::String& key) const {
KJ_IF_SOME(existingCacheEntry, data.cache.find(key)) {
if (hasExpired(existingCacheEntry.expiration)) {
// The cache entry has an associated expiration time and that time has
Expand All @@ -114,7 +130,7 @@ kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::getWhileLocked(
void SharedMemoryCache::putWhileLocked(ThreadUnsafeData& data,
const kj::String& key,
kj::Own<CacheValue>&& value,
kj::Maybe<double> expiration) {
kj::Maybe<double> expiration) const {
size_t valueSize = value->bytes.size();
if (valueSize > data.effectiveLimits.maxValueSize) {
// Silently drop the value. For consistency, also drop the previous value,
Expand Down Expand Up @@ -166,7 +182,9 @@ void SharedMemoryCache::putWhileLocked(ThreadUnsafeData& data,
}
}

void SharedMemoryCache::evictNextWhileLocked(ThreadUnsafeData& data, bool allowOutsideIoContext) {
void SharedMemoryCache::evictNextWhileLocked(
ThreadUnsafeData& data,
bool allowOutsideIoContext) const {
// The caller is responsible for ensuring that the cache is not empty already.
KJ_REQUIRE(data.cache.size() > 0);

Expand All @@ -186,7 +204,9 @@ void SharedMemoryCache::evictNextWhileLocked(ThreadUnsafeData& data, bool allowO
data.cache.erase(leastRecentlyUsed);
}

void SharedMemoryCache::removeIfExistsWhileLocked(ThreadUnsafeData& data, const kj::String& key) {
void SharedMemoryCache::removeIfExistsWhileLocked(
ThreadUnsafeData& data,
const kj::String& key) const {
KJ_IF_SOME(entry, data.cache.find(key)) {
// This DOES NOT count as an eviction because it might happen while
// replacing the existing cache entry with a new one, when the new one is
Expand All @@ -198,15 +218,42 @@ void SharedMemoryCache::removeIfExistsWhileLocked(ThreadUnsafeData& data, const
}
}

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(const kj::String& key) {
auto data = cache.data.lockExclusive();
return cache.getWhileLocked(*data, key);
kj::Maybe<kj::Own<const SharedMemoryCache>> SharedMemoryCache::tryAddRef() const {
return kj::atomicAddRefWeak(*this);
}

kj::Own<const SharedMemoryCache> SharedMemoryCache::create(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler) {
return kj::atomicRefcounted<const SharedMemoryCache>(provider, id, handler);
}

SharedMemoryCache::Use::Use(kj::Own<const SharedMemoryCache> cache, const Limits& limits)
: cache(kj::mv(cache)), limits(limits) {
this->cache->suggest(limits);
}

SharedMemoryCache::Use::Use(Use&& other): cache(kj::mv(other.cache)), limits(other.limits) {
this->cache->suggest(limits);
}

SharedMemoryCache::Use::~Use() noexcept(false) {
if (cache.get() != nullptr) {
cache->unsuggest(limits);
}
}

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(
const kj::String& key) const {
auto data = cache->data.lockExclusive();
return cache->getWhileLocked(*data, key);
}

kj::OneOf<kj::Own<CacheValue>, kj::Promise<SharedMemoryCache::Use::GetWithFallbackOutcome>>
SharedMemoryCache::Use::getWithFallback(const kj::String& key) {
auto data = cache.data.lockExclusive();
KJ_IF_SOME(existingValue, cache.getWhileLocked(*data, key)) {
SharedMemoryCache::Use::getWithFallback(const kj::String& key) const {
auto data = cache->data.lockExclusive();
KJ_IF_SOME(existingValue, cache->getWhileLocked(*data, key)) {
return kj::mv(existingValue);
} else KJ_IF_SOME(existingInProgress, data->inProgress.find(key)) {
// We return a Promise, but we keep the fulfiller. We might fulfill it
Expand All @@ -226,7 +273,7 @@ SharedMemoryCache::Use::getWithFallback(const kj::String& key) {
}

SharedMemoryCache::Use::FallbackDoneCallback SharedMemoryCache::Use::prepareFallback(
InProgress& inProgress) {
InProgress& inProgress) const {
// We need to detect if the Promise that we are about to create ever settles,
// as opposed to being destroyed without either being resolved or rejecting.
struct FallbackStatus {
Expand All @@ -250,8 +297,8 @@ SharedMemoryCache::Use::FallbackDoneCallback SharedMemoryCache::Use::prepareFall
// The fallback succeeded. Store the value in the cache and propagate it to
// all waiting requests, even if it has expired already.
status.hasSettled = true;
auto data = cache.data.lockExclusive();
cache.putWhileLocked(
auto data = cache->data.lockExclusive();
cache->putWhileLocked(
*data, kj::str(inProgress.key), kj::atomicAddRef(*result.value), result.expiration);
for (auto& waiter: inProgress.waiting) {
waiter.fulfiller->fulfill(kj::atomicAddRef(*result.value));
Expand All @@ -267,13 +314,13 @@ SharedMemoryCache::Use::FallbackDoneCallback SharedMemoryCache::Use::prepareFall
};
}

void SharedMemoryCache::Use::handleFallbackFailure(InProgress& inProgress) {
void SharedMemoryCache::Use::handleFallbackFailure(InProgress& inProgress) const {
kj::Own<kj::CrossThreadPromiseFulfiller<GetWithFallbackOutcome>> nextFulfiller;

// If there is another queued fallback, retrieve it and remove it from the
// queue. Otherwise, just delete the queue entirely.
{
auto data = cache.data.lockExclusive();
auto data = cache->data.lockExclusive();
auto next = inProgress.waiting.begin();
if (next != inProgress.waiting.end()) {
nextFulfiller = kj::mv(next->fulfiller);
Expand All @@ -300,7 +347,7 @@ void SharedMemoryCache::Use::handleFallbackFailure(InProgress& inProgress) {
// a tunneled exception, see jsg::createTunneledException().
static kj::Own<CacheValue> hackySerialize(jsg::Lock& js, jsg::JsRef<jsg::JsValue>& value) {
return js.tryCatch([&]() -> kj::Own<CacheValue> {
jsg::Serializer serializer(js, kj::none);
jsg::Serializer serializer(js);
serializer.write(js, value.getHandle(js));
return kj::atomicRefcounted<CacheValue>(serializer.release().data);
}, [&](jsg::Value&& exception) -> kj::Own<CacheValue> {
Expand Down Expand Up @@ -389,54 +436,71 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,

// ======================================================================================

namespace {
// Data structure that maps unique cache identifiers to cache instances.
// This allows separate isolates to access the same in-memory caches.
class MemoryCacheMap: public MemoryCacheProvider {
public:
MemoryCacheMap(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none)
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}
KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheMap);

// Gets an existing SharedMemoryCache instance or creates a new one if no
// cache with the given id exists.
SharedMemoryCache& getInstance(kj::StringPtr cacheId, uint32_t ownerId) const override;

private:
using HashMap = kj::HashMap<kj::String, kj::Own<SharedMemoryCache>>;

kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler;

// All existing in-memory caches.
kj::MutexGuarded<HashMap> caches;
// TODO(later): consider using a kj::Table with a HashIndex that uses
// SharedMemoryCache::uuid() instead.
};

SharedMemoryCache& MemoryCacheMap::getInstance(
kj::StringPtr cacheId, uint32_t ownerId) const {
auto lock = caches.lockExclusive();
auto id = kj::str(cacheId, "::", ownerId);
return *lock->findOrCreate(id, [this, &id]() {
MemoryCacheProvider::MemoryCacheProvider(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler)
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}

MemoryCacheProvider::~MemoryCacheProvider() noexcept(false) {
// TODO(cleanup): Later, assuming progress is made on kj::Ptr<T>, we ought to be able
// to remove this. For now we just need to make sure that the MemoryCacheProvider instance
// outlives any SharedMemoryCache instances that are referencing it.
KJ_REQUIRE(caches.lockShared()->size() == 0,
"There are still active SharedMemoryCache instances. Use-after-free errors are likely.");
}

kj::Own<const SharedMemoryCache> MemoryCacheProvider::getInstance(
kj::Maybe<kj::StringPtr> cacheId) const {

const auto makeCache = [this](kj::Maybe<const MemoryCacheProvider&> provider, kj::StringPtr id) {
// The cache doesn't exist in the map. Let's create it.
auto handler = additionalResizeMemoryLimitHandler.map([](
const SharedMemoryCache::AdditionalResizeMemoryLimitHandler& handler)
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
const SharedMemoryCache::AdditionalResizeMemoryLimitHandler& handler)
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
return const_cast<SharedMemoryCache::AdditionalResizeMemoryLimitHandler&>(handler);
});
return HashMap::Entry{
kj::str(id),
kj::heap<SharedMemoryCache>(id, handler)
};
});
return SharedMemoryCache::create(provider, id, handler);
};

KJ_IF_SOME(cid, cacheId) {
auto lock = caches.lockExclusive();

// First, let's see if the cache already exists. If it does, we'll just return
// a strong reference to it.
KJ_IF_SOME(found, lock->find(cid)) {
KJ_IF_SOME(ref, found->tryAddRef()) {
return kj::mv(ref);
} else {
// We found an entry in the map, but atomicAddRefWeak failed. Doh. We have
// to replace the map entry with a new cache instance.
auto cache = makeCache(kj::Maybe<const MemoryCacheProvider&>(*this), cid);
lock->upsert(kj::str(cid), cache.get());
return kj::mv(cache);
}
}

// The cache doesn't exist, let's create it and add it to the map
auto cache = makeCache(kj::Maybe<const MemoryCacheProvider&>(*this), cid);
lock->insert(kj::str(cid), cache.get());
return kj::mv(cache);
}

// Since we don't have a cache id, we'll just create a new cache and return it.
return makeCache(kj::none, nullptr);
}
} // namespace

kj::Own<MemoryCacheProvider> MemoryCacheProvider::createDefault(
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler> additionalResizeMemoryLimitHandler) {
return kj::heap<MemoryCacheMap>(kj::mv(additionalResizeMemoryLimitHandler));
void MemoryCacheProvider::removeInstance(const SharedMemoryCache& instance) const {
// This is fun. We have to make sure that the instance to be removed is actually
// what we expect it to be.
auto lock = caches.lockExclusive();
KJ_IF_SOME(found, lock->find(instance.getId())) {
if (found != &instance) {
// Not the instance we expected it to be. Cache instance was likely replaced
// by a new instance with the same id. Do nothing.
return;
}
}
KJ_ASSERT(lock->erase(instance.getId()));
}

} // namespace workerd::api
Loading

0 comments on commit e20b243

Please sign in to comment.