Skip to content

Commit

Permalink
use only one mmapped file
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Jan 28, 2019
1 parent 9a64805 commit 1e33c7c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 40 deletions.
42 changes: 19 additions & 23 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,21 +905,22 @@ class PlasmaStoreRunner {
PlasmaStoreRunner() {}

void Start(char* socket_name, int64_t system_memory, std::string directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
plasma_config = store_->GetPlasmaStoreInfo();

// If the store is configured to use a single memory-mapped file, then we
// achieve that by mallocing and freeing a single large amount of space.
// that maximum allowed size up front.
if (use_one_memory_mapped_file) {
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
ARROW_CHECK(pointer != nullptr);
plasma::dlfree(pointer);
}
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
plasma::dlfree(pointer);

int socket = BindIpcSock(socket_name, true);
// TODO(pcm): Check return value.
Expand Down Expand Up @@ -955,15 +956,14 @@ void HandleSignal(int signal) {
}

void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);

g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
}

} // namespace plasma
Expand All @@ -975,11 +975,9 @@ int main(int argc, char* argv[]) {
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
bool hugepages_enabled = false;
// True if a single large memory-mapped file should be created at startup.
bool use_one_memory_mapped_file = false;
int64_t system_memory = -1;
int c;
while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
switch (c) {
case 'd':
plasma_directory = std::string(optarg);
Expand All @@ -994,14 +992,16 @@ int main(int argc, char* argv[]) {
char extra;
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
ARROW_CHECK(scanned == 1);
// Set system memory, potentially rounding it to a page size
// Also make it so dlmalloc fails if we try to request more memory than
// is available.
system_memory =
plasma::dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
break;
}
case 'f':
use_one_memory_mapped_file = true;
break;
default:
exit(-1);
}
Expand Down Expand Up @@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
plasma::g_runner->Shutdown();
plasma::g_runner = nullptr;

Expand Down
5 changes: 0 additions & 5 deletions python/pyarrow/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def build_plasma_tensorflow_op():
@contextlib.contextmanager
def start_plasma_store(plasma_store_memory,
use_valgrind=False, use_profiler=False,
use_one_memory_mapped_file=False,
plasma_directory=None, use_hugepages=False):
"""Start a plasma store process.
Args:
Expand All @@ -87,8 +86,6 @@ def start_plasma_store(plasma_store_memory,
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the plasma store should be started inside
a profiler. If this is True, use_valgrind must be False.
use_one_memory_mapped_file: If True, then the store will use only a
single memory-mapped file.
plasma_directory (str): Directory where plasma memory mapped files
will be stored.
use_hugepages (bool): True if the plasma store should use huge pages.
Expand All @@ -107,8 +104,6 @@ def start_plasma_store(plasma_store_memory,
command = [plasma_store_executable,
"-s", plasma_store_name,
"-m", str(plasma_store_memory)]
if use_one_memory_mapped_file:
command += ["-f"]
if plasma_directory:
command += ["-d", plasma_directory]
if use_hugepages:
Expand Down
25 changes: 13 additions & 12 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
SMALL_OBJECT_SIZE = 9000


def random_name():
Expand Down Expand Up @@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
class TestPlasmaClient(object):

def setup_method(self, test_method):
use_one_memory_mapped_file = (test_method ==
self.test_use_one_memory_mapped_file)

import pyarrow.plasma as plasma
# Start Plasma store.
self.plasma_store_ctx = plasma.start_plasma_store(
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=USE_VALGRIND,
use_one_memory_mapped_file=use_one_memory_mapped_file)
use_valgrind=USE_VALGRIND)
self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
# Connect to Plasma.
self.plasma_client = plasma.connect(self.plasma_store_name)
Expand Down Expand Up @@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size):
memory_buffers.append(memory_buffer)
# Remaining space is 50%. Make sure that we can't create an
# object of size 50% + 1, but we can create one of size 20%.
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 30%.
assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 30 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 20%.
assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 20 * PERCENT + SMALL_OBJECT_SIZE)

def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]
Expand Down Expand Up @@ -838,7 +839,7 @@ def test_subscribe_deletions(self):
assert -1 == recv_dsize
assert -1 == recv_msize

def test_use_one_memory_mapped_file(self):
def test_use_full_memory(self):
# Fill the object store up with a large number of small objects and let
# them go out of scope.
for _ in range(100):
Expand All @@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
# Verify that an object that is too large does not fit.
with pytest.raises(pa.lib.PlasmaStoreFull):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
0)
create_object(self.plasma_client2,
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)

def test_client_death_during_get(self):
import pyarrow.plasma as plasma
Expand Down

0 comments on commit 1e33c7c

Please sign in to comment.