Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][ObjectRef] Change default to not record call stack during ObjectRef creation #18078

Merged
merged 8 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions dashboard/memory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def memory_summary(state,
| {:<4} | {:<14} | {:<10}\n"

if size > line_wrap_threshold and line_wrap:
object_ref_string = "{:<12} {:<5} {:<6} {:<22} {:<6} {:<18} \
object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<6} {:<18} \
{:<56}\n"

mem += f"Grouping by {group_by}...\
Expand All @@ -406,12 +406,11 @@ def memory_summary(state,
# Group summary
summary = group["summary"]
ref_size = track_reference_size(group)
for key in summary:
if key == "total_object_size":
summary[key] = str(summary[key] / units[unit]) + f" {unit}"
for k, v in summary.items():
if k == "total_object_size":
summary[k] = str(v / units[unit]) + f" {unit}"
else:
summary[key] = str(
summary[key]) + f", ({ref_size[key] / units[unit]} {unit})"
summary[k] = str(v) + f", ({ref_size[k] / units[unit]} {unit})"
mem += f"--- Summary for {group_by}: {key} ---\n"
mem += summary_string\
.format(*summary_labels)
Expand All @@ -432,10 +431,13 @@ def memory_summary(state,
num_lines = 1
if size > line_wrap_threshold and line_wrap:
call_site_length = 22
entry["call_site"] = [
entry["call_site"][i:i + call_site_length] for i in range(
0, len(entry["call_site"]), call_site_length)
]
if len(entry["call_site"]) == 0:
entry["call_site"] = ["disabled"]
else:
entry["call_site"] = [
entry["call_site"][i:i + call_site_length] for i in
range(0, len(entry["call_site"]), call_site_length)
]
num_lines = len(entry["call_site"])
else:
mem += "\n"
Expand All @@ -455,5 +457,8 @@ def memory_summary(state,
.format(*row)
mem += "\n"
n += 1
mem += "\n\n"

mem += "To record callsite information for each ObjectRef created, set " \
"env variable RAY_record_ref_creation_sites=1\n\n"

return mem
18 changes: 11 additions & 7 deletions python/ray/_private/ray_microbenchmark_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@ def timeit(name, fn, multiplier=1) -> List[Optional[Tuple[str, float, float]]]:
if filter_pattern not in name:
return [None]
# warmup
start = time.time()
while time.time() - start < 1:
start = time.perf_counter()
count = 0
while time.perf_counter() - start < 1:
fn()
count += 1
# real run
step = count // 10 + 1
stats = []
for _ in range(4):
start = time.time()
start = time.perf_counter()
count = 0
while time.time() - start < 2:
fn()
count += 1
end = time.time()
while time.perf_counter() - start < 2:
for _ in range(step):
fn()
count += step
end = time.perf_counter()
stats.append(multiplier * count / (end - start))

mean = np.mean(stats)
Expand Down
16 changes: 16 additions & 0 deletions python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ def small_value_batch(n):
return 0


@ray.remote
def create_object_containing_ref():
    """Call ``ray.put(1)`` 10000 times and return the results as a list."""
    # Each call's return value is collected in order; the benchmark
    # later fetches this whole list with a single ray.get.
    return [ray.put(1) for _ in range(10000)]


def check_optimized_build():
if not ray._raylet.OPTIMIZED:
msg = ("WARNING: Unoptimized build! "
Expand Down Expand Up @@ -140,6 +148,14 @@ def put_multi():

results += timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1)

obj_containing_ref = create_object_containing_ref.remote()

def get_containing_object_ref():
    # Fetch `obj_containing_ref` (bound in the enclosing scope just above
    # this definition); the fetched value is discarded, only the call is
    # of interest to the timing harness.
    ray.get(obj_containing_ref)

results += timeit("single client get object containing 10k refs",
get_containing_object_ref)

def small_task():
ray.get(small_value.remote())

Expand Down
43 changes: 40 additions & 3 deletions python/ray/tests/test_memstat.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import numpy as np
import os
import time

import pytest
import ray
from ray.cluster_utils import Cluster
from ray.internal.internal_api import memory_summary

# RayConfig to enable recording call sites during ObjectRef creation.
ray_config = {"record_ref_creation_sites": True}

# Unique strings.
DRIVER_PID = "Driver"
WORKER_PID = "Worker"
Expand All @@ -22,7 +27,8 @@
TASK_CALL_OBJ = "(task call)"
ACTOR_TASK_CALL_OBJ = "(actor call)"
DESER_TASK_ARG = "(deserialize task arg)"
DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)"
# Only 22 characters can be matched because longer strings are wrapped around.
DESER_ACTOR_TASK_ARG = "(deserialize actor tas"

# Group by and sort by parameters.
NODE_ADDRESS = "node address"
Expand Down Expand Up @@ -50,14 +56,18 @@ def num_objects(memory_str):


def count(memory_str, substr):
    """Return the number of lines in ``memory_str`` that contain ``substr``.

    Only the first 42 characters of ``substr`` take part in the match.

    Args:
        memory_str: Text to scan; it is split on ``"\n"``.
        substr: Text to look for; truncated to 42 characters first.

    Returns:
        The count of lines containing the (truncated) ``substr``.
    """
    # Truncate exactly once. Truncating to 39 first would make a later
    # 42-character truncation a no-op and silently keep the old limit.
    # NOTE(review): 42 presumably tracks the width of the rendered row
    # prefix (first column widened by 3) -- confirm against the formatter.
    substr = substr[:42]
    return sum(1 for line in memory_str.split("\n") if substr in line)


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_driver_put_ref(ray_start_regular):
address = ray_start_regular["redis_address"]
info = memory_summary(address)
Expand All @@ -73,6 +83,10 @@ def test_driver_put_ref(ray_start_regular):
assert num_objects(info) == 0, info


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_worker_task_refs(ray_start_regular):
address = ray_start_regular["redis_address"]

Expand Down Expand Up @@ -112,6 +126,10 @@ def f(y):
assert num_objects(info) == 0, info


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_actor_task_refs(ray_start_regular):
address = ray_start_regular["redis_address"]

Expand Down Expand Up @@ -160,6 +178,10 @@ def make_actor():
assert num_objects(info) == 0, info


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_nested_object_refs(ray_start_regular):
address = ray_start_regular["redis_address"]
x_id = ray.put(np.zeros(100000))
Expand All @@ -174,6 +196,10 @@ def test_nested_object_refs(ray_start_regular):
del z_id


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_pinned_object_call_site(ray_start_regular):
address = ray_start_regular["redis_address"]
# Local ref only.
Expand Down Expand Up @@ -208,6 +234,10 @@ def test_pinned_object_call_site(ray_start_regular):


def test_multi_node_stats(shutdown_only):
# NOTE(mwtian): using env var only enables the feature on workers, while
# using head_node_args={"_system_config": ray_config} only enables the
# feature on the driver.
os.environ["RAY_record_ref_creation_sites"] = "1"
cluster = Cluster()
for _ in range(2):
cluster.add_node(num_cpus=1)
Expand All @@ -234,6 +264,10 @@ def ping(self):
assert count(info, PUT_OBJ) == 2, info


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_group_by_sort_by(ray_start_regular):
address = ray_start_regular["redis_address"]

Expand All @@ -260,6 +294,10 @@ def f(y):
assert count(info_c, PID) == 1, info_c


@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": ray_config
}], indirect=True)
def test_memory_used_output(ray_start_regular):
address = ray_start_regular["redis_address"]
import numpy as np
Expand All @@ -272,6 +310,5 @@ def test_memory_used_output(ray_start_regular):


if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
7 changes: 4 additions & 3 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
RAY_CONFIG(uint64_t, num_resource_report_periods_warning, 5)

/// Whether to record the creation sites of object references. This adds more
/// information to `ray memstat`, but introduces a little extra overhead when
/// creating object references.
RAY_CONFIG(bool, record_ref_creation_sites, true)
/// information to `ray memory`, but introduces a little extra overhead when
/// creating object references (e.g. 5~10 microsec per call in Python).
/// TODO: maybe group this under RAY_DEBUG.
RAY_CONFIG(bool, record_ref_creation_sites, false)

/// Objects that have been unpinned are
/// added to a local cache. When the cache is flushed, all objects in the cache
Expand Down