Skip to content

Commit

Permalink
[Serve] Wait for request to start processing in `test_http_replica_ga…
Browse files Browse the repository at this point in the history
…uge_metrics` (#37155)

`test_metrics.py` has been flaky lately:

<img width="1360" alt="Screen Shot 2023-07-06 at 11 52 55 AM" src="https://github.com/ray-project/ray/assets/92341594/2c124adb-c68d-43fc-bc65-562b44175292">

See [example traceback](https://buildkite.com/ray-project/oss-ci-build-branch/builds/4818#_):

```console
serve_start_shutdown = <ray.serve._private.client.ServeControllerClient object at 0x7f3088011d00>

    def test_http_replica_gauge_metrics(serve_start_shutdown):
        """Test http replica gauge metrics"""
        signal = SignalActor.remote()
    
        @serve.deployment
        class A:
            async def __call__(self):
                await signal.wait.remote()
    
        handle = serve.run(A.bind(), name="app1")
        _ = handle.remote()
        wait_for_condition(
            lambda: len(get_metric_dictionaries("serve_replica_processing_queries")) == 1,
            timeout=20,
        )
        processing_requests = get_metric_dictionaries("serve_replica_processing_queries")
        assert len(processing_requests) == 1
        assert processing_requests[0]["deployment"] == "app1_A"
        assert processing_requests[0]["application"] == "app1"
        print("processing_requests working as expected.")
    
        pending_requests = get_metric_dictionaries("serve_replica_pending_queries")
        assert len(pending_requests) == 1
        assert pending_requests[0]["deployment"] == "app1_A"
        assert pending_requests[0]["application"] == "app1"
    
        resp = requests.get("http://127.0.0.1:9999").text
        resp = resp.split("\n")
        for metrics in resp:
            if "# HELP" in metrics or "# TYPE" in metrics:
                continue
            if "serve_replica_processing_queries" in metrics:
>               assert "1.0" in metrics
E               assert '1.0' in 'ray_serve_replica_processing_queries{Component="core_worker",NodeAddress="172.16.16.3",SessionName="session_2023-07-05_17-34-28_672200_4914",Version="3.0.0.dev0",WorkerId="fe8d8074e49baebaf85a11429a49a6f019fd9194e2f7c047e400e545",application="app1",deployment="app1_A",replica="app1_A#ogIsqV"} 0.0'

python/ray/serve/tests/test_metrics.py:123: AssertionError
```

The root cause seems to be that the request occasionally takes some time stop pending and start processing. This change places that check inside a `wait_for_condition` call to remove flakiness.
  • Loading branch information
shrekris-anyscale authored Jul 7, 2023
1 parent 394766d commit dfee3f0
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions python/ray/serve/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,37 +92,50 @@ def test_http_replica_gauge_metrics(serve_start_shutdown):
"""Test http replica gauge metrics"""
signal = SignalActor.remote()

@serve.deployment
@serve.deployment(graceful_shutdown_timeout_s=0.0001)
class A:
async def __call__(self):
await signal.wait.remote()

handle = serve.run(A.bind(), name="app1")
_ = handle.remote()
wait_for_condition(
lambda: len(get_metric_dictionaries("serve_replica_processing_queries")) == 1,
timeout=20,

processing_requests = get_metric_dictionaries(
"serve_replica_processing_queries", timeout=5
)
processing_requests = get_metric_dictionaries("serve_replica_processing_queries")
assert len(processing_requests) == 1
assert processing_requests[0]["deployment"] == "app1_A"
assert processing_requests[0]["application"] == "app1"
print("processing_requests working as expected.")
print("serve_replica_processing_queries exists.")

pending_requests = get_metric_dictionaries("serve_replica_pending_queries")
pending_requests = get_metric_dictionaries(
"serve_replica_pending_queries", timeout=5
)
assert len(pending_requests) == 1
assert pending_requests[0]["deployment"] == "app1_A"
assert pending_requests[0]["application"] == "app1"
print("serve_replica_pending_queries exists.")

def ensure_request_processing():
resp = requests.get("http://127.0.0.1:9999").text
resp = resp.split("\n")
expected_metrics = {
"serve_replica_processing_queries",
"serve_replica_pending_queries",
}
for metrics in resp:
if "# HELP" in metrics or "# TYPE" in metrics:
continue
if "serve_replica_processing_queries" in metrics:
assert "1.0" in metrics
expected_metrics.discard("serve_replica_processing_queries")
elif "serve_replica_pending_queries" in metrics:
assert "0.0" in metrics
expected_metrics.discard("serve_replica_pending_queries")
assert len(expected_metrics) == 0
return True

resp = requests.get("http://127.0.0.1:9999").text
resp = resp.split("\n")
for metrics in resp:
if "# HELP" in metrics or "# TYPE" in metrics:
continue
if "serve_replica_processing_queries" in metrics:
assert "1.0" in metrics
elif "serve_replica_pending_queries" in metrics:
assert "0" in metrics
wait_for_condition(ensure_request_processing, timeout=5)


def test_http_metrics(serve_start_shutdown):
Expand Down

0 comments on commit dfee3f0

Please sign in to comment.