Skip to content

Commit

Permalink
Remove singletons
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz authored and garethgreenaway committed Sep 15, 2021
1 parent e8c7629 commit 7af10f2
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 154 deletions.
51 changes: 1 addition & 50 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
Note: this class returns a singleton
"""

# This class is only a singleton per minion/master pair
# mapping of io_loop -> {key -> channel}
instance_map = weakref.WeakKeyDictionary()
async_methods = [
"crypted_transfer_decode_dictentry",
"_crypted_transfer",
Expand All @@ -214,62 +211,16 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
"close",
]

def __new__(cls, opts, **kwargs):
"""
Only create one instance of channel per __key()
"""
# do we have any mapping for this io_loop
io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]

key = cls.__key(opts, **kwargs)
obj = loop_instance_map.get(key)
if obj is None:
log.debug("Initializing new AsyncTCPReqChannel for %s", key)
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
obj = object.__new__(cls)
obj.__singleton_init__(opts, **kwargs)
obj._instance_key = key
loop_instance_map[key] = obj
obj._refcount = 1
obj._refcount_lock = threading.RLock()
else:
with obj._refcount_lock:
obj._refcount += 1
log.debug("Re-using AsyncTCPReqChannel for %s", key)
return obj

@classmethod
def __key(cls, opts, **kwargs):
if "master_uri" in kwargs:
opts["master_uri"] = kwargs["master_uri"]
return (
opts["pki_dir"], # where the keys are stored
opts["id"], # minion ID
opts["master_uri"],
kwargs.get("crypt", "aes"), # TODO: use the same channel for crypt
)

@classmethod
def force_close_all_instances(cls):
"""
Will force close all instances
:return: None
"""
for weak_dict in list(cls.instance_map.values()):
for instance in list(weak_dict.values()):
instance.close()

# has to remain empty for singletons, since __init__ will *always* be called
def __init__(self, opts, **kwargs):
pass

# an init for the singleton instance to call
def __singleton_init__(self, opts, **kwargs):
def __init__(self, opts, **kwargs):
self.opts = dict(opts)

self.serial = salt.payload.Serial(self.opts)
Expand Down
105 changes: 1 addition & 104 deletions salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
ZMQ Channels default to 'crypt=aes'
"""

# This class is only a singleton per minion/master pair
# mapping of io_loop -> {key -> channel}
instance_map = weakref.WeakKeyDictionary()
async_methods = [
"crypted_transfer_decode_dictentry",
"_crypted_transfer",
Expand All @@ -141,76 +138,6 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
"close",
]

def __new__(cls, opts, **kwargs):
"""
Only create one instance of channel per __key()
"""

# do we have any mapping for this io_loop
io_loop = kwargs.get("io_loop")
if io_loop is None:
io_loop = salt.ext.tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]

key = cls.__key(opts, **kwargs)
obj = loop_instance_map.get(key)
if obj is None:
log.debug("Initializing new AsyncZeroMQReqChannel for %s", key)
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
obj = object.__new__(cls)
obj.__singleton_init__(opts, **kwargs)
obj._instance_key = key
loop_instance_map[key] = obj
obj._refcount = 1
obj._refcount_lock = threading.RLock()
log.trace(
"Inserted key into loop_instance_map id %s for key %s and process %s",
id(loop_instance_map),
key,
os.getpid(),
)
else:
with obj._refcount_lock:
obj._refcount += 1
log.debug("Re-using AsyncZeroMQReqChannel for %s", key)
return obj

def __deepcopy__(self, memo):
cls = self.__class__
# pylint: disable=too-many-function-args
result = cls.__new__(cls, copy.deepcopy(self.opts, memo))
# pylint: enable=too-many-function-args
memo[id(self)] = result
for key in self.__dict__:
if key in ("_io_loop", "_refcount", "_refcount_lock"):
continue
# The _io_loop has a thread Lock which will fail to be deep
# copied. Skip it because it will just be recreated on the
# new copy.
if key == "message_client":
# Recreate the message client because it will fail to be deep
# copied. The reason is the same as the io_loop skip above.
setattr(
result,
key,
AsyncReqMessageClientPool(
result.opts,
args=(
result.opts,
self.master_uri,
),
kwargs={"io_loop": self._io_loop},
),
)

continue
setattr(result, key, copy.deepcopy(self.__dict__[key], memo))
return result

@classmethod
def force_close_all_instances(cls):
"""
Expand All @@ -221,25 +148,9 @@ def force_close_all_instances(cls):
:return: None
"""
for weak_dict in list(cls.instance_map.values()):
for instance in list(weak_dict.values()):
instance.close()

@classmethod
def __key(cls, opts, **kwargs):
return (
opts["pki_dir"], # where the keys are stored
opts["id"], # minion ID
kwargs.get("master_uri", opts.get("master_uri")), # master ID
kwargs.get("crypt", "aes"), # TODO: use the same channel for crypt
)

# has to remain empty for singletons, since __init__ will *always* be called
def __init__(self, opts, **kwargs):
pass

# an init for the singleton instance to call
def __singleton_init__(self, opts, **kwargs):
def __init__(self, opts, **kwargs):
self.opts = dict(opts)
self.ttype = "zeromq"

Expand Down Expand Up @@ -278,16 +189,6 @@ def close(self):
if self._closing:
return

if self._refcount > 1:
# Decrease refcount
with self._refcount_lock:
self._refcount -= 1
log.debug(
"This is not the last %s instance. Not closing yet.",
self.__class__.__name__,
)
return

log.debug("Closing %s instance", self.__class__.__name__)
self._closing = True
if hasattr(self, "message_client"):
Expand All @@ -306,10 +207,6 @@ def close(self):

# pylint: disable=W1701
def __del__(self):
with self._refcount_lock:
# Make sure we actually close no matter if something
# went wrong with our ref counting
self._refcount = 1
try:
self.close()
except OSError as exc:
Expand Down

0 comments on commit 7af10f2

Please sign in to comment.