diff --git a/python/ray/worker.py b/python/ray/worker.py index 59ad95e5c550..09e3e796c03d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -143,9 +143,6 @@ def __init__(self): # running on. self.ray_debugger_external = False self._load_code_from_local = False - # Used to toggle whether or not logs should be filtered to only those - # produced in the same job. - self.filter_logs_by_job = True @property def connected(self): @@ -496,6 +493,20 @@ def print_logs(self): self.threads_stopped.wait(timeout=0.01) continue + if self.gcs_pubsub_enabled: + data = msg + else: + data = json.loads(ray._private.utils.decode(msg["data"])) + + # Don't show logs from other drivers. + if data["job"] and data["job"] != job_id_hex: + num_consecutive_messages_received = 0 + last_polling_batch_size = 0 + continue + + data["localhost"] = localhost + global_worker_stdstream_dispatcher.emit(data) + if self.gcs_pubsub_enabled: lagging = ( 100 <= last_polling_batch_size < subscriber.last_batch_size @@ -515,21 +526,6 @@ def print_logs(self): "'ray.init(log_to_driver=False)'." ) - if self.gcs_pubsub_enabled: - data = msg - else: - data = json.loads(ray._private.utils.decode(msg["data"])) - - # Don't show logs from other drivers. - if ( - self.filter_logs_by_job - and data["job"] - and job_id_hex != data["job"] - ): - continue - data["localhost"] = localhost - global_worker_stdstream_dispatcher.emit(data) - except (OSError, redis.exceptions.ConnectionError) as e: logger.error(f"print_logs: {e}") finally: