diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index ca49d010aa529..343ccf5b886f2 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -905,21 +905,22 @@ class PlasmaStoreRunner { PlasmaStoreRunner() {} void Start(char* socket_name, int64_t system_memory, std::string directory, - bool hugepages_enabled, bool use_one_memory_mapped_file) { + bool hugepages_enabled) { // Create the event loop. loop_.reset(new EventLoop); store_.reset( new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled)); plasma_config = store_->GetPlasmaStoreInfo(); - // If the store is configured to use a single memory-mapped file, then we - // achieve that by mallocing and freeing a single large amount of space. - // that maximum allowed size up front. - if (use_one_memory_mapped_file) { - void* pointer = plasma::dlmemalign(kBlockSize, system_memory); - ARROW_CHECK(pointer != nullptr); - plasma::dlfree(pointer); - } + // We are using a single memory-mapped file by mallocing and freeing a single + // large amount of space up front. According to the documentation, + // dlmalloc might need up to 128*sizeof(size_t) bytes for internal + // bookkeeping. + void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t)); + ARROW_CHECK(pointer != nullptr); + // This will unmap the file, but the next one created will be as large + // as this one (this is an implementation detail of dlmalloc). + plasma::dlfree(pointer); int socket = BindIpcSock(socket_name, true); // TODO(pcm): Check return value. @@ -955,15 +956,14 @@ void HandleSignal(int signal) { } void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory, - bool hugepages_enabled, bool use_one_memory_mapped_file) { + bool hugepages_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); g_runner.reset(new PlasmaStoreRunner()); signal(SIGTERM, HandleSignal); - g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled, - use_one_memory_mapped_file); + g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled); } } // namespace plasma @@ -975,11 +975,9 @@ int main(int argc, char* argv[]) { // Directory where plasma memory mapped files are stored. std::string plasma_directory; bool hugepages_enabled = false; - // True if a single large memory-mapped file should be created at startup. - bool use_one_memory_mapped_file = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { switch (c) { case 'd': plasma_directory = std::string(optarg); @@ -994,14 +992,16 @@ int main(int argc, char* argv[]) { char extra; int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); ARROW_CHECK(scanned == 1); + // Set system memory, potentially rounding it to a page size + // Also make it so dlmalloc fails if we try to request more memory than + // is available. + system_memory = + plasma::dlmalloc_set_footprint_limit(static_cast(system_memory)); ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " << static_cast(system_memory) / 1000000000 << "GB of memory."; break; } - case 'f': - use_one_memory_mapped_file = true; - break; default: exit(-1); } @@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) { SetMallocGranularity(1024 * 1024 * 1024); // 1 GB } #endif - // Make it so dlmalloc fails if we try to request more memory than is - // available. - plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled, - use_one_memory_mapped_file); + plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled); plasma::g_runner->Shutdown(); plasma::g_runner = nullptr; diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py index 056172c9800de..a6ab362536d00 100644 --- a/python/pyarrow/plasma.py +++ b/python/pyarrow/plasma.py @@ -78,7 +78,6 @@ def build_plasma_tensorflow_op(): @contextlib.contextmanager def start_plasma_store(plasma_store_memory, use_valgrind=False, use_profiler=False, - use_one_memory_mapped_file=False, plasma_directory=None, use_hugepages=False): """Start a plasma store process. Args: @@ -87,8 +86,6 @@ def start_plasma_store(plasma_store_memory, of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the plasma store should be started inside a profiler. If this is True, use_valgrind must be False. - use_one_memory_mapped_file: If True, then the store will use only a - single memory-mapped file. plasma_directory (str): Directory where plasma memory mapped files will be stored. use_hugepages (bool): True if the plasma store should use huge pages. @@ -107,8 +104,6 @@ def start_plasma_store(plasma_store_memory, command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] - if use_one_memory_mapped_file: - command += ["-f"] if plasma_directory: command += ["-d", plasma_directory] if use_hugepages: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 05375d7b65aee..bcb467aab8e3e 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -37,6 +37,7 @@ DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8 USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1" +SMALL_OBJECT_SIZE = 9000 def random_name(): @@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, class TestPlasmaClient(object): def setup_method(self, test_method): - use_one_memory_mapped_file = (test_method == - self.test_use_one_memory_mapped_file) - import pyarrow.plasma as plasma # Start Plasma store. self.plasma_store_ctx = plasma.start_plasma_store( plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, - use_valgrind=USE_VALGRIND, - use_one_memory_mapped_file=use_one_memory_mapped_file) + use_valgrind=USE_VALGRIND) self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__() # Connect to Plasma. self.plasma_client = plasma.connect(self.plasma_store_name) @@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size): memory_buffers.append(memory_buffer) # Remaining space is 50%. Make sure that we can't create an # object of size 50% + 1, but we can create one of size 20%. - assert_create_raises_plasma_full(self, 50 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) del memory_buffer _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) del memory_buffer - assert_create_raises_plasma_full(self, 50 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) memory_buffers.append(memory_buffer) # Remaining space is 30%. - assert_create_raises_plasma_full(self, 30 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 30 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT) memory_buffers.append(memory_buffer) # Remaining space is 20%. - assert_create_raises_plasma_full(self, 20 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 20 * PERCENT + SMALL_OBJECT_SIZE) def test_contains(self): fake_object_ids = [random_object_id() for _ in range(100)] @@ -838,7 +839,7 @@ def test_subscribe_deletions(self): assert -1 == recv_dsize assert -1 == recv_msize - def test_use_one_memory_mapped_file(self): + def test_use_full_memory(self): # Fill the object store up with a large number of small objects and let # them go out of scope. for _ in range(100): @@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self): create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0) # Verify that an object that is too large does not fit. with pytest.raises(pa.lib.PlasmaStoreFull): - create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1, - 0) + create_object(self.plasma_client2, + DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0) def test_client_death_during_get(self): import pyarrow.plasma as plasma