Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for thread interrupt in subscribe process of PubSub #3726

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/JedisPubSubBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis.clients.jedis;

import junit.framework.TestCase;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;

public class JedisPubSubBaseTest extends TestCase {

public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
// setup
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

@Override
public void onMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);

when(mockConnection.getUnflushedObject()).

thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}
56 changes: 56 additions & 0 deletions src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package redis.clients.jedis;

import junit.framework.TestCase;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE;

public class JedisShardedPubSubBaseTest extends TestCase {

public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
// setup
final JedisShardedPubSubBase<String> pubSub = new JedisShardedPubSubBase<String>() {

@Override
public void onSMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return new String(raw);
}

};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);
when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}
Loading