Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
disable GooglePubSubSender if pubsub cannot be reached
Browse files Browse the repository at this point in the history
Adds a `isHealthy()` method to the EventSender interface that is used to
filter out any unhealthy senders at startup.

This logic is done in a new class named EventSenderFactory, which
extracts the identical logic for constructing Lists of EventSenders from
the MasterService and the AgentService. To make this common class
possible, also extracted a class for the common configuration fields
between AgentConfig and MasterConfig. This new CommonConfiguration class
is incomplete - more fields can be moved to this class to avoid
duplication, but I left that for future commits.

I created EventSenderFactory in hopes of writing a test for the logic
that unhealthy senders are removed from the List, but the nature of this
class makes this test to impractical - since the
List-of-EventSender-building involves constructing new instances of
KafkaProvider/GooglePubSubProvider/etc and calling methods on instances
that those instances return, which is not really feasible to be mocked
out in a test.
  • Loading branch information
mattnworb committed Dec 1, 2016
1 parent 8cd160e commit 82bd16c
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
package com.spotify.helios.agent;

import com.spotify.docker.client.DockerHost;
import com.spotify.helios.servicescommon.CommonConfiguration;
import com.spotify.helios.servicescommon.FastForwardConfig;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import io.dropwizard.Configuration;

/**
* The configuration of the Helios agent.
*/
public class AgentConfig extends Configuration {
public class AgentConfig extends CommonConfiguration<AgentConfig> {

private String domain;
private String name;
Expand All @@ -57,8 +56,6 @@ public class AgentConfig extends Configuration {
private InetSocketAddress httpEndpoint;
private boolean noHttp;
private List<String> binds;
private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;
private Map<String, String> labels;
private boolean zooKeeperEnableAcls;
private String zookeeperAclMasterUser;
Expand Down Expand Up @@ -289,24 +286,6 @@ public AgentConfig setBinds(List<String> binds) {
return this;
}

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

public AgentConfig setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

public AgentConfig setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return this;
}

public Map<String, String> getLabels() {
return labels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@
import com.spotify.helios.master.metrics.HealthCheckGauge;
import com.spotify.helios.serviceregistration.ServiceRegistrar;
import com.spotify.helios.servicescommon.EventSender;
import com.spotify.helios.servicescommon.EventSenderFactory;
import com.spotify.helios.servicescommon.FastForwardConfig;
import com.spotify.helios.servicescommon.GooglePubSubProvider;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.ManagedStatsdReporter;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.ReactorFactory;
Expand Down Expand Up @@ -70,7 +68,6 @@
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
Expand All @@ -84,7 +81,6 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -97,7 +93,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -241,24 +236,8 @@ public AgentService(final AgentConfig config, final Environment environment)
new ZooKeeperModelReporter(riemannFacade, metrics.getZooKeeperMetrics());
final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider(
zooKeeperClient, modelReporter);
final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider(
config.getKafkaBrokers());

final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider(
config.getPubsubPrefixes());

final ImmutableList.Builder<EventSender> eventSenders = ImmutableList.builder();

// Make a KafkaProducer for events that can be serialized to an array of bytes,
// and wrap it in our KafkaSender.
final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();
if (kafkaProducer.isPresent()) {
eventSenders.add(new KafkaSender(kafkaProducer));
}

// GooglePubsub senders
eventSenders.addAll(googlePubSubProvider.senders());
final List<EventSender> eventSenders = new EventSenderFactory(config, true).get();

final TaskHistoryWriter historyWriter;
if (config.isJobHistoryDisabled()) {
Expand All @@ -269,9 +248,8 @@ public AgentService(final AgentConfig config, final Environment environment)
}

try {
this.model = new ZooKeeperAgentModel(zkClientProvider,
config.getName(), stateDirectory, historyWriter,
eventSenders.build());
this.model = new ZooKeeperAgentModel(
zkClientProvider, config.getName(), stateDirectory, historyWriter, eventSenders);
} catch (IOException e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@

package com.spotify.helios.master;

import com.google.common.collect.ImmutableSet;

import com.spotify.helios.servicescommon.CommonConfiguration;
import com.spotify.helios.servicescommon.FastForwardConfig;

import com.google.common.collect.ImmutableSet;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import io.dropwizard.Configuration;

/**
* The collection of the configuration info of the master.
*/
public class MasterConfig extends Configuration {
public class MasterConfig extends CommonConfiguration<MasterConfig> {

// TODO (dano): defaults

Expand All @@ -50,8 +48,6 @@ public class MasterConfig extends Configuration {
private boolean noZooKeeperMasterRegistration;
private InetSocketAddress adminEndpoint;
private InetSocketAddress httpEndpoint;
private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;
private Path stateDirectory;
private boolean zooKeeperEnableAcls;
private String zookeeperAclAgentUser;
Expand Down Expand Up @@ -191,24 +187,6 @@ public MasterConfig setHttpEndpoint(InetSocketAddress httpEndpoint) {
return this;
}

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

public MasterConfig setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

public MasterConfig setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return this;
}

public Path getStateDirectory() {
return stateDirectory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import com.spotify.helios.serviceregistration.ServiceRegistrar;
import com.spotify.helios.serviceregistration.ServiceRegistration;
import com.spotify.helios.servicescommon.EventSender;
import com.spotify.helios.servicescommon.EventSenderFactory;
import com.spotify.helios.servicescommon.FastForwardConfig;
import com.spotify.helios.servicescommon.GooglePubSubProvider;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.ManagedStatsdReporter;
import com.spotify.helios.servicescommon.ReactorFactory;
import com.spotify.helios.servicescommon.RiemannFacade;
Expand All @@ -68,7 +66,6 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand All @@ -88,7 +85,6 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.zookeeper.data.ACL;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -192,10 +188,6 @@ public MasterService(final MasterConfig config,
riemannFacade, metrics.getZooKeeperMetrics());
final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider(
zooKeeperClient, modelReporter);
final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider(
config.getKafkaBrokers());
final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider(
config.getPubsubPrefixes());

// Create state directory, if necessary
final Path stateDirectory = config.getStateDirectory().toAbsolutePath().normalize();
Expand All @@ -208,21 +200,10 @@ public MasterService(final MasterConfig config,
}
}

final ImmutableList.Builder<EventSender> eventSenders = ImmutableList.builder();

// Make a KafkaProducer for events that can be serialized to an array of bytes,
// and wrap it in our KafkaSender.
final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();
if (kafkaProducer.isPresent()) {
eventSenders.add(new KafkaSender(kafkaProducer));
}

// GooglePubsub senders
eventSenders.addAll(googlePubSubProvider.senders());
final List<EventSender> eventSenders = new EventSenderFactory(config, false).get();

final ZooKeeperMasterModel model =
new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders.build());
new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders);

final ZooKeeperHealthChecker zooKeeperHealthChecker = new ZooKeeperHealthChecker(
zooKeeperClient, Paths.statusMasters(), riemannFacade, TimeUnit.MINUTES, 2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.helios.servicescommon;

import io.dropwizard.Configuration;

import java.util.List;

public class CommonConfiguration<C extends CommonConfiguration<C>> extends Configuration {

private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

@SuppressWarnings("unchecked")
public C setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return (C) this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

@SuppressWarnings("unchecked")
public C setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return (C) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,12 @@
package com.spotify.helios.servicescommon;

public interface EventSender {

void send(String topic, byte[] message);

/**
* Tests if the sender is healthy. Provides a way for callers to filter out unhealthy event
* senders to avoid sending events via unhealthy senders.
*/
boolean isHealthy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.helios.servicescommon;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class EventSenderFactory implements Supplier<List<EventSender>> {

private static final Logger log = LoggerFactory.getLogger(EventSenderFactory.class);

private final CommonConfiguration<?> config;
private final boolean performHealthchecks;

public EventSenderFactory(final CommonConfiguration<?> config,
final boolean performHealthchecks) {
this.config = config;
this.performHealthchecks = performHealthchecks;
}

@Override
public List<EventSender> get() {
final List<EventSender> senders = new ArrayList<>();

final KafkaClientProvider kafkaClientProvider =
new KafkaClientProvider(config.getKafkaBrokers());

final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();

if (kafkaProducer.isPresent()) {
senders.add(new KafkaSender(kafkaProducer));
}

final GooglePubSubProvider googlePubSubProvider =
new GooglePubSubProvider(config.getPubsubPrefixes());

senders.addAll(googlePubSubProvider.senders());

if (performHealthchecks) {
// filter out any senders that fail the healthcheck
senders.removeIf(sender -> !sender.isHealthy());
}

log.info("health eventSenders: {}", senders);

return senders;
}
}
Loading

0 comments on commit 82bd16c

Please sign in to comment.