
ARROW-3958: [Plasma] Reduce number of IPCs #3124

Closed
pcmoritz wants to merge 12 commits

Conversation

pcmoritz
Contributor

@pcmoritz pcmoritz commented Dec 7, 2018

This PR also removes the client unmap, which is not necessary any more since the introduction of malloc (there are only a few memory-mapped files and they typically stay around for the lifetime of the application).

The PR also gets rid of a bunch of code that is no longer needed (the release buffer, yay!).

Benchmarks:

import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma", "", 0)

# Put performance

def f():
    for i in range(10000):
        client.put(1)

%timeit f()

# without optimization:

# 1.51 s ± 2.22 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.52 s ± 9.68 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.53 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 

# with optimizations:

# 1.27 s ± 10.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.31 s ± 8.18 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.31 s ± 17.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Create/seal performance

def f(): 
    for i in range(10000): 
        object_id = plasma.ObjectID.from_random() 
        client.create(object_id, 0) 
        client.seal(object_id)

%timeit f()

# without optimizations:

# 571 ms ± 2.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 583 ms ± 22.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 588 ms ± 14.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# with optimizations:

# 531 ms ± 3.24 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 541 ms ± 9.99 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 542 ms ± 19.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Get performance

objects = [client.put(1) for i in range(10000)]

def g():
    for i in range(10000):
        client.get(objects[i])

%timeit g()

# without optimizations

# 1.11 s ± 6.17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.12 s ± 1.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 1.19 s ± 24.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# with optimizations

# 776 ms ± 11.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 792 ms ± 3.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 778 ms ± 9.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@@ -155,11 +158,12 @@ TEST_F(TestPlasmaStore, SealErrorsTest) {

// Create object.
std::vector<uint8_t> data(100, 0);
CreateObject(client_, object_id, {42}, data);
CreateObject(client_, object_id, {42}, data, false);
Contributor Author

This was a bug that was masked by the release_delay before. If we call Release before the second Seal, the result status will be "PlasmaObjectNonexistent" instead of "PlasmaObjectAlreadySealed".
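For context, a minimal sketch of the ordering the test now relies on (the trailing bool argument to the CreateObject test helper is assumed to control whether the helper releases the buffer after sealing; this is a sketch, not the exact test code):

// Keep the object in use so that the second Seal still sees it.
CreateObject(client_, object_id, {42}, data, /*release=*/false);
// The client still holds the object, so a second Seal is expected to fail
// with PlasmaObjectAlreadySealed.
Status status = client_.Seal(object_id);
// If the buffer had been released first, the client would have forgotten the
// object and Seal would report PlasmaObjectNonexistent instead.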

ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
ASSERT_TRUE(has_object);
// The Delete call will flush release cache and send the Delete request.
// Delete the objects.
Contributor Author

This code is no longer needed (there is no release cache any more).

@@ -386,7 +384,6 @@ TEST_F(TestPlasmaStore, AbortTest) {
// Test that we can get the object.
ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
Contributor Author

The following line was a bug masked by the release cache. This form of get calls Release in the destructor of the PlasmaBuffer shared pointer.
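A minimal sketch of the lifetime behaviour being described here (the scoping is illustrative; names follow the test fixture):

{
  std::vector<ObjectBuffer> object_buffers;
  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
  // object_buffers[0].data is a shared_ptr<Buffer>; use it inside this scope.
}
// Last reference dropped: the buffer's destructor calls Release(object_id)
// immediately, since there is no release cache left to delay it. Any check
// that assumes the object is still pinned after this point depends on the
// old, delayed behaviour.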

@pcmoritz pcmoritz changed the title ARROW-3958: [Plasma] Reduce number of IPCs [WIP] ARROW-3958: [Plasma] Reduce number of IPCs Dec 8, 2018
@robertnishihara
Contributor

@pcmoritz can you clarify what you mean in the PR description by "since the introduction of malloc"? Do you mean "jemalloc"?

Also, it is normal in a lot of situations for clients to release all of their objects, in which case the unmap makes perfect sense. Can you elaborate on the rationale here?

@pcmoritz
Contributor Author

@robertnishihara The main rationale of this PR is to get rid of the release delay and the additional IPCs to send over the file descriptor. There is no advantage in unmapping and remapping the memory-mapped files.

To be honest, I would prefer to statically allocate a memory-mapped file at the beginning that everybody uses, but that's not feasible with the way malloc implementations are designed (both dlmalloc and jemalloc).

@@ -308,7 +290,6 @@ PlasmaClient::Impl::~Impl() {}
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
Contributor Author

The file descriptor is not sent any more if it has already been sent, so we shouldn't close it here.

Contributor

Should we close all the file descriptors in the PlasmaClient destructor?

Contributor Author
@pcmoritz pcmoritz Dec 13, 2018

They have already been closed by the first LookupOrMmap call (see the close call below); see also the comment below.
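To make that lifecycle concrete, here is a minimal sketch of the first-use path (illustrative only; names and error handling are simplified relative to the real LookupOrMmap in client.cc):

#include <sys/mman.h>
#include <unistd.h>

#include <cstdint>
#include <unordered_map>

// On the first lookup for a given store fd the client mmaps the file and then
// closes the descriptor (the mapping stays valid after close). Later lookups
// hit the table and never receive or close another fd.
inline uint8_t* LookupOrMmapSketch(int fd, int store_fd_val, int64_t map_size,
                                   std::unordered_map<int, uint8_t*>* mmap_table) {
  auto entry = mmap_table->find(store_fd_val);
  if (entry != mmap_table->end()) {
    return entry->second;  // already mapped; no new fd was received
  }
  void* result = mmap(nullptr, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (result == MAP_FAILED) {
    return nullptr;  // the real code reports the error instead
  }
  close(fd);  // safe: the mapping outlives the descriptor
  (*mmap_table)[store_fd_val] = static_cast<uint8_t*>(result);
  return static_cast<uint8_t*>(result);
}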

Status PerformRelease(const ObjectID& object_id);
/// This is a helper method for marking an object as unused by this client.
///
/// @param object_id The object ID we mark unused.
Contributor

document return

Contributor Author

done

/// subsequent tasks so we do not unneccessarily invalidate cpu caches.
/// TODO(pcm): replace this with a proper lru cache using the size of the L3
/// cache.
std::deque<ObjectID> release_history_;
/// The number of bytes in the combined objects that are held in the release
/// history doubly-linked list. If this is too large then the client starts
/// releasing objects.
int64_t in_use_object_bytes_;
Contributor

maybe we don't need this anymore?

Contributor Author

yeah good point, I removed it

/// This is a helper method for unmapping objects for which all references have
/// gone out of scope, either by calling Release or Abort.
/// Check if store_fd has already been received from the store. If yes,
/// return it. Otherwise, receive it from the store.
Contributor

Can you add a comment that this logic requires analogous logic in ReturnFromGet in store.cc, to guarantee that file descriptors are sent to a client if and only if they have not been sent before?

Contributor

And probably add a comment in the relevant place in the store referring to this logic in the client.

Contributor Author

done
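For readers following the thread, a small self-contained sketch of the invariant being documented; ClientState and NeedsToSendFd below are illustrative, not actual Plasma identifiers (only used_fds mirrors the member shown in the diff):

#include <unordered_set>

// Store side: remember which memory-mapped-file descriptors have already been
// passed to each client, and send a given fd at most once per connection.
struct ClientState {
  int socket_fd;                     // connection to the client
  std::unordered_set<int> used_fds;  // store fds already sent over this socket
};

// Returns true exactly once per (client, store_fd) pair; the caller then does
// the actual send_fd. The client mirrors this by calling recv_fd only when
// store_fd is not yet in its mmap_table_, so sends and receives stay in sync.
inline bool NeedsToSendFd(ClientState* client, int store_fd) {
  return client->used_fds.insert(store_fd).second;
}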

/// \param num_retries number of attempts to connect to IPC socket, default 50
/// \return The return status.
Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
const std::string& manager_socket_name, int release_delay = 0,
Contributor

Want to just get rid of release_delay or print a deprecation warning?

Contributor

At the very least, can we remove it from any documentation?

Contributor Author

done
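One possible shape for the deprecation path (a sketch assuming the parameter is kept in the signature but ignored; not necessarily the exact code in this PR):

// Inside PlasmaClient::Connect: release_delay now defaults to 0 and a nonzero
// value only triggers a warning, since the release cache no longer exists.
if (release_delay != 0) {
  ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
                        "is deprecated and has no effect.";
}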


WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) {
WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
get_req->client->used_fds.emplace(store_fd);
Contributor

why emplace instead of insert? same comment below.

Contributor Author
@pcmoritz pcmoritz Dec 13, 2018

They are equivalent here (I replaced it with insert; maybe that's a bit more idiomatic here, not sure).
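A tiny self-contained check of the equivalence being discussed (std::unordered_set<int> stands in for used_fds):

#include <cassert>
#include <unordered_set>

int main() {
  std::unordered_set<int> used_fds;
  // For an int key, emplace(fd) and insert(fd) construct the same element and
  // both return a pair whose .second is true only on the first insertion.
  assert(used_fds.emplace(7).second);
  assert(!used_fds.insert(7).second);
  return 0;
}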

@robertnishihara robertnishihara changed the title [WIP] ARROW-3958: [Plasma] Reduce number of IPCs ARROW-3958: [Plasma] Reduce number of IPCs Dec 13, 2018
@robertnishihara
Contributor

Looks good to me! Thanks!

@codecov-io

Codecov Report

Merging #3124 into master will increase coverage by 0.87%.
The diff coverage is 90.24%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #3124      +/-   ##
=========================================
+ Coverage   87.23%   88.1%   +0.87%     
=========================================
  Files         494     437      -57     
  Lines       69615   65900    -3715     
=========================================
- Hits        60729   58064    -2665     
+ Misses       8787    7836     -951     
+ Partials       99       0      -99
Impacted Files Coverage Δ
cpp/src/plasma/store.h 100% <ø> (ø) ⬆️
cpp/src/plasma/client.h 100% <ø> (ø) ⬆️
cpp/src/plasma/test/client_tests.cc 100% <100%> (ø) ⬆️
python/pyarrow/tests/test_plasma_tf_op.py 97.91% <100%> (ø) ⬆️
cpp/src/plasma/store.cc 92.48% <100%> (+0.05%) ⬆️
python/pyarrow/tests/test_plasma.py 96.17% <100%> (-0.17%) ⬇️
python/pyarrow/_plasma.pyx 65.27% <66.66%> (-0.26%) ⬇️
cpp/src/plasma/client.cc 83.33% <89.47%> (-1.36%) ⬇️
python/pyarrow/compat.py 57.14% <0%> (-20.5%) ⬇️
python/pyarrow/formatting.py 37.5% <0%> (-18.75%) ⬇️
... and 128 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1dee3f4...f899f45. Read the comment docs.
