Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Zlib/Brotli custom allocator #2705

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions src/workerd/api/node/zlib-util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ void ZlibUtil::ZlibStream::initialize(int windowBits,
jsg::Function<void()> writeCallback,
jsg::Optional<kj::Array<kj::byte>> dictionary) {
initializeStream(kj::mv(writeState), kj::mv(writeCallback));
context()->setAllocationFunctions(AllocForZlib, FreeForZlib, this);
context()->setAllocationFunctions(Allocator::AllocForZlib, Allocator::FreeForZlib, &allocator);
context()->initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary));
}

Expand Down Expand Up @@ -771,10 +771,8 @@ bool ZlibUtil::BrotliCompressionStream<CompressionContext>::initialize(jsg::Lock
jsg::BufferSource writeResult,
jsg::Function<void()> writeCallback) {
this->initializeStream(kj::mv(writeResult), kj::mv(writeCallback));
auto maybeError =
this->context()->initialize(CompressionStream<CompressionContext>::AllocForBrotli,
CompressionStream<CompressionContext>::FreeForZlib,
static_cast<CompressionStream<CompressionContext>*>(this));
auto maybeError = this->context()->initialize(
Allocator::AllocForBrotli, Allocator::FreeForZlib, &this->allocator);

KJ_IF_SOME(err, maybeError) {
this->emitError(js, kj::mv(err));
Expand All @@ -796,33 +794,26 @@ bool ZlibUtil::BrotliCompressionStream<CompressionContext>::initialize(jsg::Lock
return true;
}

template <typename CompressionContext>
void* ZlibUtil::CompressionStream<CompressionContext>::AllocForZlib(
void* data, uInt items, uInt size) {
void* ZlibUtil::Allocator::AllocForZlib(void* data, uInt items, uInt size) {
size_t real_size =
nbytes::MultiplyWithOverflowCheck(static_cast<size_t>(items), static_cast<size_t>(size));
return AllocForBrotli(data, real_size);
}

template <typename CompressionContext>
void* ZlibUtil::CompressionStream<CompressionContext>::AllocForBrotli(void* data, size_t size) {
size += sizeof(size_t);
auto* ctx = static_cast<CompressionStream*>(data);
void* ZlibUtil::Allocator::AllocForBrotli(void* opaque, size_t size) {
auto* thisAllocator = static_cast<Allocator*>(opaque);
auto memory = kj::heapArray<uint8_t>(size);
auto begin = memory.begin();
// TODO(soon): Check if we need to store the size of the block in the pointer like Node.js
*reinterpret_cast<size_t*>(begin) = size;
ctx->allocations.insert(begin, kj::mv(memory));
return begin + sizeof(size_t);
thisAllocator->allocations.insert(begin, kj::mv(memory));
return begin;
}

template <typename CompressionContext>
void ZlibUtil::CompressionStream<CompressionContext>::FreeForZlib(void* data, void* pointer) {
void ZlibUtil::Allocator::FreeForZlib(void* opaque, void* pointer) {
if (KJ_UNLIKELY(pointer == nullptr)) return;
auto* ctx = static_cast<CompressionStream*>(data);
auto real_pointer = static_cast<uint8_t*>(pointer) - sizeof(size_t);
JSG_REQUIRE(ctx->allocations.erase(real_pointer), Error, "Zlib allocation should exist"_kj);
auto* thisAllocator = static_cast<Allocator*>(opaque);
JSG_REQUIRE(thisAllocator->allocations.erase(pointer), Error, "Zlib allocation should exist"_kj);
}

namespace {
template <typename Context>
static kj::Array<kj::byte> syncProcessBuffer(Context& ctx, GrowableBuffer& result) {
Expand All @@ -845,7 +836,10 @@ static kj::Array<kj::byte> syncProcessBuffer(Context& ctx, GrowableBuffer& resul

kj::Array<kj::byte> ZlibUtil::zlibSync(
ZlibUtil::InputSource data, ZlibContext::Options opts, ZlibModeValue mode) {
// Any use of zlib APIs constitutes an implicit dependency on Allocator, which must remain alive until the zlib stream is destroyed.
Allocator allocator;
ZlibContext ctx(static_cast<ZlibMode>(mode));
ctx.setAllocationFunctions(Allocator::AllocForZlib, Allocator::FreeForZlib, &allocator);

auto chunkSize = opts.chunkSize.orDefault(ZLIB_PERFORMANT_CHUNK_SIZE);
auto maxOutputLength = opts.maxOutputLength.orDefault(Z_MAX_CHUNK);
Expand Down Expand Up @@ -894,6 +888,8 @@ void ZlibUtil::zlibWithCallback(jsg::Lock& js,

template <typename Context>
kj::Array<kj::byte> ZlibUtil::brotliSync(InputSource data, BrotliContext::Options opts) {
// Any use of brotli APIs constitutes an implicit dependency on Allocator, which must remain alive until the brotli state is destroyed.
Allocator allocator;
Context ctx(Context::Mode);

auto chunkSize = opts.chunkSize.orDefault(ZLIB_PERFORMANT_CHUNK_SIZE);
Expand All @@ -906,8 +902,7 @@ kj::Array<kj::byte> ZlibUtil::brotliSync(InputSource data, BrotliContext::Option
JSG_REQUIRE(maxOutputLength <= Z_MAX_CHUNK, Error, "Invalid maxOutputLength"_kj);
GrowableBuffer result(ZLIB_PERFORMANT_CHUNK_SIZE, maxOutputLength);

// TODO(soon): should we track them brotli allocationz?
KJ_IF_SOME(err, ctx.initialize(nullptr, nullptr, nullptr)) {
KJ_IF_SOME(err, ctx.initialize(Allocator::AllocForBrotli, Allocator::FreeForZlib, &allocator)) {
JSG_FAIL_REQUIRE(Error, err.message);
}

Expand Down Expand Up @@ -955,9 +950,6 @@ void ZlibUtil::brotliWithCallback(

#ifndef CREATE_TEMPLATE
#define CREATE_TEMPLATE(T) \
template void* ZlibUtil::CompressionStream<T>::AllocForZlib(void* data, uInt items, uInt size); \
template void* ZlibUtil::CompressionStream<T>::AllocForBrotli(void* data, size_t size); \
template void ZlibUtil::CompressionStream<T>::FreeForZlib(void* data, void* pointer); \
template void ZlibUtil::CompressionStream<T>::reset(jsg::Lock& js); \
template void ZlibUtil::CompressionStream<T>::write<false>(jsg::Lock & js, int flush, \
jsg::Optional<kj::Array<kj::byte>> input, int inputOffset, int inputLength, \
Expand Down
24 changes: 15 additions & 9 deletions src/workerd/api/node/zlib-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,19 @@ class ZlibUtil final: public jsg::Object {
ZlibUtil() = default;
ZlibUtil(jsg::Lock&, const jsg::Url&) {}

// A custom allocator to be used by the zlib and brotli libraries
// The current implementation stores allocations in a hash map.
// TODO: Use an arena allocator implementation instead of hashing pointers in order to improve performance
// Bookkeeping object behind the allocation callbacks registered with zlib and brotli.
// The callbacks are static; call sites in this change register them together with a
// pointer to an Allocator instance, which the callbacks receive as their first argument.
class Allocator final {
public:
// zlib-style allocation callback taking an item count and an item size. The definition in
// zlib-util.c++ multiplies the two with an overflow check and forwards to AllocForBrotli.
static void* AllocForZlib(void* data, uInt items, uInt size);
// brotli-style allocation callback taking a byte count. The definition allocates a kj heap
// array and records it in `allocations` of the Allocator passed as the first argument.
static void* AllocForBrotli(void* data, size_t size);
// Release callback registered for both zlib and brotli. A null `pointer` is ignored;
// otherwise the matching entry is erased from `allocations`, and a missing entry fails
// the JSG_REQUIRE check with the message "Zlib allocation should exist".
static void FreeForZlib(void* data, void* pointer);

private:
// Outstanding allocations, keyed by pointer; each value is the kj::Array holding the block.
// NOTE(review): presumably the memory is released when an entry is erased or when the
// Allocator is destroyed — confirm against kj::Array ownership semantics.
kj::HashMap<void*, kj::Array<kj::byte>> allocations;
};

template <class CompressionContext>
class CompressionStream: public jsg::Object {
public:
Expand Down Expand Up @@ -344,19 +357,12 @@ class ZlibUtil final: public jsg::Object {

void initializeStream(jsg::BufferSource _write_result, jsg::Function<void()> writeCallback);

// Allocation functions provided to zlib itself. We store the real size of
// the allocated memory chunk just before the "payload" memory we return
// to zlib.
static void* AllocForZlib(void* data, uInt items, uInt size);
static void* AllocForBrotli(void* data, size_t size);
static void FreeForZlib(void* data, void* pointer);

private:
// Used to store allocations in Brotli* operations.
// This declaration should be physically positioned before
// context to avoid `heap-use-after-free` ASan error.
kj::HashMap<uint8_t*, kj::Array<uint8_t>> allocations;
Allocator allocator;

private:
CompressionContext context_;
bool initialized = false;
bool writing = false;
Expand Down