Skip to content

Commit

Permalink
modify
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Dec 12, 2022
1 parent 66bf801 commit 7dcbcb5
Showing 1 changed file with 82 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package redis.clients.jedis.providers;

import redis.clients.jedis.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
Expand All @@ -12,26 +10,40 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;
import redis.clients.jedis.ConnectionPool;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.IOUtils;

public class SentineledConnectionProvider implements ConnectionProvider {

private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);

private final Object initPoolLock = new Object();
protected static final long DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS = 5000;

private volatile HostAndPort currentMaster;

private final GenericObjectPoolConfig<Connection> masterPoolConfig;
private volatile ConnectionPool pool;

private final String masterName;

private final JedisClientConfig masterClientConfig;

private volatile ConnectionPool pool;
private final GenericObjectPoolConfig<Connection> masterPoolConfig;

protected final Collection<SentinelListener> sentinelListeners = new ArrayList<>();

private final JedisClientConfig sentinelClientConfig;

protected final Collection<MasterListener> masterListeners = new ArrayList<>();
private final long subscribeRetryWaitTimeMillis;

private final Object initPoolLock = new Object();

public SentineledConnectionProvider(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) {
Expand All @@ -41,12 +53,23 @@ public SentineledConnectionProvider(String masterName, Set<HostAndPort> sentinel
public SentineledConnectionProvider(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig,
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {

this.masterName = masterName;
this.masterClientConfig = masterClientConfig;
this.sentinelClientConfig = sentinelClientConfig;
this.masterPoolConfig = poolConfig;

HostAndPort master = initSentinels(sentinels, masterName);
this.sentinelClientConfig = sentinelClientConfig;
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;

HostAndPort master = initSentinels(sentinels);
initMaster(master);
}

Expand All @@ -62,7 +85,7 @@ public Connection getConnection(CommandArguments args) {

@Override
public void close() {
masterListeners.forEach(MasterListener::shutdown);
sentinelListeners.forEach(SentinelListener::shutdown);

pool.close();
}
Expand Down Expand Up @@ -94,16 +117,16 @@ private void initMaster(HostAndPort master) {
}
}

private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {
private HostAndPort initSentinels(Set<HostAndPort> sentinels) {

HostAndPort master = null;
boolean sentinelAvailable = false;

LOG.info("Trying to find master from available Sentinels...");
LOG.debug("Trying to find master from available sentinels...");

for (HostAndPort sentinel : sentinels) {

LOG.debug("Connecting to Sentinel {}", sentinel);
LOG.debug("Connecting to Sentinel {}...", sentinel);

try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {

Expand All @@ -113,76 +136,65 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String maste
sentinelAvailable = true;

if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
LOG.warn("Sentinel {} is not monitoring master {}.", sentinel, masterName);
continue;
}

master = toHostAndPort(masterAddr);
LOG.debug("Found Redis master at {}", master);
LOG.debug("Redis master reported at {}.", master);
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
LOG.warn(
"Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e);
LOG.warn("Could not get master address from {}.", sentinel, e);
}
}

if (master == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not monitored
throw new JedisException("Can connect to sentinel, but " + masterName
+ " seems to be not monitored...");
throw new JedisException(
"Can connect to sentinel, but " + masterName + " seems to be not monitored.");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is "
+ masterName + " master is running...");
throw new JedisConnectionException(
"All sentinels down, cannot determine where " + masterName + " is running.");
}
}

LOG.info("Redis master running at {}, starting Sentinel listeners...", master);
LOG.info("Redis master running at {}. Starting sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
SentinelListener listener = new SentinelListener(sentinel);
// whether SentinelListener threads are alive or not, process can be stopped
listener.setDaemon(true);
sentinelListeners.add(listener);
listener.start();
}

return master;
}

private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
String host = getMasterAddrByNameResult.get(0);
int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
/**
* Must be of size 2.
*/
private static HostAndPort toHostAndPort(List<String> masterAddr) {
return toHostAndPort(masterAddr.get(0), masterAddr.get(1));
}

return new HostAndPort(host, port);
private static HostAndPort toHostAndPort(String hostStr, String portStr) {
return new HostAndPort(hostStr, Integer.parseInt(portStr));
}

protected class MasterListener extends Thread {
protected class SentinelListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected final HostAndPort node;
protected volatile Jedis sentinelJedis;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
public SentinelListener(HostAndPort node) {
super(String.format("%s-SentinelListener-[%s]", masterName, node.toString()));
this.node = node;
}

@Override
Expand All @@ -197,75 +209,71 @@ public void run() {
if (!running.get()) {
break;
}

final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

sentinelJedis = new Jedis(node, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
List<String> masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
LOG.warn("Can not get master {} address. Sentinel: {}.", masterName, node);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
sentinelJedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", hostPort, message);
LOG.debug("Sentinel {} published: {}.", node, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4]));
} else {
LOG.debug(
"Ignoring message on +switch-master for master name {}, our master name is {}",
"Ignoring message on +switch-master for master {}. Our master is {}.",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
node, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host,
port, e);
LOG.error("Lost connection to sentinel {}. Sleeping {}ms and retrying.", node,
subscribeRetryWaitTimeMillis, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
LOG.error("Sleep interrupted: ", e1);
} catch (InterruptedException se) {
LOG.error("Sleep interrupted.", se);
}
} else {
LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port);
LOG.debug("Unsubscribing from sentinel {}.", node);
}
} finally {
if (j != null) {
j.close();
}
IOUtils.closeQuietly(sentinelJedis);
}
}
}

// must not throw exception
public void shutdown() {
try {
LOG.debug("Shutting down listener on {}:{}", host, port);
LOG.debug("Shutting down listener on {}.", node);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.close();
if (sentinelJedis != null) {
sentinelJedis.close();
}
} catch (RuntimeException e) {
LOG.error("Caught exception while shutting down: ", e);
LOG.error("Error while shutting down.", e);
}
}
}
Expand Down

0 comments on commit 7dcbcb5

Please sign in to comment.