Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3958: [Plasma] Reduce number of IPCs #3124

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 35 additions & 130 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ typedef struct XXH64_state_s XXH64_state_t;
constexpr int64_t kHashingConcurrency = 8;
constexpr int64_t kBytesInMB = 1 << 20;

// Use 100MB as an overestimate of the L3 cache size.
constexpr int64_t kL3CacheSizeBytes = 100000000;

// ----------------------------------------------------------------------
// GPU support

Expand Down Expand Up @@ -143,22 +140,13 @@ struct ObjectInUseEntry {
bool is_sealed;
};

/// Configuration options for the plasma client.
struct PlasmaClientConfig {
/// Number of release calls we wait until the object is actually released.
/// This allows us to avoid invalidating the cpu cache on workers if objects
/// are reused accross tasks.
size_t release_delay;
};

struct ClientMmapTableEntry {
/// The associated file descriptor on the client.
int fd;
/// The result of mmap for this file descriptor.
uint8_t* pointer;
/// The length of the memory-mapped file.
size_t length;
/// The number of objects in this memory-mapped file that are currently being
/// used by the client. When this count reaches zeros, we unmap the file.
int count;
};

class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
Expand All @@ -169,7 +157,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
// PlasmaClient method implementations

Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay,
const std::string& manager_socket_name, int release_delay = 0,
int num_retries = -1);

Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
Expand Down Expand Up @@ -221,18 +209,20 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

int get_manager_fd() const;

Status FlushReleaseHistory();

bool IsInUse(const ObjectID& object_id);

private:
/// This is a helper method for unmapping objects for which all references have
/// gone out of scope, either by calling Release or Abort.
/// Check if store_fd has already been received from the store. If yes,
/// return it. Otherwise, receive it from the store.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment that this logic requires analogous logic in ReturnFromGet in store.cc to guarantee that file descriptors are sent to a client if and only if they have not been sent before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And probably add a comment in the relevant place in the store referring to this logic in the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

///
/// @param object_id The object ID whose data we should unmap.
Status UnmapObject(const ObjectID& object_id);
/// @param store_fd File descriptor to fetch from the store.
/// @return Client file descriptor corresponding to store_fd.
int GetStoreFd(int store_fd);

Status PerformRelease(const ObjectID& object_id);
/// This is a helper method for marking an object as unused by this client.
///
/// @param object_id The object ID we mark unused.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Status MarkObjectUnused(const ObjectID& object_id);

/// Common helper for Get() variants
Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
Expand Down Expand Up @@ -267,18 +257,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// A hash table of the object IDs that are currently being used by this
/// client.
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
/// Object IDs of the last few release calls. This is a deque and
/// is used to delay releasing objects to see if they can be reused by
/// subsequent tasks so we do not unneccessarily invalidate cpu caches.
/// TODO(pcm): replace this with a proper lru cache using the size of the L3
/// cache.
std::deque<ObjectID> release_history_;
/// The number of bytes in the combined objects that are held in the release
/// history doubly-linked list. If this is too large then the client starts
/// releasing objects.
int64_t in_use_object_bytes_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we don't need this anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good point, I removed it

/// Configuration options for the plasma client.
PlasmaClientConfig config_;
/// The amount of memory available to the Plasma store. The client needs this
/// information to make sure that it does not delay in releasing so much
/// memory that the store is unable to evict enough objects to free up space.
Expand Down Expand Up @@ -308,7 +290,6 @@ PlasmaClient::Impl::~Impl() {}
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file descriptor is not send any more if it had already been send, so we shouldn't close it here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we close all the file descriptors in the PlasmaClient destructor?

Copy link
Contributor Author

@pcmoritz pcmoritz Dec 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they have already been closed by the first LookupOrMmap call (see the close call below), see also comment below

return entry->second.pointer;
} else {
// We subtract kMmapRegionsGap from the length that was added
Expand All @@ -322,9 +303,9 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
close(fd); // Closing this fd has an effect on performance.

ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
entry.fd = fd;
entry.pointer = result;
entry.length = map_size;
entry.count = 0;
return result;
}
}
Expand All @@ -342,6 +323,17 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
return (elem != objects_in_use_.end());
}

int PlasmaClient::Impl::GetStoreFd(int store_fd) {
auto entry = mmap_table_.find(store_fd);
if (entry == mmap_table_.end()) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
return fd;
} else {
return entry->second.fd;
}
}

void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
PlasmaObject* object, bool is_sealed) {
// Increment the count of the object to track the fact that it is being used.
Expand All @@ -358,16 +350,9 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
objects_in_use_[object_id]->is_sealed = is_sealed;
object_entry = objects_in_use_[object_id].get();
if (object->device_num == 0) {
// Increment the count of the number of objects in the memory-mapped file
// that are being used. The corresponding decrement should happen in
// PlasmaClient::Release.
auto entry = mmap_table_.find(object->store_fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 0);
// Update the in_use_object_bytes_.
in_use_object_bytes_ +=
(object_entry->object.data_size + object_entry->object.metadata_size);
entry->second.count += 1;
}
} else {
object_entry = elem->second.get();
Expand Down Expand Up @@ -397,8 +382,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// If the CreateReply included an error, then the store will not send a file
// descriptor.
if (device_num == 0) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
int fd = GetStoreFd(store_fd);
ARROW_CHECK(object.data_size == data_size);
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
Expand Down Expand Up @@ -535,8 +519,7 @@ Status PlasmaClient::Impl::GetBuffers(
// in the subsequent loop based on just the store file descriptor and without
// having to know the relevant file descriptor received from recv_fd.
for (size_t i = 0; i < store_fds.size(); i++) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0);
int fd = GetStoreFd(store_fds[i]);
LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
}

Expand Down Expand Up @@ -615,62 +598,34 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
}

Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) {
Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
ARROW_CHECK(object_entry->second->count == 0);

// Decrement the count of the number of objects in this memory-mapped file
// that the client is using. The corresponding increment should have
// happened in plasma_get.
int fd = object_entry->second->object.store_fd;
auto entry = mmap_table_.find(fd);
ARROW_CHECK(entry != mmap_table_.end());
ARROW_CHECK(entry->second.count >= 1);
if (entry->second.count == 1) {
// If no other objects are being used, then unmap the file.
// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make the size page-aligned again.
int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap);
if (err == -1) {
return Status::IOError("Error during munmap");
}
// Remove the corresponding entry from the hash table.
mmap_table_.erase(fd);
} else {
// If there are other objects being used, decrement the reference count.
entry->second.count -= 1;
}
// Update the in_use_object_bytes_.
in_use_object_bytes_ -= (object_entry->second->object.data_size +
object_entry->second->object.metadata_size);
DCHECK_GE(in_use_object_bytes_, 0);

// Remove the entry from the hash table of objects currently in use.
objects_in_use_.erase(object_id);
return Status::OK();
}

/// This is a helper method for implementing plasma_release. We maintain a
/// buffer
/// of release calls and only perform them once the buffer becomes full (as
/// judged by the aggregate sizes of the objects). There may be multiple release
/// calls for the same object ID in the buffer. In this case, the first release
/// calls will not do anything. The client will only send a message to the store
/// releasing the object when the client is truly done with the object.
///
/// @param object_id The object ID to attempt to release.
Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
// Decrement the count of the number of instances of this object that are
// being used by this client. The corresponding increment should have happened
// in PlasmaClient::Get.
Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
object_entry->second->count -= 1;
ARROW_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
if (object_entry->second->count == 0) {
// Tell the store that the client no longer needs the object.
RETURN_NOT_OK(UnmapObject(object_id));
RETURN_NOT_OK(MarkObjectUnused(object_id));
RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
Expand All @@ -681,50 +636,6 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
return Status::OK();
}

Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
// If an object is in the deletion cache, handle it directly without waiting.
auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
RETURN_NOT_OK(PerformRelease(object_id));
return Status::OK();
}
// Add the new object to the release history.
release_history_.push_front(object_id);
// If there are too many bytes in use by the client or if there are too many
// pending release calls, and there are at least some pending release calls in
// the release_history list, then release some objects.

// TODO(wap): Eviction policy only works on host memory, and thus objects on
// the GPU cannot be released currently.
while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
release_history_.size() > config_.release_delay) &&
release_history_.size() > 0) {
// Perform a release for the object ID for the first pending release.
RETURN_NOT_OK(PerformRelease(release_history_.back()));
// Remove the last entry from the release history.
release_history_.pop_back();
}
return Status::OK();
}

Status PlasmaClient::Impl::FlushReleaseHistory() {
// If the client is already disconnected, ignore the flush.
if (store_conn_ < 0) {
return Status::OK();
}
while (release_history_.size() > 0) {
// Perform a release for the object ID for the first pending release.
RETURN_NOT_OK(PerformRelease(release_history_.back()));
// Remove the last entry from the release history.
release_history_.pop_back();
}
return Status::OK();
}

// This method is used to query whether the plasma store contains an object.
Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
// Check if we already have a reference to the object.
Expand Down Expand Up @@ -855,8 +766,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
ARROW_CHECK(!object_entry->second->is_sealed)
<< "Plasma client called abort on a sealed object";

// Flush the release history.
RETURN_NOT_OK(FlushReleaseHistory());
// Make sure that the Plasma client only has one reference to the object. If
// it has more, then the client needs to release the buffer before calling
// abort.
Expand All @@ -868,7 +777,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
// Decrease the reference count to zero, then remove the object.
object_entry->second->count--;
RETURN_NOT_OK(UnmapObject(object_id));
RETURN_NOT_OK(MarkObjectUnused(object_id));

std::vector<uint8_t> buffer;
ObjectID id;
Expand All @@ -878,7 +787,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
RETURN_NOT_OK(FlushReleaseHistory());
std::vector<ObjectID> not_in_use_ids;
for (auto& object_id : object_ids) {
// If the object is in used, skip it.
Expand Down Expand Up @@ -981,7 +889,6 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
} else {
manager_conn_ = -1;
}
config_.release_delay = release_delay;
in_use_object_bytes_ = 0;
// Send a ConnectRequest to the store to get its memory capacity.
RETURN_NOT_OK(SendConnectRequest(store_conn_));
Expand Down Expand Up @@ -1175,8 +1082,6 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {

int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); }

Status PlasmaClient::FlushReleaseHistory() { return impl_->FlushReleaseHistory(); }

bool PlasmaClient::IsInUse(const ObjectID& object_id) {
return impl_->IsInUse(object_id);
}
Expand Down
16 changes: 3 additions & 13 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ using arrow::Status;

namespace plasma {

/// We keep a queue of unreleased objects cached in the client until we start
/// sending release requests to the store. This is to avoid frequently mapping
/// and unmapping objects and evicting data from processor caches.
constexpr int64_t kPlasmaDefaultReleaseDelay = 64;

/// Object buffer data structure.
struct ObjectBuffer {
/// The data buffer.
Expand All @@ -62,13 +57,12 @@ class ARROW_EXPORT PlasmaClient {
/// \param manager_socket_name The name of the UNIX domain socket to use to
/// connect to the local Plasma manager. If this is "", then this
/// function will not connect to a manager.
/// \param release_delay Number of released objects that are kept around
/// and not evicted to avoid too many munmaps.
/// \param release_delay Deprecated (not used).
/// \param num_retries number of attempts to connect to IPC socket, default 50
/// \return The return status.
Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
const std::string& manager_socket_name, int release_delay = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to just get rid of release_delay or print a deprecation warning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, can we remove it from any documentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

int num_retries = -1);

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
Expand Down Expand Up @@ -354,10 +348,6 @@ class ARROW_EXPORT PlasmaClient {
FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
FRIEND_TEST(TestPlasmaStore, AbortTest);

/// This is a helper method that flushes all pending release calls to the
/// store.
Status FlushReleaseHistory();

bool IsInUse(const ObjectID& object_id);

class ARROW_NO_EXPORT Impl;
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
if (s.ok()) {
// Send all of the file descriptors for the present objects.
for (int store_fd : store_fds) {
WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) {
WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
get_req->client->used_fds.emplace(store_fd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why emplace instead of insert? same comment below.

Copy link
Contributor Author

@pcmoritz pcmoritz Dec 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are equivalent here (I replaced it with insert, maybe that's a bit more idiomatic here, not sure)

}
}
}

Expand Down Expand Up @@ -783,8 +786,10 @@ Status PlasmaStore::ProcessMessage(Client* client) {
HANDLE_SIGPIPE(
SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
client->fd);
if (error_code == PlasmaError::OK && device_num == 0) {
if (error_code == PlasmaError::OK && device_num == 0 &&
client->used_fds.find(object.store_fd) == client->used_fds.end()) {
WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
client->used_fds.emplace(object.store_fd);
}
} break;
case fb::MessageType::PlasmaCreateAndSealRequest: {
Expand Down
Loading