Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup new Node class to improve consistency of node usage in dlg-engine #283

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def __init__(
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):

self._post_json(
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)
Expand Down
5 changes: 2 additions & 3 deletions daliuge-common/dlg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
"NODE_DEFAULT_RPC_PORT": 6666,
}

NODE_DEFAULT_HOSTNAME = "localhost"

# just for backwards compatibility
NODE_DEFAULT_REST_PORT = DEFAULT_PORTS["NODE_DEFAULT_REST_PORT"]
ISLAND_DEFAULT_REST_PORT = DEFAULT_PORTS["ISLAND_DEFAULT_REST_PORT"]
MASTER_DEFAULT_REST_PORT = DEFAULT_PORTS["MASTER_DEFAULT_REST_PORT"]

REPLAY_DEFAULT_REST_PORT = DEFAULT_PORTS["REPLAY_DEFAULT_REST_PORT"]

DAEMON_DEFAULT_REST_PORT = DEFAULT_PORTS["DAEMON_DEFAULT_REST_PORT"]


# Others ports used by the Node Managers
NODE_DEFAULT_EVENTS_PORT = DEFAULT_PORTS["NODE_DEFAULT_EVENTS_PORT"]
NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS["NODE_DEFAULT_RPC_PORT"]
135 changes: 78 additions & 57 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@
import multiprocessing.pool
import threading

from dlg.manager.client import NodeManagerClient
from dlg.manager.drop_manager import DROPManager
from dlg.manager.manager_data import Node

from dlg import constants
from .client import NodeManagerClient
from dlg.constants import ISLAND_DEFAULT_REST_PORT, NODE_DEFAULT_REST_PORT
from .drop_manager import DROPManager
from .. import graph_loader
from dlg import graph_loader
from dlg.common.reproducibility.reproducibility import init_pg_repro_data
from ..ddap_protocol import DROPRel
from dlg.ddap_protocol import DROPRel
from dlg.exceptions import (
InvalidGraphException,
DaliugeException,
SubManagerException,
)
from ..utils import portIsOpen
from dlg.utils import portIsOpen

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,7 +107,8 @@ def sanitize_relations(interDMRelations, graph):
def group_by_node(uids, graph):
uids_by_node = collections.defaultdict(list)
for uid in uids:
uids_by_node[graph[uid]["node"]].append(uid)
uids_by_node[Node(graph[uid]["node"])].append(uid)
logger.info(uids_by_node)
return uids_by_node


Expand Down Expand Up @@ -133,13 +136,13 @@ class allows for multiple levels of hierarchy seamlessly.
__metaclass__ = abc.ABCMeta

def __init__(
self,
dmPort,
partitionAttr,
subDmId,
dmHosts: list[str] = None,
pkeyPath=None,
dmCheckTimeout=10,
self,
dmPort,
partitionAttr,
subDmId,
dmHosts: list[str] = None,
pkeyPath=None,
dmCheckTimeout=10,
):
"""
Creates a new CompositeManager. The sub-DMs it manages are to be located
Expand All @@ -156,12 +159,12 @@ def __init__(
:param: dmCheckTimeout The timeout used before giving up and declaring
a sub-DM as not-yet-present in a given host
"""
if dmHosts and ":" in dmHosts[0]:
self._dmHosts = [Node(host) for host in dmHosts]
myxie marked this conversation as resolved.
Show resolved Hide resolved
if self._dmHosts and self._dmHosts[0].rest_port_specified:
dmPort = -1
self._dmPort = dmPort
self._partitionAttr = partitionAttr
self._subDmId = subDmId
self._dmHosts = dmHosts if dmHosts else []
self._graph = {}
self._drop_rels = {}
self._sessionIds = (
Expand All @@ -176,6 +179,7 @@ def __init__(
# This list is different from the dmHosts, which are the machines that
# are directly managed by this manager (which in turn could manage more
# machines)
self.use_dmHosts = False
self._nodes = []

self.startDMChecker()
Expand Down Expand Up @@ -206,83 +210,97 @@ def _checkDM(self):
break
if not self.check_dm(host, self._dmPort, timeout=self._dmCheckTimeout):
logger.error(
"Couldn't contact manager for host %s:%d, will try again later",
"Couldn't contact manager for host %s with dmPort %d, will try again later",
host, self._dmPort,
)
if self._dmCheckerEvt.wait(60):
break

@property
def dmHosts(self):
return self._dmHosts[:]
return [str(n) for n in self._dmHosts[:]]

def addDmHost(self, host):
if not ":" in host:
host += f":{self._dmPort}"
def addDmHost(self, host_str: str):
host = Node(host_str)
# if not ":" in host:
# host += f":{self._dmPort}"
if host not in self._dmHosts:
self._dmHosts.append(host)
logger.debug("Added sub-manager %s", host)
else:
logger.warning("Host %s already registered.", host)

def removeDmHost(self, host):
def removeDmHost(self, host_str):
host = Node(host_str)
if host in self._dmHosts:
self._dmHosts.remove(host)

@property
def nodes(self):
return self._nodes[:]
if self.use_dmHosts:
return [str(n) for n in self._dmHosts[:]]
else:
return self._nodes
Comment on lines +238 to +241
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace if statement with if expression (assign-if-exp)

Suggested change
if self.use_dmHosts:
return [str(n) for n in self._dmHosts[:]]
else:
return self._nodes
return [str(n) for n in self._dmHosts[:]] if self.use_dmHosts else self._nodes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to stick to the existing approach as I find it more readable for this example. I agree in principle with the direction to use if expressions if it improves readability.


def add_node(self, node):
self._nodes.append(node)
def add_node(self, node: Node):
if self.use_dmHosts:
return self._dmHosts.append(node)
else:
self._nodes.append(node)

def remove_node(self, node):
self._nodes.remove(node)
if self.use_dmHosts:
self._dmHosts.remove(node)
else:
self._nodes.remove(node)

@property
def dmPort(self):
return self._dmPort

def check_dm(self, host, port=None, timeout=10):
if ":" in host:
host, port = host.split(":")
port = int(port)
def check_dm(self, host: Node, port: int = None, timeout=10):
host_name = host.host
if host.rest_port_specified:
port = host.port
else:
port = port or self._dmPort

logger.debug("Checking DM presence at %s port %d", host, port)
dm_is_there = portIsOpen(host, port, timeout)
dm_is_there = portIsOpen(host_name, port, timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): We've found these issues:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot on that inline-immediately-returned.

return dm_is_there

def dmAt(self, host, port=None):
if not self.check_dm(host, port):
def dmAt(self, host):
if not self.check_dm(host):
raise SubManagerException(
f"Manager expected but not running in {host}:{port}"
f"Manager expected but not running in {host.host}:{host.port}"
)
if not ":" in host:
port = port or self._dmPort
else:
host, port = host.split(":")
return NodeManagerClient(host, port, 10)
# if not ":" in host:
# port = port or self._dmPort
# else:
# host, port = host.split(":")
return NodeManagerClient(host.host, host.port, 10)

def getSessionIds(self):
return self._sessionIds

#
# Replication of commands to underlying drop managers
# If "collect" is given, then individual results are also kept in the given
# structure, which is either a dictionary or a list
#
def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable):
"""
Replication of commands to underlying drop managers
If "collect" is given, then individual results are also kept in the given
structure, which is either a dictionary or a list
"""

host = iterable
if isinstance(iterable, (list, tuple)):
host = iterable[0]
if ":" in host:
host, port = host.split(":")
port = int(port)
host = iterable[0] # What's going on here?
# if ":" in host:
# host, port = host.split(":")
# port = int(port)
if isinstance(host, str):
host = Node(host)

try:
with self.dmAt(host, port) as dm:
with self.dmAt(host) as dm:
res = f(dm, iterable, sessionId)

if isinstance(collect, dict):
Expand All @@ -291,13 +309,14 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable)
collect.append(res)

except Exception as e:
exceptions[host] = e
exceptions[str(host)] = e
logger.exception(
"Error while %s on host %s:%d, session %s",
"Error while %s on host %s:%d, session %s, when executing %s",
action,
host,
port,
host.host,
host.port,
sessionId,
f
)

def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None):
Expand Down Expand Up @@ -398,7 +417,7 @@ def addGraphSpec(self, sessionId, graphSpec):
)
raise InvalidGraphException(msg)

partition = dropSpec[self._partitionAttr]
partition = Node(dropSpec[self._partitionAttr])
if partition not in self._dmHosts:
msg = (
f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} "
Expand Down Expand Up @@ -431,8 +450,8 @@ def addGraphSpec(self, sessionId, graphSpec):
for rel in inter_partition_rels:
# rhn = self._graph[rel.rhs]["node"].split(":")[0]
# lhn = self._graph[rel.lhs]["node"].split(":")[0]
rhn = self._graph[rel.rhs]["node"]
lhn = self._graph[rel.lhs]["node"]
rhn = (self._graph[rel.rhs]["node"])
lhn = (self._graph[rel.lhs]["node"])
drop_rels[lhn][rhn].append(rel)
drop_rels[rhn][lhn].append(rel)

Expand Down Expand Up @@ -462,6 +481,7 @@ def _deploySession(self, dm, host, sessionId):

def _triggerDrops(self, dm, host_and_uids, sessionId):
host, uids = host_and_uids

dm.trigger_drops(sessionId, uids)
logger.info(
"Successfully triggered drops for session %s on %s",
Expand Down Expand Up @@ -551,7 +571,7 @@ def getGraph(self, sessionId):
return allGraphs

def _getSessionStatus(self, dm, host, sessionId):
return {host: dm.getSessionStatus(sessionId)}
return {str(host): dm.getSessionStatus(sessionId)}

def getSessionStatus(self, sessionId):
allStatus = {}
Expand Down Expand Up @@ -593,7 +613,8 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10):
)

# In the case of the Data Island the dmHosts are the final nodes as well
self._nodes = dmHosts
self.use_dmHosts = True
# self._nodes = dmHosts
logger.info("Created DataIslandManager for hosts: %r", self._dmHosts)


Expand Down
Loading
Loading