Skip to content

Commit

Permalink
use only one mmapped file
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Jan 26, 2019
1 parent 9a64805 commit 117d03a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
35 changes: 15 additions & 20 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,21 +905,20 @@ class PlasmaStoreRunner {
PlasmaStoreRunner() {}

void Start(char* socket_name, int64_t system_memory, std::string directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
plasma_config = store_->GetPlasmaStoreInfo();

// If the store is configured to use a single memory-mapped file, then we
// achieve that by mallocing and freeing a single large amount of space —
// the maximum allowed size — up front.
if (use_one_memory_mapped_file) {
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
ARROW_CHECK(pointer != nullptr);
plasma::dlfree(pointer);
}
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 128 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
plasma::dlfree(pointer);

int socket = BindIpcSock(socket_name, true);
// TODO(pcm): Check return value.
Expand Down Expand Up @@ -955,15 +954,14 @@ void HandleSignal(int signal) {
}

void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);

g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
}

} // namespace plasma
Expand All @@ -975,8 +973,6 @@ int main(int argc, char* argv[]) {
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
bool hugepages_enabled = false;
// True if a single large memory-mapped file should be created at startup.
bool use_one_memory_mapped_file = false;
int64_t system_memory = -1;
int c;
while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
Expand All @@ -994,13 +990,16 @@ int main(int argc, char* argv[]) {
char extra;
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
ARROW_CHECK(scanned == 1);
// Set the system memory capacity, potentially rounding it to a page size.
// Also make it so dlmalloc fails if we try to request more memory than
// is available.
system_memory = plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
break;
}
case 'f':
use_one_memory_mapped_file = true;
break;
default:
exit(-1);
Expand Down Expand Up @@ -1051,12 +1050,8 @@ int main(int argc, char* argv[]) {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
plasma::g_runner->Shutdown();
plasma::g_runner = nullptr;

Expand Down
17 changes: 11 additions & 6 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
SMALL_OBJECT_SIZE = 9000


def random_name():
Expand Down Expand Up @@ -471,22 +472,26 @@ def assert_create_raises_plasma_full(unit_test, size):
memory_buffers.append(memory_buffer)
# Remaining space is 50%. Make sure that we can't create an
# object of size 50% + 1, but we can create one of size 20%.
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 30%.
assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 30 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 20%.
assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 20 * PERCENT + SMALL_OBJECT_SIZE)

def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]
Expand Down Expand Up @@ -851,8 +856,8 @@ def test_use_one_memory_mapped_file(self):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
# Verify that an object that is too large does not fit.
with pytest.raises(pa.lib.PlasmaStoreFull):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
0)
create_object(self.plasma_client2,
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)

def test_client_death_during_get(self):
import pyarrow.plasma as plasma
Expand Down

0 comments on commit 117d03a

Please sign in to comment.