diff --git a/helios-services/src/main/java/com/spotify/helios/agent/AgentConfig.java b/helios-services/src/main/java/com/spotify/helios/agent/AgentConfig.java index c57cb2c6b..c9b3d8c31 100644 --- a/helios-services/src/main/java/com/spotify/helios/agent/AgentConfig.java +++ b/helios-services/src/main/java/com/spotify/helios/agent/AgentConfig.java @@ -18,6 +18,7 @@ package com.spotify.helios.agent; import com.spotify.docker.client.DockerHost; +import com.spotify.helios.servicescommon.CommonConfiguration; import com.spotify.helios.servicescommon.FastForwardConfig; import java.net.InetSocketAddress; @@ -25,12 +26,10 @@ import java.util.List; import java.util.Map; -import io.dropwizard.Configuration; - /** * The configuration of the Helios agent. */ -public class AgentConfig extends Configuration { +public class AgentConfig extends CommonConfiguration { private String domain; private String name; @@ -57,8 +56,6 @@ public class AgentConfig extends Configuration { private InetSocketAddress httpEndpoint; private boolean noHttp; private List binds; - private List kafkaBrokers; - private List pubsubPrefixes; private Map labels; private boolean zooKeeperEnableAcls; private String zookeeperAclMasterUser; @@ -289,24 +286,6 @@ public AgentConfig setBinds(List binds) { return this; } - public List getKafkaBrokers() { - return kafkaBrokers; - } - - public AgentConfig setKafkaBrokers(List kafkaBrokers) { - this.kafkaBrokers = kafkaBrokers; - return this; - } - - public List getPubsubPrefixes() { - return pubsubPrefixes; - } - - public AgentConfig setPubsubPrefixes(List pubsubPrefixes) { - this.pubsubPrefixes = pubsubPrefixes; - return this; - } - public Map getLabels() { return labels; } diff --git a/helios-services/src/main/java/com/spotify/helios/agent/AgentService.java b/helios-services/src/main/java/com/spotify/helios/agent/AgentService.java index a947e0a23..cb5bb297a 100644 --- a/helios-services/src/main/java/com/spotify/helios/agent/AgentService.java +++ b/helios-services/src/main/java/com/spotify/helios/agent/AgentService.java @@ -37,10 +37,8 @@ import com.spotify.helios.master.metrics.HealthCheckGauge; import com.spotify.helios.serviceregistration.ServiceRegistrar; import com.spotify.helios.servicescommon.EventSender; +import com.spotify.helios.servicescommon.EventSenderFactory; import com.spotify.helios.servicescommon.FastForwardConfig; -import com.spotify.helios.servicescommon.GooglePubSubProvider; -import com.spotify.helios.servicescommon.KafkaClientProvider; -import com.spotify.helios.servicescommon.KafkaSender; import com.spotify.helios.servicescommon.ManagedStatsdReporter; import com.spotify.helios.servicescommon.PersistentAtomicReference; import com.spotify.helios.servicescommon.ReactorFactory; @@ -70,7 +68,6 @@ import com.google.common.base.Strings; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; @@ -84,7 +81,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.kafka.clients.producer.KafkaProducer; import org.eclipse.jetty.server.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,7 +93,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -241,24 +236,8 @@ public AgentService(final AgentConfig config, final Environment environment) new ZooKeeperModelReporter(riemannFacade, metrics.getZooKeeperMetrics()); final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider( zooKeeperClient, modelReporter); - final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider( - config.getKafkaBrokers()); - final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider( - config.getPubsubPrefixes()); - - final ImmutableList.Builder eventSenders = ImmutableList.builder(); - - // Make a KafkaProducer for events that can be serialized to an array of bytes, - // and wrap it in our KafkaSender. - final Optional> kafkaProducer = - kafkaClientProvider.getDefaultProducer(); - if (kafkaProducer.isPresent()) { - eventSenders.add(new KafkaSender(kafkaProducer)); - } - - // GooglePubsub senders - eventSenders.addAll(googlePubSubProvider.senders()); + final List eventSenders = new EventSenderFactory(config, true).get(); final TaskHistoryWriter historyWriter; if (config.isJobHistoryDisabled()) { @@ -269,9 +248,8 @@ public AgentService(final AgentConfig config, final Environment environment) } try { - this.model = new ZooKeeperAgentModel(zkClientProvider, - config.getName(), stateDirectory, historyWriter, - eventSenders.build()); + this.model = new ZooKeeperAgentModel( + zkClientProvider, config.getName(), stateDirectory, historyWriter, eventSenders); } catch (IOException e) { throw Throwables.propagate(e); } diff --git a/helios-services/src/main/java/com/spotify/helios/master/MasterConfig.java b/helios-services/src/main/java/com/spotify/helios/master/MasterConfig.java index f494f3895..92d34d998 100644 --- a/helios-services/src/main/java/com/spotify/helios/master/MasterConfig.java +++ b/helios-services/src/main/java/com/spotify/helios/master/MasterConfig.java @@ -17,21 +17,19 @@ package com.spotify.helios.master; -import com.google.common.collect.ImmutableSet; - +import com.spotify.helios.servicescommon.CommonConfiguration; import com.spotify.helios.servicescommon.FastForwardConfig; +import com.google.common.collect.ImmutableSet; + import java.net.InetSocketAddress; import java.nio.file.Path; -import java.util.List; import java.util.Set; -import io.dropwizard.Configuration; - /** * The collection of the configuration info of the master. */ -public class MasterConfig extends Configuration { +public class MasterConfig extends CommonConfiguration { // TODO (dano): defaults @@ -50,8 +48,6 @@ public class MasterConfig extends Configuration { private boolean noZooKeeperMasterRegistration; private InetSocketAddress adminEndpoint; private InetSocketAddress httpEndpoint; - private List kafkaBrokers; - private List pubsubPrefixes; private Path stateDirectory; private boolean zooKeeperEnableAcls; private String zookeeperAclAgentUser; @@ -191,24 +187,6 @@ public MasterConfig setHttpEndpoint(InetSocketAddress httpEndpoint) { return this; } - public List getKafkaBrokers() { - return kafkaBrokers; - } - - public MasterConfig setKafkaBrokers(List kafkaBrokers) { - this.kafkaBrokers = kafkaBrokers; - return this; - } - - public List getPubsubPrefixes() { - return pubsubPrefixes; - } - - public MasterConfig setPubsubPrefixes(List pubsubPrefixes) { - this.pubsubPrefixes = pubsubPrefixes; - return this; - } - public Path getStateDirectory() { return stateDirectory; } diff --git a/helios-services/src/main/java/com/spotify/helios/master/MasterService.java b/helios-services/src/main/java/com/spotify/helios/master/MasterService.java index d78ccc82c..a20a522b5 100644 --- a/helios-services/src/main/java/com/spotify/helios/master/MasterService.java +++ b/helios-services/src/main/java/com/spotify/helios/master/MasterService.java @@ -41,10 +41,8 @@ import com.spotify.helios.serviceregistration.ServiceRegistrar; import com.spotify.helios.serviceregistration.ServiceRegistration; import com.spotify.helios.servicescommon.EventSender; +import com.spotify.helios.servicescommon.EventSenderFactory; import com.spotify.helios.servicescommon.FastForwardConfig; -import com.spotify.helios.servicescommon.GooglePubSubProvider; -import com.spotify.helios.servicescommon.KafkaClientProvider; -import com.spotify.helios.servicescommon.KafkaSender; import com.spotify.helios.servicescommon.ManagedStatsdReporter; import com.spotify.helios.servicescommon.ReactorFactory; import com.spotify.helios.servicescommon.RiemannFacade; @@ -68,7 +66,6 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; - import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -88,7 +85,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.zookeeper.data.ACL; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -192,10 +188,6 @@ public MasterService(final MasterConfig config, riemannFacade, metrics.getZooKeeperMetrics()); final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider( zooKeeperClient, modelReporter); - final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider( - config.getKafkaBrokers()); - final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider( - config.getPubsubPrefixes()); // Create state directory, if necessary final Path stateDirectory = config.getStateDirectory().toAbsolutePath().normalize(); @@ -208,21 +200,10 @@ public MasterService(final MasterConfig config, } } - final ImmutableList.Builder eventSenders = ImmutableList.builder(); - - // Make a KafkaProducer for events that can be serialized to an array of bytes, - // and wrap it in our KafkaSender. - final Optional> kafkaProducer = - kafkaClientProvider.getDefaultProducer(); - if (kafkaProducer.isPresent()) { - eventSenders.add(new KafkaSender(kafkaProducer)); - } - - // GooglePubsub senders - eventSenders.addAll(googlePubSubProvider.senders()); + final List eventSenders = new EventSenderFactory(config, false).get(); final ZooKeeperMasterModel model = - new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders.build()); + new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders); final ZooKeeperHealthChecker zooKeeperHealthChecker = new ZooKeeperHealthChecker( zooKeeperClient, Paths.statusMasters(), riemannFacade, TimeUnit.MINUTES, 2); diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/CommonConfiguration.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/CommonConfiguration.java new file mode 100644 index 000000000..865cee572 --- /dev/null +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/CommonConfiguration.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.helios.servicescommon; + +import io.dropwizard.Configuration; + +import java.util.List; + +public class CommonConfiguration> extends Configuration { + + private List kafkaBrokers; + private List pubsubPrefixes; + + public List getKafkaBrokers() { + return kafkaBrokers; + } + + @SuppressWarnings("unchecked") + public C setKafkaBrokers(List kafkaBrokers) { + this.kafkaBrokers = kafkaBrokers; + return (C) this; + } + + public List getPubsubPrefixes() { + return pubsubPrefixes; + } + + @SuppressWarnings("unchecked") + public C setPubsubPrefixes(List pubsubPrefixes) { + this.pubsubPrefixes = pubsubPrefixes; + return (C) this; + } +} diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSender.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSender.java index 1f36127fd..d51124b9d 100644 --- a/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSender.java +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSender.java @@ -18,5 +18,12 @@ package com.spotify.helios.servicescommon; public interface EventSender { + void send(String topic, byte[] message); + + /** + * Tests if the sender is healthy. Provides a way for callers to filter out unhealthy event + * senders to avoid sending events via unhealthy senders. + */ + boolean isHealthy(); } diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSenderFactory.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSenderFactory.java new file mode 100644 index 000000000..a1ad6339c --- /dev/null +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/EventSenderFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.helios.servicescommon; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +public class EventSenderFactory implements Supplier> { + + private static final Logger log = LoggerFactory.getLogger(EventSenderFactory.class); + + private final CommonConfiguration config; + private final boolean performHealthchecks; + + public EventSenderFactory(final CommonConfiguration config, + final boolean performHealthchecks) { + this.config = config; + this.performHealthchecks = performHealthchecks; + } + + @Override + public List get() { + final List senders = new ArrayList<>(); + + final KafkaClientProvider kafkaClientProvider = + new KafkaClientProvider(config.getKafkaBrokers()); + + final Optional> kafkaProducer = + kafkaClientProvider.getDefaultProducer(); + + if (kafkaProducer.isPresent()) { + senders.add(new KafkaSender(kafkaProducer)); + } + + final GooglePubSubProvider googlePubSubProvider = + new GooglePubSubProvider(config.getPubsubPrefixes()); + + senders.addAll(googlePubSubProvider.senders()); + + if (performHealthchecks) { + // filter out any senders that fail the healthcheck + senders.removeIf(sender -> !sender.isHealthy()); + } + + log.info("health eventSenders: {}", senders); + + return senders; + } +} diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubProvider.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubProvider.java index 710290131..af9350f7c 100644 --- a/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubProvider.java +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubProvider.java @@ -17,18 +17,18 @@ package com.spotify.helios.servicescommon; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; + import com.google.cloud.pubsub.PubSub; import com.google.cloud.pubsub.PubSubOptions; - +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.function.Supplier; -import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; - public class GooglePubSubProvider { private static final Logger log = LoggerFactory.getLogger(GooglePubSubProvider.class); @@ -42,18 +42,18 @@ public List senders() { return senders(() -> PubSubOptions.getDefaultInstance().getService()); } - public List senders(Supplier pubsubSupplier) { - if (pubsubPrefixes != null && !pubsubPrefixes.isEmpty()) { - try { - final PubSub pubsub = pubsubSupplier.get(); - return pubsubPrefixes.stream() - .map(prefix -> new GooglePubSubSender(pubsub, prefix)) - .collect(toList()); - } catch (Exception e) { - log.warn("Failed to set up google pubsub service", e); - return emptyList(); - } - } else { + @VisibleForTesting + List senders(Supplier pubsubSupplier) { + if (pubsubPrefixes == null || pubsubPrefixes.isEmpty()) { + return emptyList(); + } + try { + final PubSub pubsub = pubsubSupplier.get(); + return pubsubPrefixes.stream() + .map(prefix -> new GooglePubSubSender(pubsub, prefix)) + .collect(toList()); + } catch (Exception e) { + log.warn("Failed to set up google pubsub service", e); return emptyList(); } } diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubSender.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubSender.java index 20fba24fd..1a8845209 100644 --- a/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubSender.java +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/GooglePubSubSender.java @@ -40,6 +40,23 @@ public GooglePubSubSender(final PubSub pubSub, final String topicPrefix) { this.topicPrefix = topicPrefix; } + @Override + public boolean isHealthy() { + final String topic = topicPrefix + "canary"; + try { + // perform a blocking call to see if we can connect to pubsub at all + // if the topic does not exist, this method returns null and does not throw an exception + pubsub.getTopic(topic); + log.info("successfully checked if topic {} exists - this instance is healthy", topic); + return true; + } catch (RuntimeException ex) { + // PubSubException is an instance of RuntimeException, catch any other subtypes too + log.warn("caught exception checking if topic {} exists - this instance is unhealthy", + topic, ex); + return false; + } + } + @Override public void send(final String topic, final byte[] message) { final String combinedTopic = topicPrefix + topic; diff --git a/helios-services/src/main/java/com/spotify/helios/servicescommon/KafkaSender.java b/helios-services/src/main/java/com/spotify/helios/servicescommon/KafkaSender.java index 140c3421e..cd53179c6 100644 --- a/helios-services/src/main/java/com/spotify/helios/servicescommon/KafkaSender.java +++ b/helios-services/src/main/java/com/spotify/helios/servicescommon/KafkaSender.java @@ -39,7 +39,12 @@ public KafkaSender(final Optional> kafkaProducer) this.kafkaProducer = kafkaProducer; } - public void send(final KafkaRecord kafkaRecord) { + @Override + public boolean isHealthy() { + return true; + } + + private void send(final KafkaRecord kafkaRecord) { if (kafkaProducer.isPresent()) { final ProducerRecord record = new ProducerRecord<>(kafkaRecord.getKafkaTopic(), kafkaRecord.getKafkaData()); diff --git a/helios-services/src/test/java/com/spotify/helios/servicescommon/GooglePubSubSenderTest.java b/helios-services/src/test/java/com/spotify/helios/servicescommon/GooglePubSubSenderTest.java index 17d2ee832..95e09aba8 100644 --- a/helios-services/src/test/java/com/spotify/helios/servicescommon/GooglePubSubSenderTest.java +++ b/helios-services/src/test/java/com/spotify/helios/servicescommon/GooglePubSubSenderTest.java @@ -17,18 +17,26 @@ package com.spotify.helios.servicescommon; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.google.cloud.pubsub.Message; import com.google.cloud.pubsub.PubSub; - +import com.google.cloud.pubsub.PubSubException; +import com.google.cloud.pubsub.Topic; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.verify; +import java.io.IOException; public class GooglePubSubSenderTest { + private PubSub pubsub; private String prefix = "prefix."; private GooglePubSubSender sender; @@ -46,4 +54,24 @@ public void testSend() throws Exception { verify(pubsub).publishAsync(eq(prefix + topic), any(Message.class)); } + + + @Test + public void testIsHealthy() { + final GooglePubSubSender sender = new GooglePubSubSender(pubsub, "foo."); + + when(pubsub.getTopic("foo.canary")).thenReturn(mock(Topic.class)); + + assertThat(sender.isHealthy(), is(true)); + } + + @Test + public void testIsUnhealthy() { + final GooglePubSubSender sender = new GooglePubSubSender(pubsub, "foo."); + + when(pubsub.getTopic("foo.canary")) + .thenThrow(new PubSubException(new IOException("oops"), false)); + + assertThat(sender.isHealthy(), is(false)); + } }