Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Plasma] Use jemalloc instead of dlmalloc in plasma #2593

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,6 @@ POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------------------

src/plasma/thirdparty/dlmalloc.c: CC0

This is a version (aka dlmalloc) of malloc/free/realloc written by
Doug Lea and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/ Send questions,
comments, complaints, performance data, etc to [email protected]

--------------------------------------------------------------------------------

src/plasma/thirdparty/xxhash: BSD 2-Clause License

xxHash - Fast Hash algorithm
Expand Down
24 changes: 21 additions & 3 deletions cpp/src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ project(plasma VERSION "${ARROW_BASE_VERSION}")

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../python/cmake_modules")

# Include jemalloc

# TODO(pcm): Include xhochy's patch
set(JEMALLOC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/jemalloc_ep-prefix/src/jemalloc_ep/dist/")
set(jemalloc_URL https://github.com/jemalloc/jemalloc/archive/5.1.0.tar.gz)
set(JEMALLOC_INCLUDE_DIR "${JEMALLOC_PREFIX}/include")
set(JEMALLOC_SHARED_LIB "${JEMALLOC_PREFIX}/lib/libjemalloc${CMAKE_SHARED_LIBRARY_SUFFIX}")
set(JEMALLOC_STATIC_LIB "${JEMALLOC_PREFIX}/lib/libjemalloc_pic${CMAKE_STATIC_LIBRARY_SUFFIX}")

ExternalProject_Add(jemalloc
URL ${jemalloc_URL}
DOWNLOAD_DIR "${DOWNLOAD_LOCATION}"
CONFIGURE_COMMAND ./autogen.sh "--prefix=${JEMALLOC_PREFIX}" "--with-jemalloc-prefix=je_plasma_" "--with-private-namespace=je_plasma_private_" "--disable-tls"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the same jemalloc version as we do in Arrow C++. Eventhough they should live happily along eachother, we should not add multiple versions of a library to the project.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: if we're able to remove our vendored jemalloc by making this upgrade that would be great

${EP_LOG_OPTIONS}
BUILD_IN_SOURCE 1
BUILD_COMMAND ${MAKE}
BUILD_BYPRODUCTS "${JEMALLOC_STATIC_LIB}" "${JEMALLOC_SHARED_LIB}"
INSTALL_COMMAND ${MAKE} -j1 install_bin install_include install_lib)

find_package(PythonLibsNew REQUIRED)
find_package(Threads)

Expand All @@ -30,7 +49,7 @@ set(PLASMA_SO_VERSION "${ARROW_SO_VERSION}")
set(PLASMA_FULL_SO_VERSION "${ARROW_FULL_SO_VERSION}")

include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../" ${JEMALLOC_INCLUDE_DIR})

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")

Expand Down Expand Up @@ -75,7 +94,6 @@ set(PLASMA_SRCS
events.cc
fling.cc
io.cc
malloc.cc
plasma.cc
protocol.cc
thirdparty/ae/ae.c
Expand Down Expand Up @@ -129,7 +147,7 @@ if ("${COMPILER_FAMILY}" STREQUAL "gcc")
endif()

add_executable(plasma_store_server store.cc)
target_link_libraries(plasma_store_server plasma_static ${PLASMA_LINK_LIBS})
target_link_libraries(plasma_store_server plasma_static ${PLASMA_LINK_LIBS} ${JEMALLOC_STATIC_LIB})

if (ARROW_RPATH_ORIGIN)
if (APPLE)
Expand Down
108 changes: 17 additions & 91 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
#include "plasma/common.h"
#include "plasma/fling.h"
#include "plasma/io.h"
#include "plasma/malloc.h"
#include "plasma/plasma.h"
#include "plasma/protocol.h"

Expand Down Expand Up @@ -249,10 +248,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
int store_conn_;
/// File descriptor of the Unix domain socket that connects to the manager.
int manager_conn_;
/// Table of dlmalloc buffer files that have been memory mapped so far. This
/// is a hash table mapping a file descriptor to a struct containing the
/// address of the corresponding memory-mapped file.
std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
/// A hash table of the object IDs that are currently being used by this
/// client.
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
Expand All @@ -274,6 +269,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
int64_t store_capacity_;
/// A hash set to record the ids that users want to delete but still in use.
std::unordered_set<ObjectID> deletion_cache_;
/// File descriptor of the memory mapped file that contains the data
int plasma_fd_;
/// Location the file containing the plasma data is mapped to
uint8_t* plasma_pointer_;

#ifdef PLASMA_GPU
/// Cuda Device Manager.
Expand All @@ -291,41 +290,6 @@ PlasmaClient::Impl::Impl() {

PlasmaClient::Impl::~Impl() {}

// If the file descriptor fd has been mmapped in this client process before,
// return the pointer that was returned by mmap, otherwise mmap it and store the
// pointer in a hash table.
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
return entry->second.pointer;
} else {
// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make map_size page-aligned again.
uint8_t* result = reinterpret_cast<uint8_t*>(mmap(
NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (result == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
}
close(fd); // Closing this fd has an effect on performance.

ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
entry.pointer = result;
entry.length = map_size;
entry.count = 0;
return result;
}
}

// Get a pointer to a file that we know has been memory mapped in this client
// process before.
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
return entry->second.pointer;
}

bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
const auto elem = objects_in_use_.find(object_id);
return (elem != objects_in_use_.end());
Expand All @@ -347,16 +311,9 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
objects_in_use_[object_id]->is_sealed = is_sealed;
object_entry = objects_in_use_[object_id].get();
if (object->device_num == 0) {
// Increment the count of the number of objects in the memory-mapped file
// that are being used. The corresponding decrement should happen in
// PlasmaClient::Release.
auto entry = mmap_table_.find(object->store_fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 0);
// Update the in_use_object_bytes_.
in_use_object_bytes_ +=
(object_entry->object.data_size + object_entry->object.metadata_size);
entry->second.count += 1;
}
} else {
object_entry = elem->second.get();
Expand All @@ -379,21 +336,17 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer));
ObjectID id;
PlasmaObject object;
int store_fd;
int64_t mmap_size;
RETURN_NOT_OK(
ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size));
ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
if (device_num == 0) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
ARROW_CHECK(object.data_size == data_size);
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<MutableBuffer>(
LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
plasma_pointer_ + object.data_offset, data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
Expand Down Expand Up @@ -456,9 +409,8 @@ Status PlasmaClient::Impl::GetBuffers(
std::shared_ptr<Buffer> physical_buf;

if (object->device_num == 0) {
uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
plasma_pointer_ + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_GPU
physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
Expand Down Expand Up @@ -489,19 +441,8 @@ Status PlasmaClient::Impl::GetBuffers(
std::vector<ObjectID> received_object_ids(num_objects);
std::vector<PlasmaObject> object_data(num_objects);
PlasmaObject* object;
std::vector<int> store_fds;
std::vector<int64_t> mmap_sizes;
RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
object_data.data(), num_objects, store_fds, mmap_sizes));

// We mmap all of the file descriptors here so that we can avoid look them up
// in the subsequent loop based on just the store file descriptor and without
// having to know the relevant file descriptor received from recv_fd.
for (size_t i = 0; i < store_fds.size(); i++) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0);
LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
}
object_data.data(), num_objects));

for (int64_t i = 0; i < num_objects; ++i) {
DCHECK(received_object_ids[i] == object_ids[i]);
Expand All @@ -519,9 +460,8 @@ Status PlasmaClient::Impl::GetBuffers(
if (object->data_size != -1) {
std::shared_ptr<Buffer> physical_buf;
if (object->device_num == 0) {
uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
plasma_pointer_ + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_GPU
std::lock_guard<std::mutex> lock(gpu_mutex);
Expand Down Expand Up @@ -583,31 +523,11 @@ Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) {
ARROW_CHECK(object_entry != objects_in_use_.end());
ARROW_CHECK(object_entry->second->count == 0);

// Decrement the count of the number of objects in this memory-mapped file
// that the client is using. The corresponding increment should have
// happened in plasma_get.
int fd = object_entry->second->object.store_fd;
auto entry = mmap_table_.find(fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 1);
if (entry->second.count == 1) {
// If no other objects are being used, then unmap the file.
// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make the size page-aligned again.
int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap);
if (err == -1) {
return Status::IOError("Error during munmap");
}
// Remove the corresponding entry from the hash table.
mmap_table_.erase(fd);
} else {
// If there are other objects being used, decrement the reference count.
entry->second.count -= 1;
}
// Update the in_use_object_bytes_.
in_use_object_bytes_ -= (object_entry->second->object.data_size +
object_entry->second->object.metadata_size);
DCHECK_GE(in_use_object_bytes_, 0);

// Remove the entry from the hash table of objects currently in use.
objects_in_use_.erase(object_id);
return Status::OK();
Expand Down Expand Up @@ -940,7 +860,13 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaConnectReply, &buffer));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
std::string plasma_file_name;
int64_t plasma_file_size;
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_, &plasma_file_name, &plasma_file_size));
plasma_fd_ = open(plasma_file_name.c_str(), O_RDWR);
ARROW_CHECK(plasma_fd_ != -1) << "Failed to open plasma file, errno = " << strerror(errno);
plasma_pointer_ = reinterpret_cast<uint8_t*>(mmap(NULL, plasma_file_size, PROT_READ | PROT_WRITE, MAP_SHARED, plasma_fd_, 0));

return Status::OK();
}

Expand Down
4 changes: 0 additions & 4 deletions cpp/src/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,8 @@ struct ObjectTableEntry {

~ObjectTableEntry();

/// Memory mapped file containing the object.
int fd;
/// Device number.
int device_num;
/// Size of the underlying map.
int64_t map_size;
/// Offset from the base of the mmap.
ptrdiff_t offset;
/// Pointer to the object data. Needed to free the object.
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/plasma/eviction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "plasma/eviction_policy.h"

#include <jemalloc/jemalloc.h>

#include <algorithm>

namespace plasma {
Expand Down Expand Up @@ -67,10 +69,12 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,

void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
auto entry = store_info_->objects[object_id].get();
cache_.Add(object_id, entry->data_size + entry->metadata_size);
int64_t size = entry->data_size + entry->metadata_size;
int64_t size = std::max(entry->data_size + entry->metadata_size, kBlockSize);
cache_.Add(object_id, size);
memory_used_ += size;
ARROW_CHECK(memory_used_ <= store_info_->memory_capacity);
ARROW_CHECK(memory_used_ <= store_info_->memory_capacity)
<< " memory_used_ = " << memory_used_
<< ", store_info_->memory_capacity = " << store_info_->memory_capacity;
}

bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
Expand Down Expand Up @@ -99,15 +103,15 @@ void EvictionPolicy::EndObjectAccess(const ObjectID& object_id,
std::vector<ObjectID>* objects_to_evict) {
auto entry = store_info_->objects[object_id].get();
// Add the object to the LRU cache.
cache_.Add(object_id, entry->data_size + entry->metadata_size);
cache_.Add(object_id, std::max(entry->data_size + entry->metadata_size, kBlockSize));
}

void EvictionPolicy::RemoveObject(const ObjectID& object_id) {
// If the object is in the LRU cache, remove it.
cache_.Remove(object_id);

auto entry = store_info_->objects[object_id].get();
int64_t size = entry->data_size + entry->metadata_size;
int64_t size = std::max(entry->data_size + entry->metadata_size, kBlockSize);
ARROW_CHECK(memory_used_ >= size);
memory_used_ -= size;
}
Expand Down
21 changes: 4 additions & 17 deletions cpp/src/plasma/format/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ enum PlasmaError:int {
// Plasma store messages

struct PlasmaObjectSpec {
// Index of the memory segment (= memory mapped file) that
// this object is allocated in.
segment_index: int;
// The offset in bytes in the memory mapped file of the data.
data_offset: ulong;
// The size in bytes of the data.
Expand Down Expand Up @@ -131,12 +128,6 @@ table PlasmaCreateReply {
plasma_object: PlasmaObjectSpec;
// Error that occurred for this call.
error: PlasmaError;
// The file descriptor in the store that corresponds to the file descriptor
// being sent to the client right after this message.
store_fd: int;
// The size in bytes of the segment for the store file descriptor (needed to
// call mmap).
mmap_size: long;
// CUDA IPC Handle for objects on GPU.
ipc_handle: CudaHandle;
}
Expand Down Expand Up @@ -181,14 +172,6 @@ table PlasmaGetReply {
// Plasma object information, in the same order as their IDs. The number of
// elements in both object_ids and plasma_objects arrays must agree.
plasma_objects: [PlasmaObjectSpec];
// A list of the file descriptors in the store that correspond to the file
// descriptors being sent to the client. The length of this list is the number
// of file descriptors that the store will send to the client after this
// message.
store_fds: [int];
// Size in bytes of the segment for each store file descriptor (needed to call
// mmap). This list must have the same length as store_fds.
mmap_sizes: [long];
// The number of elements in both object_ids and plasma_objects arrays must agree.
handles: [CudaHandle];
}
Expand Down Expand Up @@ -279,6 +262,10 @@ table PlasmaConnectRequest {
table PlasmaConnectReply {
// The memory capacity of the store.
memory_capacity: long;
// Path of the file the store data is stored in.
plasma_file_name: string;
// Size of the shared memory file in bytes.
plasma_file_size: long;
}

table PlasmaEvictRequest {
Expand Down
Loading