-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(chore): refactor batch event processor to use blocking queue poll and take so as not to spin too much. #343
Conversation
Pull Request Test Coverage Report for Build 1238
💛 - Coveralls |
logger.debug("Empty item, sleeping for 50ms."); | ||
Thread.sleep(50); | ||
logger.debug("Empty item after waiting flush interval. Flushing."); | ||
flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This flush() is not necessary since it will be captured on the next while
iteration. in general I'd like to keep the number of places we're calling flush()
and setting the deadline to a minimum. (Ideally one place)
} | ||
|
||
Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); | ||
Object item = eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this another configuration option. If we set to flushInterval
then the messages are only guaranteed to be dispatched every (2 * flushInterval) milliseconds. I'd like to better understand the CPU and battery utilization with a change like this.
public class EventConsumer implements Runnable { | ||
private LinkedList<UserEvent> currentBatch = new LinkedList<>(); | ||
private long deadline = System.currentTimeMillis() + flushInterval; | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
int waitCount = 0; | ||
|
||
QueueService polling = () -> eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When polling the queue, should we block until the next deadline, instead of blocking for flushInterval
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the approach of switching from poll
to take()
. This is great since we're not introducing another concurrent structure which would have greatly added to the overall complexity.
I did suggest a refactor to not use to lambdas and to use the computed timeout interval. I'm also still not keen on setting the deadline
and calling flush()
in as many places as we are. Reading the code it looks like we're taking shots in the dark.
} | ||
|
||
Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); | ||
Object item = using.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the QueueService
adds a bit too much cognitive load then what we need in this already complex loop. Since we're basing our decision to use take()
or poll(...)
based on the number of times we're exited the poll with null
items we can use a ternary operator inline:
long timeout = deadline - System.currentTimeMillis();
Object item = emptyCount > MAX_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
Note I'm suggesting renaming from "wait" to "empty" since I think better captures the intention of the counter. I also used the dynamic timeout as opposed to the fix flushInterval
as we discussed earlier.
public class EventConsumer implements Runnable { | ||
private LinkedList<UserEvent> currentBatch = new LinkedList<>(); | ||
private long deadline = System.currentTimeMillis() + flushInterval; | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
int waitCount = 0; | ||
|
||
QueueService polling = () -> eventQueue.poll(System.currentTimeMillis() - flushInterval, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not the same as deadline - System.currentTimeMillis()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…d take so as not to spin too much. (#343) * need to reset the deadline after it is hit once. * use flush interval for event queue polling * use take if you have timed out enough * refactor to only reset interval in one place. * remove lambda after discussion with miked * fix log message * check if deadline is equal or passed since we set timeout at deadline
Summary
Test plan
FSC test?