diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 5314ae210c89..d1c1dc966097 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -201,9 +201,6 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): Note: this class returns a singleton """ - # This class is only a singleton per minion/master pair - # mapping of io_loop -> {key -> channel} - instance_map = weakref.WeakKeyDictionary() async_methods = [ "crypted_transfer_decode_dictentry", "_crypted_transfer", @@ -214,45 +211,6 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): "close", ] - def __new__(cls, opts, **kwargs): - """ - Only create one instance of channel per __key() - """ - # do we have any mapping for this io_loop - io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current() - if io_loop not in cls.instance_map: - cls.instance_map[io_loop] = weakref.WeakValueDictionary() - loop_instance_map = cls.instance_map[io_loop] - - key = cls.__key(opts, **kwargs) - obj = loop_instance_map.get(key) - if obj is None: - log.debug("Initializing new AsyncTCPReqChannel for %s", key) - # we need to make a local variable for this, as we are going to store - # it in a WeakValueDictionary-- which will remove the item if no one - # references it-- this forces a reference while we return to the caller - obj = object.__new__(cls) - obj.__singleton_init__(opts, **kwargs) - obj._instance_key = key - loop_instance_map[key] = obj - obj._refcount = 1 - obj._refcount_lock = threading.RLock() - else: - with obj._refcount_lock: - obj._refcount += 1 - log.debug("Re-using AsyncTCPReqChannel for %s", key) - return obj - - @classmethod - def __key(cls, opts, **kwargs): - if "master_uri" in kwargs: - opts["master_uri"] = kwargs["master_uri"] - return ( - opts["pki_dir"], # where the keys are stored - opts["id"], # minion ID - opts["master_uri"], - kwargs.get("crypt", "aes"), # TODO: use the same channel for crypt - ) @classmethod def force_close_all_instances(cls): @@ -260,16 +218,9 @@ def force_close_all_instances(cls): Will force close all instances :return: None """ - for weak_dict in list(cls.instance_map.values()): - for instance in list(weak_dict.values()): - instance.close() - - # has to remain empty for singletons, since __init__ will *always* be called - def __init__(self, opts, **kwargs): pass - # an init for the singleton instance to call - def __singleton_init__(self, opts, **kwargs): + def __init__(self, opts, **kwargs): self.opts = dict(opts) self.serial = salt.payload.Serial(self.opts) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 763539687830..4a01ad2a9d9b 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -127,9 +127,6 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): ZMQ Channels default to 'crypt=aes' """ - # This class is only a singleton per minion/master pair - # mapping of io_loop -> {key -> channel} - instance_map = weakref.WeakKeyDictionary() async_methods = [ "crypted_transfer_decode_dictentry", "_crypted_transfer", @@ -141,76 +138,6 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): "close", ] - def __new__(cls, opts, **kwargs): - """ - Only create one instance of channel per __key() - """ - - # do we have any mapping for this io_loop - io_loop = kwargs.get("io_loop") - if io_loop is None: - io_loop = salt.ext.tornado.ioloop.IOLoop.current() - if io_loop not in cls.instance_map: - cls.instance_map[io_loop] = weakref.WeakValueDictionary() - loop_instance_map = cls.instance_map[io_loop] - - key = cls.__key(opts, **kwargs) - obj = loop_instance_map.get(key) - if obj is None: - log.debug("Initializing new AsyncZeroMQReqChannel for %s", key) - # we need to make a local variable for this, as we are going to store - # it in a WeakValueDictionary-- which will remove the item if no one - # references it-- this forces a reference while we return to the caller - obj = object.__new__(cls) - obj.__singleton_init__(opts, **kwargs) - obj._instance_key = key - loop_instance_map[key] = obj - obj._refcount = 1 - obj._refcount_lock = threading.RLock() - log.trace( - "Inserted key into loop_instance_map id %s for key %s and process %s", - id(loop_instance_map), - key, - os.getpid(), - ) - else: - with obj._refcount_lock: - obj._refcount += 1 - log.debug("Re-using AsyncZeroMQReqChannel for %s", key) - return obj - - def __deepcopy__(self, memo): - cls = self.__class__ - # pylint: disable=too-many-function-args - result = cls.__new__(cls, copy.deepcopy(self.opts, memo)) - # pylint: enable=too-many-function-args - memo[id(self)] = result - for key in self.__dict__: - if key in ("_io_loop", "_refcount", "_refcount_lock"): - continue - # The _io_loop has a thread Lock which will fail to be deep - # copied. Skip it because it will just be recreated on the - # new copy. - if key == "message_client": - # Recreate the message client because it will fail to be deep - # copied. The reason is the same as the io_loop skip above. - setattr( - result, - key, - AsyncReqMessageClientPool( - result.opts, - args=( - result.opts, - self.master_uri, - ), - kwargs={"io_loop": self._io_loop}, - ), - ) - - continue - setattr(result, key, copy.deepcopy(self.__dict__[key], memo)) - return result - @classmethod def force_close_all_instances(cls): """ @@ -221,25 +148,9 @@ def force_close_all_instances(cls): :return: None """ - for weak_dict in list(cls.instance_map.values()): - for instance in list(weak_dict.values()): - instance.close() - - @classmethod - def __key(cls, opts, **kwargs): - return ( - opts["pki_dir"], # where the keys are stored - opts["id"], # minion ID - kwargs.get("master_uri", opts.get("master_uri")), # master ID - kwargs.get("crypt", "aes"), # TODO: use the same channel for crypt - ) - - # has to remain empty for singletons, since __init__ will *always* be called - def __init__(self, opts, **kwargs): pass - # an init for the singleton instance to call - def __singleton_init__(self, opts, **kwargs): + def __init__(self, opts, **kwargs): self.opts = dict(opts) self.ttype = "zeromq" @@ -278,16 +189,6 @@ def close(self): if self._closing: return - if self._refcount > 1: - # Decrease refcount - with self._refcount_lock: - self._refcount -= 1 - log.debug( - "This is not the last %s instance. Not closing yet.", - self.__class__.__name__, - ) - return - log.debug("Closing %s instance", self.__class__.__name__) self._closing = True if hasattr(self, "message_client"): @@ -306,10 +207,6 @@ def close(self): # pylint: disable=W1701 def __del__(self): - with self._refcount_lock: - # Make sure we actually close no matter if something - # went wrong with our ref counting - self._refcount = 1 try: self.close() except OSError as exc: