From 23341caea19fb22f870105e95caef6426bb612f2 Mon Sep 17 00:00:00 2001 From: jaroslawmalekcodete Date: Sat, 20 Jan 2018 00:53:18 +0100 Subject: [PATCH] #6529: remove thread from ExecutionResultSender (#6634) --- .../kernel/threads/ExecutionResultSender.java | 42 ++++--------------- 1 file changed, 8 insertions(+), 34 deletions(-) diff --git a/kernel/base/src/main/java/com/twosigma/beakerx/kernel/threads/ExecutionResultSender.java b/kernel/base/src/main/java/com/twosigma/beakerx/kernel/threads/ExecutionResultSender.java index f59e007d29..9543c49719 100644 --- a/kernel/base/src/main/java/com/twosigma/beakerx/kernel/threads/ExecutionResultSender.java +++ b/kernel/base/src/main/java/com/twosigma/beakerx/kernel/threads/ExecutionResultSender.java @@ -15,9 +15,9 @@ */ package com.twosigma.beakerx.kernel.threads; +import java.util.List; import java.util.Observable; import java.util.Observer; -import java.util.concurrent.ConcurrentLinkedQueue; import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject; import com.twosigma.beakerx.kernel.KernelFunctionality; @@ -31,9 +31,6 @@ public class ExecutionResultSender implements Observer { public static Logger logger = LoggerFactory.getLogger(ExecutionResultSender.class); - - private final ConcurrentLinkedQueue messageQueue = new ConcurrentLinkedQueue<>(); - private AbstractThread workingThread; private KernelFunctionality kernel; public ExecutionResultSender(KernelFunctionality kernel) { @@ -44,41 +41,18 @@ public ExecutionResultSender(KernelFunctionality kernel) { public synchronized void update(Observable o, Object arg) { SimpleEvaluationObject seo = (SimpleEvaluationObject) o; if (seo != null) { - messageQueue.addAll(MessageCreator.createMessage(seo)); - if (workingThread == null || !workingThread.isAlive()) { - workingThread = new MessageRunnable(); - workingThread.start(); - } - } - } - - protected class MessageRunnable extends AbstractThread { - - @Override - public boolean getRunning() { - return running && !messageQueue.isEmpty(); - } - - @Override - public void run() { - while (getRunning()) { - MessageHolder job = messageQueue.poll(); - if (job != null) { - if (SocketEnum.IOPUB_SOCKET.equals(job.getSocketType())) { - kernel.publish(job.getMessage()); - } else if (SocketEnum.SHELL_SOCKET.equals(job.getSocketType())) { - kernel.send(job.getMessage()); - } + List message = MessageCreator.createMessage(seo); + message.forEach(job -> { + if (SocketEnum.IOPUB_SOCKET.equals(job.getSocketType())) { + kernel.publish(job.getMessage()); + } else if (SocketEnum.SHELL_SOCKET.equals(job.getSocketType())) { + kernel.send(job.getMessage()); } - } - logger.debug("MessageRunnable shutdown."); + }); } } public void exit() { - if (workingThread != null) { - workingThread.halt(); - } } }