From f210753f96f27c13473cf7e0237e1071b1bc61b3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 17 May 2024 10:21:20 -0500 Subject: [PATCH] Rename hub_port in executors to clarify it is for ZMQ (#3439) This follows a rename in PR #3266 which focused on making the same change inside parsl/monitoring/ A final use of hub_port is left in place in the MonitoringHub constructor because it is user-facing - future work on monitoring radio plugins might change this significantly (or not) so I am leaving it as is for now. --- parsl/dataflow/dflow.py | 2 +- parsl/executors/base.py | 14 +++++++------- parsl/executors/high_throughput/executor.py | 2 +- parsl/executors/high_throughput/interchange.py | 10 +++++----- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 8c464b6c6b..9bc78b1ce3 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1156,7 +1156,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: executor.run_id = self.run_id executor.run_dir = self.run_dir executor.hub_address = self.hub_address - executor.hub_port = self.hub_zmq_port + executor.hub_zmq_port = self.hub_zmq_port if self.monitoring: executor.monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 1720dc479f..966705c7c5 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -50,13 +50,13 @@ def __init__( self, *, hub_address: Optional[str] = None, - hub_port: Optional[int] = None, + hub_zmq_port: Optional[int] = None, monitoring_radio: Optional[MonitoringRadio] = None, run_dir: str = ".", run_id: Optional[str] = None, ): self.hub_address = hub_address - self.hub_port = hub_port + self.hub_zmq_port = hub_zmq_port self.monitoring_radio = monitoring_radio self.run_dir = os.path.abspath(run_dir) self.run_id = run_id @@ -136,14 +136,14 @@ def hub_address(self, value: Optional[str]) -> None: self._hub_address = value @property - def hub_port(self) -> Optional[int]: + def hub_zmq_port(self) -> Optional[int]: """Port to the Hub for monitoring. """ - return self._hub_port + return self._hub_zmq_port - @hub_port.setter - def hub_port(self, value: Optional[int]) -> None: - self._hub_port = value + @hub_zmq_port.setter + def hub_zmq_port(self, value: Optional[int]) -> None: + self._hub_zmq_port = value @property def monitoring_radio(self) -> Optional[MonitoringRadio]: diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 73c2f36c7c..7e4ae2ad2e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -541,7 +541,7 @@ def _start_local_interchange_process(self): "worker_ports": self.worker_ports, "worker_port_range": self.worker_port_range, "hub_address": self.hub_address, - "hub_port": self.hub_port, + "hub_zmq_port": self.hub_zmq_port, "logdir": self.logdir, "heartbeat_threshold": self.heartbeat_threshold, "poll_period": self.poll_period, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index a661761620..fe7a081c40 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -74,7 +74,7 @@ def __init__(self, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Tuple[int, int] = (54000, 55000), hub_address: Optional[str] = None, - hub_port: Optional[int] = None, + hub_zmq_port: Optional[int] = None, heartbeat_threshold: int = 60, logdir: str = ".", logging_level: int = logging.INFO, @@ -105,7 +105,7 @@ def __init__(self, The IP address at which the interchange can send info about managers to when monitoring is enabled. Default: None (meaning monitoring disabled) - hub_port : str + hub_zmq_port : str The port at which the interchange can send info about managers to when monitoring is enabled. Default: None (meaning monitoring disabled) @@ -151,7 +151,7 @@ def __init__(self, logger.info("Connected to client") self.hub_address = hub_address - self.hub_port = hub_port + self.hub_zmq_port = hub_zmq_port self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6) self.count = 0 @@ -244,12 +244,12 @@ def task_puller(self) -> NoReturn: logger.debug(f"Fetched {task_counter} tasks so far") def _create_monitoring_channel(self) -> Optional[zmq.Socket]: - if self.hub_address and self.hub_port: + if self.hub_address and self.hub_zmq_port: logger.info("Connecting to MonitoringHub") # This is a one-off because monitoring is unencrypted hub_channel = zmq.Context().socket(zmq.DEALER) hub_channel.set_hwm(0) - hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_port)) + hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port)) logger.info("Connected to MonitoringHub") return hub_channel else: