Skip to content

Commit

Permalink
[core] remove unused GcsAio(Publisher|Subscriber) methods and subclas…
Browse files Browse the repository at this point in the history
…ses. (ray-project#47057)

They were used to fetch / publish logs and errors, but now they are
replaced by PythonGcsSubscriber cython binded classes.

Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang authored and simonsays1980 committed Aug 15, 2024
1 parent d0679b7 commit e15ef64
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 117 deletions.
69 changes: 0 additions & 69 deletions python/ray/_private/gcs_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import ray._private.gcs_utils as gcs_utils
import ray._private.logging_utils as logging_utils
from ray.core.generated.gcs_pb2 import ErrorTableData
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated import gcs_service_pb2
from ray.core.generated import common_pb2
Expand All @@ -27,19 +26,6 @@


class _PublisherBase:
@staticmethod
def _create_log_request(log_json: dict):
job_id = log_json.get("job")
return gcs_service_pb2.GcsPublishRequest(
pub_messages=[
pubsub_pb2.PubMessage(
channel_type=pubsub_pb2.RAY_LOG_CHANNEL,
key_id=job_id.encode() if job_id else None,
log_batch_message=logging_utils.log_batch_dict_to_proto(log_json),
)
]
)

@staticmethod
def _create_node_resource_usage_request(key: str, json: str):
return gcs_service_pb2.GcsPublishRequest(
Expand Down Expand Up @@ -149,21 +135,6 @@ def __init__(self, address: str = None, channel: aiogrpc.Channel = None):
assert channel is not None, "One of address and channel must be specified"
self._stub = gcs_service_pb2_grpc.InternalPubSubGcsServiceStub(channel)

async def publish_error(self, key_id: bytes, error_info: ErrorTableData) -> None:
"""Publishes error info to GCS."""
msg = pubsub_pb2.PubMessage(
channel_type=pubsub_pb2.RAY_ERROR_INFO_CHANNEL,
key_id=key_id,
error_info_message=error_info,
)
req = gcs_service_pb2.GcsPublishRequest(pub_messages=[msg])
await self._stub.GcsPublish(req)

async def publish_logs(self, log_batch: dict) -> None:
"""Publishes logs to GCS."""
req = self._create_log_request(log_batch)
await self._stub.GcsPublish(req)

async def publish_resource_usage(self, key: str, json: str) -> None:
"""Publishes logs to GCS."""
req = self._create_node_resource_usage_request(key, json)
Expand Down Expand Up @@ -275,46 +246,6 @@ async def close(self) -> None:
self._stub = None


class GcsAioErrorSubscriber(_AioSubscriber):
def __init__(
self,
worker_id: bytes = None,
address: str = None,
channel: grpc.Channel = None,
):
super().__init__(pubsub_pb2.RAY_ERROR_INFO_CHANNEL, worker_id, address, channel)

async def poll(self, timeout=None) -> Tuple[bytes, ErrorTableData]:
"""Polls for new error message.
Returns:
A tuple of error message ID and ErrorTableData proto message,
or None, None if polling times out or subscriber closed.
"""
await self._poll(timeout=timeout)
return self._pop_error_info(self._queue)


class GcsAioLogSubscriber(_AioSubscriber):
def __init__(
self,
worker_id: bytes = None,
address: str = None,
channel: grpc.Channel = None,
):
super().__init__(pubsub_pb2.RAY_LOG_CHANNEL, worker_id, address, channel)

async def poll(self, timeout=None) -> dict:
"""Polls for new log message.
Returns:
A dict containing a batch of log lines and their metadata,
or None if polling times out or subscriber closed.
"""
await self._poll(timeout=timeout)
return self._pop_log_batch(self._queue)


class GcsAioResourceUsageSubscriber(_AioSubscriber):
def __init__(
self,
Expand Down
48 changes: 0 additions & 48 deletions python/ray/tests/test_gcs_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
import ray
from ray._private.gcs_pubsub import (
GcsAioPublisher,
GcsAioErrorSubscriber,
GcsAioLogSubscriber,
GcsAioResourceUsageSubscriber,
)
from ray.core.generated.gcs_pb2 import ErrorTableData
import pytest


Expand All @@ -35,26 +32,6 @@ def test_publish_and_subscribe_error_info(ray_start_regular):
subscriber.close()


@pytest.mark.asyncio
async def test_aio_publish_and_subscribe_error_info(ray_start_regular):
address_info = ray_start_regular
gcs_server_addr = address_info["gcs_address"]

subscriber = GcsAioErrorSubscriber(address=gcs_server_addr)
await subscriber.subscribe()

publisher = GcsAioPublisher(address=gcs_server_addr)
err1 = ErrorTableData(error_message="test error message 1")
err2 = ErrorTableData(error_message="test error message 2")
await publisher.publish_error(b"aaa_id", err1)
await publisher.publish_error(b"bbb_id", err2)

assert await subscriber.poll() == (b"aaa_id", err1)
assert await subscriber.poll() == (b"bbb_id", err2)

await subscriber.close()


def test_publish_and_subscribe_logs(ray_start_regular):
address_info = ray_start_regular
gcs_server_addr = address_info["gcs_address"]
Expand All @@ -81,31 +58,6 @@ def test_publish_and_subscribe_logs(ray_start_regular):
subscriber.close()


@pytest.mark.asyncio
async def test_aio_publish_and_subscribe_logs(ray_start_regular):
address_info = ray_start_regular
gcs_server_addr = address_info["gcs_address"]

subscriber = GcsAioLogSubscriber(address=gcs_server_addr)
await subscriber.subscribe()

publisher = GcsAioPublisher(address=gcs_server_addr)
log_batch = {
"ip": "127.0.0.1",
"pid": "gcs",
"job": "0001",
"is_err": False,
"lines": ["line 1", "line 2"],
"actor_name": "test actor",
"task_name": "test task",
}
await publisher.publish_logs(log_batch)

assert await subscriber.poll() == log_batch

await subscriber.close()


@pytest.mark.asyncio
async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular):
address_info = ray_start_regular
Expand Down

0 comments on commit e15ef64

Please sign in to comment.