Skip to content

Commit

Permalink
Merge pull request #273 from ICRAR/LIU-383_DangerousDefaults
Browse files Browse the repository at this point in the history
Fix "dangerous default" mutable paramters and enable pylint warning for future.
  • Loading branch information
myxie authored Aug 8, 2024
2 parents 0a82100 + 6a0d211 commit 10110da
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[Main]
disable=C, R, W, no-name-in-module, c-extension-no-member, no-member, import-error, unsupported-membership-test
enable=logging-not-lazy,logging-format-interpolation
enable=logging-not-lazy,logging-format-interpolation, dangerous-default-value

8 changes: 6 additions & 2 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from __future__ import annotations

import logging
import os
import urllib.parse
Expand All @@ -39,8 +41,10 @@ class BaseDROPManagerClient(RestClient):
Base class for REST clients that talk to the DROP managers.
"""

def _request(self, url, method, content=None, headers={}):
def _request(self, url, method, content=None, headers: dict=None, timeout=10):
# Normalize first
if not headers:
headers = {}
if not url.startswith("/"):
url = "/" + url
url = "/api" + url
Expand All @@ -61,7 +65,7 @@ def create_session(self, sessionId):
"Successfully created session %s on %s:%s", sessionId, self.host, self.port
)

def deploy_session(self, sessionId, completed_uids=[]):
def deploy_session(self, sessionId, completed_uids: list[str] = None):
"""
Deploys session `sessionId`, effectively creating its DROPs and triggering
the execution of the graph
Expand Down
4 changes: 3 additions & 1 deletion daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def _DELETE(self, url):
stream, _ = self._request(url, "DELETE")
return stream

def _request(self, url, method, content=None, headers={}, timeout=10):
def _request(self, url, method, content=None, headers: dict=None, timeout=10):
if not headers:
headers = {}
# Do the HTTP stuff...
url = self.url_prefix + url
logger.debug("Sending %s request to %s:%d%s", method, self.host, self.port, url)
Expand Down
9 changes: 5 additions & 4 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""
Utility methods and classes to be used when interacting with DROPs
"""
from __future__ import annotations

import collections
import io
Expand Down Expand Up @@ -60,7 +61,7 @@ class EvtConsumer(object):
until all DROPs of a given graph have executed.
"""

def __init__(self, evt, expected_states=[]):
def __init__(self, evt, expected_states: list[DROPStates] = None):
self._evt = evt
self._expected_states = expected_states or (
DROPStates.COMPLETED,
Expand All @@ -87,7 +88,7 @@ class DROPWaiterCtx(object):
a.setCompleted()
"""

def __init__(self, test, drops, timeout=1, expected_states=[]):
def __init__(self, test, drops, timeout=1, expected_states:list[DROPStates] = None):
self._drops = listify(drops)
self._expected_states = expected_states or (
DROPStates.COMPLETED,
Expand Down Expand Up @@ -232,7 +233,7 @@ def getLeafNodes(drops):
]


def depthFirstTraverse(node: "AbstractDROP", visited=[]):
def depthFirstTraverse(node: "AbstractDROP", visited: list[AbstractDROP]=None):
"""
Depth-first iterator for a DROP graph.
Expand All @@ -243,7 +244,7 @@ def depthFirstTraverse(node: "AbstractDROP", visited=[]):
This implementation is recursive.
"""

visited = visited if visited else []
dependencies = getDownstreamObjects(node)
yield node, dependencies
visited.append(node)
Expand Down
12 changes: 7 additions & 5 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from __future__ import annotations

import abc
import collections
import functools
Expand Down Expand Up @@ -135,7 +137,7 @@ def __init__(
dmPort,
partitionAttr,
subDmId,
dmHosts=[],
dmHosts: list[str] = None,
pkeyPath=None,
dmCheckTimeout=10,
):
Expand All @@ -157,7 +159,7 @@ def __init__(
self._dmPort = dmPort
self._partitionAttr = partitionAttr
self._subDmId = subDmId
self._dmHosts = dmHosts
self._dmHosts = dmHosts if dmHosts else []
self._graph = {}
self._drop_rels = {}
self._sessionIds = (
Expand Down Expand Up @@ -464,7 +466,7 @@ def _triggerDrops(self, dm, host_and_uids, sessionId):
host,
)

def deploySession(self, sessionId, completedDrops=[]):
def deploySession(self, sessionId, completedDrops: list[str] = None):
# Indicate the node managers that they have to subscribe to events
# published by some nodes
if self._drop_rels.get(sessionId, None):
Expand Down Expand Up @@ -577,7 +579,7 @@ class DataIslandManager(CompositeManager):
The DataIslandManager, which manages a number of NodeManagers.
"""

def __init__(self, dmHosts=[], pkeyPath=None, dmCheckTimeout=10):
def __init__(self, dmHosts: list[str]=None, pkeyPath=None, dmCheckTimeout=10):
super(DataIslandManager, self).__init__(
NODE_DEFAULT_REST_PORT,
"node",
Expand All @@ -597,7 +599,7 @@ class MasterManager(CompositeManager):
The MasterManager, which manages a number of DataIslandManagers.
"""

def __init__(self, dmHosts=[], pkeyPath=None, dmCheckTimeout=10):
def __init__(self, dmHosts: list[str]=None, pkeyPath=None, dmCheckTimeout=10):
super(MasterManager, self).__init__(
ISLAND_DEFAULT_REST_PORT,
"island",
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/manager/drop_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""
Module containing the base interface for all DROP managers.
"""
from __future__ import annotations
import abc


Expand Down Expand Up @@ -103,7 +104,7 @@ def getGraphReproData(self, sessionId):
"""

@abc.abstractmethod
def deploySession(self, sessionId, completedDrops=[]):
def deploySession(self, sessionId, completedDrops:list[str]=None):
"""
Deploys the graph specification held by session `sessionId`, effectively
creating all DROPs, linking them together, and moving those whose UID
Expand Down
7 changes: 5 additions & 2 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
thus represents the bottom of the DROP management hierarchy.
"""

from __future__ import annotations
import abc
import collections
import copy
Expand Down Expand Up @@ -239,7 +240,7 @@ def __init__(
dlm_enable_replication=False,
dlgPath=None,
error_listener=None,
event_listeners=[],
event_listeners: list=None,
max_threads=0,
use_processes=False,
logdir=utils.getDlgLogsDir(),
Expand Down Expand Up @@ -273,6 +274,8 @@ def __init__(
self._error_listener = (
_load(error_listener, "on_error") if error_listener else None
)
if not event_listeners:
event_listeners = []
self._event_listeners = [
_load(l, "handleEvent") for l in event_listeners
]
Expand Down Expand Up @@ -354,7 +357,7 @@ def getGraph(self, sessionId):
def getLogDir(self):
return self.logdir

def deploySession(self, sessionId, completedDrops=[]):
def deploySession(self, sessionId, completedDrops:list[str]=None):
self._check_session_id(sessionId)
session = self._sessions[sessionId]
if hasattr(self, "_memoryManager"):
Expand Down
4 changes: 3 additions & 1 deletion daliuge-engine/dlg/manager/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from __future__ import annotations
import json
import logging

import bottle
import pkg_resources

from typing import List, Optional
from .drop_manager import DROPManager
from .rest import ManagerRestServer
from .session import SessionStates
Expand Down Expand Up @@ -70,7 +72,7 @@ def createSession(self, sessionId):
def addGraphSpec(self, sessionId, graphSpec):
raise NotImplementedError()

def deploySession(self, sessionId, completedDrops=[]):
def deploySession(self, sessionId, completedDrops: Optional[List[str]] = None):
raise NotImplementedError()

def destroySession(self, sessionId):
Expand Down
12 changes: 9 additions & 3 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Module containing the logic of a session -- a given graph execution
"""

from __future__ import annotations
import collections
import inspect
import json
Expand Down Expand Up @@ -57,6 +58,7 @@
DaliugeException,
)


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -330,7 +332,8 @@ def linkGraphParts(self, lhOID, rhOID, linkType, force=False):
graph_loader.addLink(linkType, lhDropSpec, rhOID, force=force)

@track_current_session
def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
def deploy(self, completedDrops:list[str]=None,
event_listeners:list=None, foreach=None):
"""
Creates the DROPs represented by all the graph specs contained in
this session, effectively deploying them.
Expand All @@ -345,6 +348,8 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
# In those cases we still want to be able to "deploy" this session
# to keep a consistent state across all NM sessions, even though
# in reality this particular session is managing nothing
if not completedDrops:
completedDrops = []
status = self.status
if (self._graph and status != SessionStates.BUILDING) or (
not self._graph and status != SessionStates.PRISTINE
Expand Down Expand Up @@ -389,8 +394,9 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
drop._rpc_endpoint = self._nm.rpc_endpoint

# Register them with the error handler
for l in event_listeners:
drop.subscribe(l)
if event_listeners:
for l in event_listeners:
drop.subscribe(l)
# Register each drop for reproducibility listening
drop.subscribe(repro_listener, "reproducibility")

Expand Down
21 changes: 12 additions & 9 deletions daliuge-engine/dlg/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from __future__ import annotations
import threading

from dlg.manager import constants
Expand Down Expand Up @@ -50,7 +51,7 @@ def stop(self):

class ManagerStarter(object):
def _start_manager_in_thread(
self, port, manager_class, rest_class, *manager_args, **manager_kwargs
self, port, manager_class, rest_class, *manager_args, **manager_kwargs
):
manager = manager_class(*manager_args, **manager_kwargs)
server = rest_class(manager)
Expand All @@ -63,21 +64,23 @@ def start_nm_in_thread(self, port=constants.NODE_DEFAULT_REST_PORT):
return self._start_manager_in_thread(port, NodeManager, NMRestServer, False)

def start_dim_in_thread(
self,
nm_hosts=[f"localhost:{constants.NODE_DEFAULT_REST_PORT}"],
port=constants.ISLAND_DEFAULT_REST_PORT,
self,
nm_hosts: list[str] = None,
port=constants.ISLAND_DEFAULT_REST_PORT,
):
if not nm_hosts:
nm_hosts = [f"localhost:{constants.NODE_DEFAULT_REST_PORT}"]
return self._start_manager_in_thread(
port, DataIslandManager, CompositeManagerRestServer, nm_hosts
)

def start_mm_in_thread(
self,
nm_hosts=[
f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}",
],
port=constants.MASTER_DEFAULT_REST_PORT,
self,
nm_hosts: list[str]=None,
port=constants.MASTER_DEFAULT_REST_PORT,
):
if not nm_hosts:
nm_hosts = [f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}"]
return self._start_manager_in_thread(
port, MasterManager, CompositeManagerRestServer, nm_hosts
)
14 changes: 9 additions & 5 deletions daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def sleepAndCopy(uid, **kwargs):
return dropSpec


def quickDeploy(nm, sessionId, graphSpec, node_subscriptions={}):
def quickDeploy(nm, sessionId, graphSpec, node_subscriptions:dict = None):
if not node_subscriptions:
node_subscriptions = {}
nm.createSession(sessionId)
nm.addGraphSpec(sessionId, graphSpec)
nm.add_node_subscriptions(sessionId, node_subscriptions)
Expand Down Expand Up @@ -142,13 +144,14 @@ def _test_runGraphInTwoNMs(
leaf_data,
root_oids=("A",),
leaf_oid="C",
expected_failures=[],
expected_failures: list=None,
sessionId=f"s{random.randint(0, 1000)}",
node_managers=None,
threads=0,
):
"""Utility to run a graph in two Node Managers"""


dm1, dm2 = node_managers or [
self._start_dm(threads=threads) for _ in range(2)
]
Expand All @@ -171,12 +174,13 @@ def _test_runGraphInTwoNMs(
drop.write(root_data)
drop.setCompleted()

if not expected_failures:
expected_failures = []
expected_successes = [
drops[oid] for oid in drops if oid not in expected_failures
]
expected_failures = [
drops[oid] for oid in drops if oid in expected_failures
]
expected_failures = [drops[oid] for oid in drops if oid in expected_failures]

for drop in expected_successes:
self.assertEqual(DROPStates.COMPLETED, drop.status)
for drop in expected_failures:
Expand Down

0 comments on commit 10110da

Please sign in to comment.