From 45887d31eb2953640849940dc67952cd65dac53a Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Thu, 3 Feb 2022 20:05:11 -0800 Subject: [PATCH 1/2] no warning when logs are not from the same driver --- python/ray/worker.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 59ad95e5c550..6a7e2f0b2d07 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -496,6 +496,24 @@ def print_logs(self): self.threads_stopped.wait(timeout=0.01) continue + if self.gcs_pubsub_enabled: + data = msg + else: + data = json.loads(ray._private.utils.decode(msg["data"])) + + # Don't show logs from other drivers. + if ( + self.filter_logs_by_job + and data["job"] + and job_id_hex != data["job"] + ): + num_consecutive_messages_received = 0 + last_polling_batch_size = 0 + continue + + data["localhost"] = localhost + global_worker_stdstream_dispatcher.emit(data) + if self.gcs_pubsub_enabled: lagging = ( 100 <= last_polling_batch_size < subscriber.last_batch_size @@ -515,21 +533,6 @@ def print_logs(self): "'ray.init(log_to_driver=False)'." ) - if self.gcs_pubsub_enabled: - data = msg - else: - data = json.loads(ray._private.utils.decode(msg["data"])) - - # Don't show logs from other drivers. - if ( - self.filter_logs_by_job - and data["job"] - and job_id_hex != data["job"] - ): - continue - data["localhost"] = localhost - global_worker_stdstream_dispatcher.emit(data) - except (OSError, redis.exceptions.ConnectionError) as e: logger.error(f"print_logs: {e}") finally: From 2d9c7df70c2c4fd6cbdfb3dbac26031b5d71b6b9 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Fri, 4 Feb 2022 09:25:44 -0800 Subject: [PATCH 2/2] fix --- python/ray/worker.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 6a7e2f0b2d07..09e3e796c03d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -143,9 +143,6 @@ def __init__(self): # running on. self.ray_debugger_external = False self._load_code_from_local = False - # Used to toggle whether or not logs should be filtered to only those - # produced in the same job. - self.filter_logs_by_job = True @property def connected(self): @@ -502,11 +499,7 @@ def print_logs(self): data = json.loads(ray._private.utils.decode(msg["data"])) # Don't show logs from other drivers. - if ( - self.filter_logs_by_job - and data["job"] - and job_id_hex != data["job"] - ): + if data["job"] and data["job"] != job_id_hex: num_consecutive_messages_received = 0 last_polling_batch_size = 0 continue