ARROW-3958: [Plasma] Reduce number of IPCs
This PR also removes the client unmap, which is no longer necessary since the introduction of malloc (there are only a few memory-mapped files, and they typically stay around for the lifetime of the application).
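
As a rough illustration of the IPC reduction named in the title, the sketch below models a client that receives a file descriptor from the store only the first time it sees a given store fd and reuses the cached one afterwards. It is a toy model in Python, not the C++ code from this commit: `MmapTableSketch` and `fake_recv_fd` are hypothetical names, and the lookup and the table insert (which the C++ code splits between `GetStoreFd` and `LookupOrMmap`) are merged into one method for brevity.

```python
class MmapTableSketch:
    """Toy model of the client's table of memory-mapped files (hypothetical)."""

    def __init__(self, recv_fd):
        # recv_fd stands in for the IPC that receives a descriptor from the store.
        self._recv_fd = recv_fd
        # store fd -> client fd; entries are never removed, mirroring the
        # assumption that mapped files live as long as the application.
        self._table = {}

    def get_store_fd(self, store_fd):
        # Only pay for the IPC the first time a store fd is seen.
        if store_fd not in self._table:
            self._table[store_fd] = self._recv_fd()
        return self._table[store_fd]


ipc_calls = []


def fake_recv_fd():
    # Pretend the store handed us a new descriptor and count the round trip.
    ipc_calls.append(1)
    return 100 + len(ipc_calls)


table = MmapTableSketch(fake_recv_fd)
fds = [table.get_store_fd(7) for _ in range(10000)]
print(len(ipc_calls), "descriptor transfer(s) for", len(fds), "lookups")
```

With a single memory-mapped file, all 10000 lookups in this toy model share one descriptor transfer.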

The PR also gets rid of a bunch of code that is no longer needed (the release buffer, yay!).
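
Without the release buffer, a release becomes plain reference counting: decrement the per-object count and notify the store as soon as it reaches zero. The following is a minimal Python sketch of that behavior, under the assumption that it mirrors the simplified `Release` path in the diff; `ReleaseSketch` and `send_release` are hypothetical names, not part of the Plasma API.

```python
class ReleaseSketch:
    """Toy model of immediate release via reference counting (hypothetical)."""

    def __init__(self, send_release):
        # send_release stands in for the message sent to the store.
        self._send_release = send_release
        self._objects_in_use = {}  # object id -> reference count

    def get(self, object_id):
        # Each get adds one reference held by this client.
        self._objects_in_use[object_id] = self._objects_in_use.get(object_id, 0) + 1

    def release(self, object_id):
        count = self._objects_in_use[object_id] - 1
        assert count >= 0
        if count == 0:
            # Last reference gone: forget the object and tell the store right
            # away, with no queue of delayed releases in between.
            del self._objects_in_use[object_id]
            self._send_release(object_id)
        else:
            self._objects_in_use[object_id] = count


sent = []
client = ReleaseSketch(sent.append)
client.get("a")
client.get("a")
client.release("a")  # one reference still held, nothing is sent
pending_after_first = list(sent)
client.release("a")  # count reaches zero, the store is notified immediately
```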

Benchmarks:

```
import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma", "", 0)

# Put performance

def f():
    for i in range(10000):
        client.put(1)

%timeit f()

# without optimizations:

# 1.51 s ± 2.22 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.52 s ± 9.68 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.53 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# with optimizations:

# 1.27 s ± 10.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.31 s ± 8.18 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.31 s ± 17.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Create/seal performance

def f():
    for i in range(10000):
        object_id = plasma.ObjectID.from_random()
        client.create(object_id, 0)
        client.seal(object_id)

%timeit f()

# without optimizations:

# 571 ms ± 2.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 583 ms ± 22.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 588 ms ± 14.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# with optimizations:

# 531 ms ± 3.24 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 541 ms ± 9.99 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 542 ms ± 19.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Get performance

objects = [client.put(1) for i in range(10000)]

def g():
    for i in range(10000):
        client.get(objects[i])

%timeit g()

# without optimizations:

# 1.11 s ± 6.17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.12 s ± 1.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.19 s ± 24.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# with optimizations:

# 776 ms ± 11.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 792 ms ± 3.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 778 ms ± 9.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
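
To put the timings above side by side, the short script below averages the three reported runs for each benchmark and computes the relative reduction in time per loop. The numbers are copied from the benchmark output (converted to milliseconds); the helper name `reduction` and the choice to average the three runs are assumptions made for this summary.

```python
from statistics import mean

# (without optimizations, with optimizations), in milliseconds per loop
timings_ms = {
    "put": ([1510, 1520, 1530], [1270, 1310, 1310]),
    "create/seal": ([571, 583, 588], [531, 541, 542]),
    "get": ([1110, 1120, 1190], [776, 792, 778]),
}


def reduction(before, after):
    # Fraction of the original time that the optimization saves.
    return 1.0 - mean(after) / mean(before)


reductions = {name: reduction(b, a) for name, (b, a) in timings_ms.items()}
for name, r in reductions.items():
    print(f"{name}: {r:.1%} less time per loop")
```

By this measure, put improves by roughly 15%, create/seal by roughly 7%, and get by roughly 31%.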

Author: Philipp Moritz
Author: Robert Nishihara

Closes #3124 from pcmoritz/plasma-send-fd and squashes the following commits:

f899f45 <Philipp Moritz> Update client.cc
a038404 <Robert Nishihara> Update _plasma.pyx
af150c1 <Philipp Moritz> comments and fixes
71c4c5c <Philipp Moritz> don't close fd twice
0d57282 <Philipp Moritz> linting
f60dcbe <Philipp Moritz> fix tests
502aeda <Philipp Moritz> linting
2887b17 <Philipp Moritz> clean up some code
cfff7e32 <Philipp Moritz> lint
e5ccbba <Philipp Moritz> fixes
5f09199 <Philipp Moritz> introduce method
24beb27 <Philipp Moritz> working version
pcmoritz authored and robertnishihara committed Dec 13, 2018
1 parent e34057c commit b3bc338
Showing 10 changed files with 83 additions and 194 deletions.
184 changes: 40 additions & 144 deletions cpp/src/plasma/client.cc
@@ -83,9 +83,6 @@ typedef struct XXH64_state_s XXH64_state_t;
constexpr int64_t kHashingConcurrency = 8;
constexpr int64_t kBytesInMB = 1 << 20;

// Use 100MB as an overestimate of the L3 cache size.
constexpr int64_t kL3CacheSizeBytes = 100000000;

// ----------------------------------------------------------------------
// GPU support

@@ -143,22 +140,13 @@ struct ObjectInUseEntry {
bool is_sealed;
};

/// Configuration options for the plasma client.
struct PlasmaClientConfig {
/// Number of release calls we wait until the object is actually released.
/// This allows us to avoid invalidating the cpu cache on workers if objects
/// are reused accross tasks.
size_t release_delay;
};

struct ClientMmapTableEntry {
/// The associated file descriptor on the client.
int fd;
/// The result of mmap for this file descriptor.
uint8_t* pointer;
/// The length of the memory-mapped file.
size_t length;
/// The number of objects in this memory-mapped file that are currently being
/// used by the client. When this count reaches zeros, we unmap the file.
int count;
};

class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
@@ -169,7 +157,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
// PlasmaClient method implementations

Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay,
const std::string& manager_socket_name, int release_delay = 0,
int num_retries = -1);

Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
@@ -221,18 +209,22 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

int get_manager_fd() const;

Status FlushReleaseHistory();

bool IsInUse(const ObjectID& object_id);

private:
/// This is a helper method for unmapping objects for which all references have
/// gone out of scope, either by calling Release or Abort.
/// Check if store_fd has already been received from the store. If yes,
/// return it. Otherwise, receive it from the store (see analogous logic
/// in store.cc).
///
/// @param object_id The object ID whose data we should unmap.
Status UnmapObject(const ObjectID& object_id);
/// \param store_fd File descriptor to fetch from the store.
/// \return Client file descriptor corresponding to store_fd.
int GetStoreFd(int store_fd);

Status PerformRelease(const ObjectID& object_id);
/// This is a helper method for marking an object as unused by this client.
///
/// \param object_id The object ID we mark unused.
/// \return The return status.
Status MarkObjectUnused(const ObjectID& object_id);

/// Common helper for Get() variants
Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
@@ -267,18 +259,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// A hash table of the object IDs that are currently being used by this
/// client.
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
/// Object IDs of the last few release calls. This is a deque and
/// is used to delay releasing objects to see if they can be reused by
/// subsequent tasks so we do not unneccessarily invalidate cpu caches.
/// TODO(pcm): replace this with a proper lru cache using the size of the L3
/// cache.
std::deque<ObjectID> release_history_;
/// The number of bytes in the combined objects that are held in the release
/// history doubly-linked list. If this is too large then the client starts
/// releasing objects.
int64_t in_use_object_bytes_;
/// Configuration options for the plasma client.
PlasmaClientConfig config_;
/// The amount of memory available to the Plasma store. The client needs this
/// information to make sure that it does not delay in releasing so much
/// memory that the store is unable to evict enough objects to free up space.
@@ -308,7 +288,6 @@ PlasmaClient::Impl::~Impl() {}
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
return entry->second.pointer;
} else {
// We subtract kMmapRegionsGap from the length that was added
@@ -322,9 +301,9 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
close(fd); // Closing this fd has an effect on performance.

ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
entry.fd = fd;
entry.pointer = result;
entry.length = map_size;
entry.count = 0;
return result;
}
}
@@ -342,6 +321,17 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
return (elem != objects_in_use_.end());
}

int PlasmaClient::Impl::GetStoreFd(int store_fd) {
auto entry = mmap_table_.find(store_fd);
if (entry == mmap_table_.end()) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
return fd;
} else {
return entry->second.fd;
}
}

void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
PlasmaObject* object, bool is_sealed) {
// Increment the count of the object to track the fact that it is being used.
@@ -357,18 +347,6 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
objects_in_use_[object_id]->count = 0;
objects_in_use_[object_id]->is_sealed = is_sealed;
object_entry = objects_in_use_[object_id].get();
if (object->device_num == 0) {
// Increment the count of the number of objects in the memory-mapped file
// that are being used. The corresponding decrement should happen in
// PlasmaClient::Release.
auto entry = mmap_table_.find(object->store_fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 0);
// Update the in_use_object_bytes_.
in_use_object_bytes_ +=
(object_entry->object.data_size + object_entry->object.metadata_size);
entry->second.count += 1;
}
} else {
object_entry = elem->second.get();
ARROW_CHECK(object_entry->count > 0);
@@ -397,8 +375,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// If the CreateReply included an error, then the store will not send a file
// descriptor.
if (device_num == 0) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
int fd = GetStoreFd(store_fd);
ARROW_CHECK(object.data_size == data_size);
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
@@ -535,8 +512,7 @@ Status PlasmaClient::Impl::GetBuffers(
// in the subsequent loop based on just the store file descriptor and without
// having to know the relevant file descriptor received from recv_fd.
for (size_t i = 0; i < store_fds.size(); i++) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0);
int fd = GetStoreFd(store_fds[i]);
LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
}

@@ -615,62 +591,29 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
}

Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) {
Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
ARROW_CHECK(object_entry->second->count == 0);

// Decrement the count of the number of objects in this memory-mapped file
// that the client is using. The corresponding increment should have
// happened in plasma_get.
int fd = object_entry->second->object.store_fd;
auto entry = mmap_table_.find(fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 1);
if (entry->second.count == 1) {
// If no other objects are being used, then unmap the file.
// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make the size page-aligned again.
int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap);
if (err == -1) {
return Status::IOError("Error during munmap");
}
// Remove the corresponding entry from the hash table.
mmap_table_.erase(fd);
} else {
// If there are other objects being used, decrement the reference count.
entry->second.count -= 1;
}
// Update the in_use_object_bytes_.
in_use_object_bytes_ -= (object_entry->second->object.data_size +
object_entry->second->object.metadata_size);
DCHECK_GE(in_use_object_bytes_, 0);
// Remove the entry from the hash table of objects currently in use.
objects_in_use_.erase(object_id);
return Status::OK();
}

/// This is a helper method for implementing plasma_release. We maintain a
/// buffer
/// of release calls and only perform them once the buffer becomes full (as
/// judged by the aggregate sizes of the objects). There may be multiple release
/// calls for the same object ID in the buffer. In this case, the first release
/// calls will not do anything. The client will only send a message to the store
/// releasing the object when the client is truly done with the object.
///
/// @param object_id The object ID to attempt to release.
Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
// Decrement the count of the number of instances of this object that are
// being used by this client. The corresponding increment should have happened
// in PlasmaClient::Get.
Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
object_entry->second->count -= 1;
ARROW_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
if (object_entry->second->count == 0) {
// Tell the store that the client no longer needs the object.
RETURN_NOT_OK(UnmapObject(object_id));
RETURN_NOT_OK(MarkObjectUnused(object_id));
RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
@@ -681,50 +624,6 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
return Status::OK();
}

Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
// If an object is in the deletion cache, handle it directly without waiting.
auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
RETURN_NOT_OK(PerformRelease(object_id));
return Status::OK();
}
// Add the new object to the release history.
release_history_.push_front(object_id);
// If there are too many bytes in use by the client or if there are too many
// pending release calls, and there are at least some pending release calls in
// the release_history list, then release some objects.

// TODO(wap): Eviction policy only works on host memory, and thus objects on
// the GPU cannot be released currently.
while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
release_history_.size() > config_.release_delay) &&
release_history_.size() > 0) {
// Perform a release for the object ID for the first pending release.
RETURN_NOT_OK(PerformRelease(release_history_.back()));
// Remove the last entry from the release history.
release_history_.pop_back();
}
return Status::OK();
}

Status PlasmaClient::Impl::FlushReleaseHistory() {
// If the client is already disconnected, ignore the flush.
if (store_conn_ < 0) {
return Status::OK();
}
while (release_history_.size() > 0) {
// Perform a release for the object ID for the first pending release.
RETURN_NOT_OK(PerformRelease(release_history_.back()));
// Remove the last entry from the release history.
release_history_.pop_back();
}
return Status::OK();
}

// This method is used to query whether the plasma store contains an object.
Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
// Check if we already have a reference to the object.
@@ -855,8 +754,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
ARROW_CHECK(!object_entry->second->is_sealed)
<< "Plasma client called abort on a sealed object";

// Flush the release history.
RETURN_NOT_OK(FlushReleaseHistory());
// Make sure that the Plasma client only has one reference to the object. If
// it has more, then the client needs to release the buffer before calling
// abort.
@@ -868,7 +765,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
// Decrease the reference count to zero, then remove the object.
object_entry->second->count--;
RETURN_NOT_OK(UnmapObject(object_id));
RETURN_NOT_OK(MarkObjectUnused(object_id));

std::vector<uint8_t> buffer;
ObjectID id;
@@ -878,7 +775,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
RETURN_NOT_OK(FlushReleaseHistory());
std::vector<ObjectID> not_in_use_ids;
for (auto& object_id : object_ids) {
// If the object is in used, skip it.
@@ -981,8 +877,10 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
} else {
manager_conn_ = -1;
}
config_.release_delay = release_delay;
in_use_object_bytes_ = 0;
if (release_delay != 0) {
ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
<< "is deprecated";
}
// Send a ConnectRequest to the store to get its memory capacity.
RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
@@ -1175,8 +1073,6 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {

int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); }

Status PlasmaClient::FlushReleaseHistory() { return impl_->FlushReleaseHistory(); }

bool PlasmaClient::IsInUse(const ObjectID& object_id) {
return impl_->IsInUse(object_id);
}
16 changes: 3 additions & 13 deletions cpp/src/plasma/client.h
@@ -34,11 +34,6 @@ using arrow::Status;

namespace plasma {

/// We keep a queue of unreleased objects cached in the client until we start
/// sending release requests to the store. This is to avoid frequently mapping
/// and unmapping objects and evicting data from processor caches.
constexpr int64_t kPlasmaDefaultReleaseDelay = 64;

/// Object buffer data structure.
struct ObjectBuffer {
/// The data buffer.
@@ -62,13 +57,12 @@ class ARROW_EXPORT PlasmaClient {
/// \param manager_socket_name The name of the UNIX domain socket to use to
/// connect to the local Plasma manager. If this is "", then this
/// function will not connect to a manager.
/// \param release_delay Number of released objects that are kept around
/// and not evicted to avoid too many munmaps.
/// \param release_delay Deprecated (not used).
/// \param num_retries number of attempts to connect to IPC socket, default 50
/// \return The return status.
Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
const std::string& manager_socket_name, int release_delay = 0,
int num_retries = -1);

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
@@ -354,10 +348,6 @@ class ARROW_EXPORT PlasmaClient {
FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
FRIEND_TEST(TestPlasmaStore, AbortTest);

/// This is a helper method that flushes all pending release calls to the
/// store.
Status FlushReleaseHistory();

bool IsInUse(const ObjectID& object_id);

class ARROW_NO_EXPORT Impl;