Skip to content

Commit

Permalink
Node Class: Setup new node class to improve consistency of computing …
Browse files Browse the repository at this point in the history
…node description throughout dlg-engine.
  • Loading branch information
myxie committed Sep 13, 2024
1 parent ea6cc06 commit 1c6acca
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 49 deletions.
5 changes: 2 additions & 3 deletions daliuge-common/dlg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
"NODE_DEFAULT_RPC_PORT": 6666,
}

NODE_DEFAULT_HOSTNAME = "localhost"

# just for backwards compatibility
NODE_DEFAULT_REST_PORT = DEFAULT_PORTS["NODE_DEFAULT_REST_PORT"]
ISLAND_DEFAULT_REST_PORT = DEFAULT_PORTS["ISLAND_DEFAULT_REST_PORT"]
MASTER_DEFAULT_REST_PORT = DEFAULT_PORTS["MASTER_DEFAULT_REST_PORT"]

REPLAY_DEFAULT_REST_PORT = DEFAULT_PORTS["REPLAY_DEFAULT_REST_PORT"]

DAEMON_DEFAULT_REST_PORT = DEFAULT_PORTS["DAEMON_DEFAULT_REST_PORT"]


# Other ports used by the Node Managers
NODE_DEFAULT_EVENTS_PORT = DEFAULT_PORTS["NODE_DEFAULT_EVENTS_PORT"]
NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS["NODE_DEFAULT_RPC_PORT"]
89 changes: 53 additions & 36 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@
import multiprocessing.pool
import threading

from dlg.manager.client import NodeManagerClient
from dlg.manager.drop_manager import DROPManager
from dlg.manager.manager_data import Node

from dlg import constants
from .client import NodeManagerClient
from dlg.constants import ISLAND_DEFAULT_REST_PORT, NODE_DEFAULT_REST_PORT
from .drop_manager import DROPManager
from .. import graph_loader
from dlg import graph_loader
from dlg.common.reproducibility.reproducibility import init_pg_repro_data
from ..ddap_protocol import DROPRel
from dlg.ddap_protocol import DROPRel
from dlg.exceptions import (
InvalidGraphException,
DaliugeException,
SubManagerException,
)
from ..utils import portIsOpen
from dlg.utils import portIsOpen

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -156,12 +158,12 @@ def __init__(
:param: dmCheckTimeout The timeout used before giving up and declaring
a sub-DM as not-yet-present in a given host
"""
if dmHosts and ":" in dmHosts[0]:
self._dmHosts = [Node(host) for host in dmHosts]
if self._dmHosts and self._dmHosts[0].rest_port_specified:
dmPort = -1
self._dmPort = dmPort
self._partitionAttr = partitionAttr
self._subDmId = subDmId
self._dmHosts = dmHosts if dmHosts else []
self._graph = {}
self._drop_rels = {}
self._sessionIds = (
Expand All @@ -176,6 +178,7 @@ def __init__(
# This list is different from the dmHosts, which are the machines that
# are directly managed by this manager (which in turn could manage more
# machines)
self._use_dmHosts = False
self._nodes = []

self.startDMChecker()
Expand Down Expand Up @@ -214,44 +217,56 @@ def _checkDM(self):

@property
def dmHosts(self):
return self._dmHosts[:]
return [str(n) for n in self._dmHosts[:]]

def addDmHost(self, host):
if not ":" in host:
host += f":{self._dmPort}"
def addDmHost(self, host_str: str):
host = Node(host_str)
# if not ":" in host:
# host += f":{self._dmPort}"
if host not in self._dmHosts:
self._dmHosts.append(host)
logger.debug("Added sub-manager %s", host)
else:
logger.warning("Host %s already registered.", host)

def removeDmHost(self, host):
def removeDmHost(self, host_str):
host = Node(host_str)
if host in self._dmHosts:
self._dmHosts.remove(host)

@property
def nodes(self):
return self._nodes[:]
if self._use_dmHosts:
return [str(n) for n in self._dmHosts[:]]
else:
return self._nodes

def add_node(self, node):
self._nodes.append(node)
if self._use_dmHosts:
return self._dmHosts.append(Node(node))
else:
self._nodes.append(node)


def remove_node(self, node):
self._nodes.remove(node)
if self._use_dmHosts:
self._dmHosts.remove(Node(node))
else:
self._nodes.remove(node)

@property
def dmPort(self):
return self._dmPort

def check_dm(self, host, port=None, timeout=10):
if ":" in host:
host, port = host.split(":")
port = int(port)
def check_dm(self, host: Node, port=None, timeout=10):
host_name = host.host
if host.rest_port_specified:
port = host.port
else:
port = port or self._dmPort

logger.debug("Checking DM presence at %s port %d", host, port)
dm_is_there = portIsOpen(host, port, timeout)
dm_is_there = portIsOpen(host_name, port, timeout)
return dm_is_there

def dmAt(self, host, port=None):
Expand All @@ -268,21 +283,22 @@ def dmAt(self, host, port=None):
def getSessionIds(self):
return self._sessionIds

#
# Replication of commands to underlying drop managers
# If "collect" is given, then individual results are also kept in the given
# structure, which is either a dictionary or a list
#
def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable):
"""
Replication of commands to underlying drop managers
If "collect" is given, then individual results are also kept in the given
structure, which is either a dictionary or a list
"""

host = iterable
if isinstance(iterable, (list, tuple)):
host = iterable[0]
if ":" in host:
host, port = host.split(":")
port = int(port)
host = iterable[0] # What's going on here?
# if ":" in host:
# host, port = host.split(":")
# port = int(port)

try:
with self.dmAt(host, port) as dm:
with self.dmAt(host) as dm:
res = f(dm, iterable, sessionId)

if isinstance(collect, dict):
Expand All @@ -295,8 +311,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable)
logger.exception(
"Error while %s on host %s:%d, session %s",
action,
host,
port,
host.host,
host.port,
sessionId,
)

Expand Down Expand Up @@ -398,7 +414,7 @@ def addGraphSpec(self, sessionId, graphSpec):
)
raise InvalidGraphException(msg)

partition = dropSpec[self._partitionAttr]
partition = Node(dropSpec[self._partitionAttr])
if partition not in self._dmHosts:
msg = (
f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} "
Expand Down Expand Up @@ -431,8 +447,8 @@ def addGraphSpec(self, sessionId, graphSpec):
for rel in inter_partition_rels:
# rhn = self._graph[rel.rhs]["node"].split(":")[0]
# lhn = self._graph[rel.lhs]["node"].split(":")[0]
rhn = self._graph[rel.rhs]["node"]
lhn = self._graph[rel.lhs]["node"]
rhn = Node(self._graph[rel.rhs]["node"])
lhn = Node(self._graph[rel.lhs]["node"])
drop_rels[lhn][rhn].append(rel)
drop_rels[rhn][lhn].append(rel)

Expand Down Expand Up @@ -593,7 +609,8 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10):
)

# In the case of the Data Island the dmHosts are the final nodes as well
self._nodes = dmHosts
self._use_dmHosts = True
# self._nodes = dmHosts
logger.info("Created DataIslandManager for hosts: %r", self._dmHosts)


Expand Down
96 changes: 96 additions & 0 deletions daliuge-engine/dlg/manager/manager_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2024
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#

"""
This module contains classes and helper-methods to support the various manager classes
"""

from dataclasses import dataclass

from dlg import constants

from enum import IntEnum


class NodeProtocolPosition(IntEnum):
    """
    Index of each field in the colon-separated node description string
    "host:port:rpc_port:events_port".

    Being an IntEnum, members can be used directly as list indices on the
    result of splitting such a string on ":".
    """

    # Hostname or address of the node
    HOST = 0
    # REST port of the node
    PORT = 1
    # RPC port of the node
    RPC_PORT = 2
    # Events port of the node
    EVENTS_PORT = 3


class Node:
    """
    Class for encapsulating compute node information to standardise
    inter-node communication.

    A node is described by the string 'protocol'
    "host[:port[:rpc_port[:events_port]]]". Any trailing port that is left
    out falls back to the matching default in dlg.constants.
    """

    def __init__(self, host: str):
        """
        Parse a node description string.

        :param host: str, node description following the
            "host[:port[:rpc_port[:events_port]]]" protocol
        :raises ValueError: if a supplied port field is not an integer
        """
        chunks = host.split(":")
        num_chunks = len(chunks)

        # Defaults for every port the description does not spell out.
        self.port = constants.NODE_DEFAULT_REST_PORT
        self.rpc_port = constants.NODE_DEFAULT_RPC_PORT
        # The events channel has its own default; reusing the RPC default
        # here would make both services point at the same port.
        self.events_port = constants.NODE_DEFAULT_EVENTS_PORT
        self._rest_port_specified = False

        # str.split always returns at least one chunk, so the host field is
        # always taken from the input string.
        self.host = chunks[NodeProtocolPosition.HOST]
        if num_chunks >= 2:
            self.port = int(chunks[NodeProtocolPosition.PORT])
            self._rest_port_specified = True
        if num_chunks >= 3:
            self.rpc_port = int(chunks[NodeProtocolPosition.RPC_PORT])
        if num_chunks >= 4:
            self.events_port = int(chunks[NodeProtocolPosition.EVENTS_PORT])

    def serialize(self) -> str:
        """
        Convert to the expected string representation of our Node using the
        following 'protocol':
        "host:port:rpc_port:event_port"
        :return: str
        """
        return f"{self.host}:{self.port}:{self.rpc_port}:{self.events_port}"

    @property
    def rest_port_specified(self) -> bool:
        """
        Returns True if a REST port was explicitly given in the string this
        Node was built from, False if the default REST port is being used.
        """
        return self._rest_port_specified

    def __str__(self) -> str:
        """
        Make our serialized Node the string.
        :return: str
        """
        return self.serialize()

    def __repr__(self) -> str:
        """
        Unambiguous representation, so Nodes are readable when logged with %r
        (including inside lists).
        :return: str
        """
        return f"{type(self).__name__}({self.serialize()!r})"

    def __eq__(self, other) -> bool:
        """
        Two Nodes are equal when their serialized forms match. The comparison
        goes through str(), so a Node also equals its fully serialized string.
        """
        return str(self) == str(other)

    def __hash__(self) -> int:
        """
        Hash of the serialized form, kept consistent with __eq__.
        """
        return hash(str(self))
17 changes: 16 additions & 1 deletion daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ class NodeManagerBase(DROPManager):

def __init__(
self,
events_port,
dlm_check_period=0,
dlm_cleanup_period=0,
dlm_enable_replication=False,
Expand All @@ -248,6 +249,7 @@ def __init__(
use_processes=False,
logdir=utils.getDlgLogsDir(),
):
self._events_port = events_port
self._dlm = DataLifecycleManager(
check_period=dlm_check_period,
cleanup_period=dlm_cleanup_period,
Expand Down Expand Up @@ -433,6 +435,7 @@ def add_node_subscriptions(self, sessionId, relationships):

# Set up event channels subscriptions
for nodesub in relationships:
# This needs to be changed
events_port = constants.NODE_DEFAULT_EVENTS_PORT
if type(nodesub) is tuple:
host, events_port, _ = nodesub
Expand All @@ -445,6 +448,18 @@ def add_node_subscriptions(self, sessionId, relationships):
logger.debug("Sending subscription to %s", f"{host}:{events_port}")
self.subscribe(host, events_port)

def _convert_relationships_to_nodes(self, relationships):
"""
Load JSON representation of relationships into Node classes.

NOTE(review): this method is currently an unfinished stub. It creates an
empty list, loops over the relationships without using them, and
implicitly returns None rather than the documented list.

:param relationships: dict, relationships received through REST call
:return: list of Node classes (intended; nothing is returned yet)
"""
# Accumulator for the converted nodes; never filled or returned at present.
nodes = []
for nodesub in relationships:
# TODO: convert nodesub into a Node and append it to nodes. The bare
# expression below is a no-op placeholder.
relationships


def has_method(self, sessionId, uid, mname):
self._check_session_id(sessionId)
return self._sessions[sessionId].has_method(uid, mname)
Expand Down Expand Up @@ -654,7 +669,7 @@ def __init__(
**kwargs,
):
host = host or "localhost"
NodeManagerBase.__init__(self, *args, **kwargs)
NodeManagerBase.__init__(self, events_port, *args, **kwargs)
EventMixIn.__init__(self, host, events_port)
RpcMixIn.__init__(self, host, rpc_port)
self.start()
Expand Down
19 changes: 10 additions & 9 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,23 @@
from dlg.common.reproducibility.reproducibility import init_runtime_repro_data
from dlg.utils import createDirIfMissing

from .. import constants
from .. import droputils
from .. import graph_loader
from .. import rpc
from .. import utils
from ..common.reproducibility.constants import ReproducibilityFlags, ALL_RMODES
from ..ddap_protocol import DROPLinkType, DROPRel, DROPStates
from ..drop import (
from dlg import constants
# from .. import constants
from dlg import droputils
from dlg import graph_loader
from dlg import rpc
from dlg import utils
from dlg.common.reproducibility.constants import ReproducibilityFlags, ALL_RMODES
from dlg.ddap_protocol import DROPLinkType, DROPRel, DROPStates
from dlg.drop import (
AbstractDROP,
LINKTYPE_1TON_APPEND_METHOD,
LINKTYPE_1TON_BACK_APPEND_METHOD,
)
from ..apps.app_base import AppDROP, InputFiredAppDROP
from ..data.drops.data_base import EndDROP

from ..exceptions import (
from dlg.exceptions import (
InvalidSessionState,
InvalidGraphException,
NoDropException,
Expand Down

0 comments on commit 1c6acca

Please sign in to comment.