Skip to content

Commit

Permalink
#6529: remove thread from ExecutionResultSender (#6634)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaroslawmalekcodete authored and Mariusz Jurowicz committed Jan 30, 2018
1 parent b27663b commit 23341ca
Showing 1 changed file with 8 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package com.twosigma.beakerx.kernel.threads;

import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.kernel.KernelFunctionality;
Expand All @@ -31,9 +31,6 @@
public class ExecutionResultSender implements Observer {

public static Logger logger = LoggerFactory.getLogger(ExecutionResultSender.class);

private final ConcurrentLinkedQueue<MessageHolder> messageQueue = new ConcurrentLinkedQueue<>();
private AbstractThread workingThread;
private KernelFunctionality kernel;

public ExecutionResultSender(KernelFunctionality kernel) {
Expand All @@ -44,41 +41,18 @@ public ExecutionResultSender(KernelFunctionality kernel) {
public synchronized void update(Observable o, Object arg) {
SimpleEvaluationObject seo = (SimpleEvaluationObject) o;
if (seo != null) {
messageQueue.addAll(MessageCreator.createMessage(seo));
if (workingThread == null || !workingThread.isAlive()) {
workingThread = new MessageRunnable();
workingThread.start();
}
}
}

protected class MessageRunnable extends AbstractThread {

@Override
public boolean getRunning() {
return running && !messageQueue.isEmpty();
}

@Override
public void run() {
while (getRunning()) {
MessageHolder job = messageQueue.poll();
if (job != null) {
if (SocketEnum.IOPUB_SOCKET.equals(job.getSocketType())) {
kernel.publish(job.getMessage());
} else if (SocketEnum.SHELL_SOCKET.equals(job.getSocketType())) {
kernel.send(job.getMessage());
}
List<MessageHolder> message = MessageCreator.createMessage(seo);
message.forEach(job -> {
if (SocketEnum.IOPUB_SOCKET.equals(job.getSocketType())) {
kernel.publish(job.getMessage());
} else if (SocketEnum.SHELL_SOCKET.equals(job.getSocketType())) {
kernel.send(job.getMessage());
}
}
logger.debug("MessageRunnable shutdown.");
});
}
}

public void exit() {
if (workingThread != null) {
workingThread.halt();
}
}

}

0 comments on commit 23341ca

Please sign in to comment.