-
Notifications
You must be signed in to change notification settings - Fork 233
periodically check connectivity to Cloud Pubsub #1031
Conversation
changes compared to #1030:
the CommonConfiguration stuff is unchanged from #1030. |
956df2b
to
47d8b35
Compare
Current coverage is 51.14% (diff: 35.29%)@@ master #1031 diff @@
==========================================
Files 107 275 +168
Lines 5095 13157 +8062
Methods 0 0
Messages 0 0
Branches 858 1699 +841
==========================================
+ Hits 2363 6729 +4366
- Misses 2514 5925 +3411
- Partials 218 503 +285
|
6f14735
to
0e327c1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but would like another pair of eyes.
this.healthchecker = healthchecker; | ||
} | ||
|
||
// public void start() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete this commented out stuff?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is removed in a later commit
boolean isHealthy(); | ||
} | ||
|
||
public static class DefaultHealthChecker implements HealthChecker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: This and the constructor be package-local.
} | ||
@Override | ||
public void stop() throws Exception { | ||
kafkaProducer.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch.
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
||
public final class EventSenderFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth testing this class or too much of a pain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it would be too cumbersome as this is constructing new instances of KafkaSender GooglePubSubSender etc directly
} | ||
|
||
@Override | ||
public void send(final String topic, final byte[] message) { | ||
final String combinedTopic = topicPrefix + topic; | ||
|
||
if (!healthchecker.isHealthy()) { | ||
log.warn("will not publish message to pubsub topic={} as the pubsub client " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're missing the topic for interpolation here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please smash that approve button now
when the GooglePubSubSender cannot connect to the pubsub service, disable message publishing until connectivity is re-established (at the next scheduled time). Along the way, I refactored the common EventSender-instantiation code from AgentService and MasterService into a new class named EventSenderFactory. To make this common class possible, also extracted a class for the common configuration fields between AgentConfig and MasterConfig. This new CommonConfiguration class is incomplete - more fields can be moved to this class to avoid duplication, but I left that for future commits.
make all the EventSender implementations also implement the dropwizard Managed interface so that we can have dropwizard manage their lifecycle. This lets us cleanly shutdown the KafkaProvider or the executor used by GooglePubSubSender.
Add a Gauge for whether or not the GooglePubSubSender is healthy. Refactor the builder for GooglePubSubSender to move the creation of the healthchecker, ScheduledExecutorService etc to EventSenderFactory.
this class is only ever constructed with a non-empty Optional, so we can simplify it by removing the Optional.
0e327c1
to
fbe6378
Compare
This is an alternate implementation of #1030 which performs the healthchecks periodically rather than only once at process startup.