Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

disable GooglePubSubSender if pubsub cannot be reached #1030

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
package com.spotify.helios.agent;

import com.spotify.docker.client.DockerHost;
import com.spotify.helios.servicescommon.CommonConfiguration;
import com.spotify.helios.servicescommon.FastForwardConfig;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import io.dropwizard.Configuration;

/**
* The configuration of the Helios agent.
*/
public class AgentConfig extends Configuration {
public class AgentConfig extends CommonConfiguration<AgentConfig> {

private String domain;
private String name;
Expand All @@ -57,8 +56,6 @@ public class AgentConfig extends Configuration {
private InetSocketAddress httpEndpoint;
private boolean noHttp;
private List<String> binds;
private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;
private Map<String, String> labels;
private boolean zooKeeperEnableAcls;
private String zookeeperAclMasterUser;
Expand Down Expand Up @@ -289,24 +286,6 @@ public AgentConfig setBinds(List<String> binds) {
return this;
}

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

public AgentConfig setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

public AgentConfig setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return this;
}

public Map<String, String> getLabels() {
return labels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@
import com.spotify.helios.master.metrics.HealthCheckGauge;
import com.spotify.helios.serviceregistration.ServiceRegistrar;
import com.spotify.helios.servicescommon.EventSender;
import com.spotify.helios.servicescommon.EventSenderFactory;
import com.spotify.helios.servicescommon.FastForwardConfig;
import com.spotify.helios.servicescommon.GooglePubSubProvider;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.ManagedStatsdReporter;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.ReactorFactory;
Expand Down Expand Up @@ -70,7 +68,6 @@
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
Expand All @@ -84,7 +81,6 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -97,7 +93,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -241,24 +236,8 @@ public AgentService(final AgentConfig config, final Environment environment)
new ZooKeeperModelReporter(riemannFacade, metrics.getZooKeeperMetrics());
final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider(
zooKeeperClient, modelReporter);
final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider(
config.getKafkaBrokers());

final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider(
config.getPubsubPrefixes());

final ImmutableList.Builder<EventSender> eventSenders = ImmutableList.builder();

// Make a KafkaProducer for events that can be serialized to an array of bytes,
// and wrap it in our KafkaSender.
final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();
if (kafkaProducer.isPresent()) {
eventSenders.add(new KafkaSender(kafkaProducer));
}

// GooglePubsub senders
eventSenders.addAll(googlePubSubProvider.senders());
final List<EventSender> eventSenders = new EventSenderFactory(config, true).get();

final TaskHistoryWriter historyWriter;
if (config.isJobHistoryDisabled()) {
Expand All @@ -269,9 +248,8 @@ public AgentService(final AgentConfig config, final Environment environment)
}

try {
this.model = new ZooKeeperAgentModel(zkClientProvider,
config.getName(), stateDirectory, historyWriter,
eventSenders.build());
this.model = new ZooKeeperAgentModel(
zkClientProvider, config.getName(), stateDirectory, historyWriter, eventSenders);
} catch (IOException e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@

package com.spotify.helios.master;

import com.google.common.collect.ImmutableSet;

import com.spotify.helios.servicescommon.CommonConfiguration;
import com.spotify.helios.servicescommon.FastForwardConfig;

import com.google.common.collect.ImmutableSet;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import io.dropwizard.Configuration;

/**
* The collection of the configuration info of the master.
*/
public class MasterConfig extends Configuration {
public class MasterConfig extends CommonConfiguration<MasterConfig> {

// TODO (dano): defaults

Expand All @@ -50,8 +48,6 @@ public class MasterConfig extends Configuration {
private boolean noZooKeeperMasterRegistration;
private InetSocketAddress adminEndpoint;
private InetSocketAddress httpEndpoint;
private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;
private Path stateDirectory;
private boolean zooKeeperEnableAcls;
private String zookeeperAclAgentUser;
Expand Down Expand Up @@ -191,24 +187,6 @@ public MasterConfig setHttpEndpoint(InetSocketAddress httpEndpoint) {
return this;
}

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

public MasterConfig setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

public MasterConfig setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return this;
}

public Path getStateDirectory() {
return stateDirectory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import com.spotify.helios.serviceregistration.ServiceRegistrar;
import com.spotify.helios.serviceregistration.ServiceRegistration;
import com.spotify.helios.servicescommon.EventSender;
import com.spotify.helios.servicescommon.EventSenderFactory;
import com.spotify.helios.servicescommon.FastForwardConfig;
import com.spotify.helios.servicescommon.GooglePubSubProvider;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.ManagedStatsdReporter;
import com.spotify.helios.servicescommon.ReactorFactory;
import com.spotify.helios.servicescommon.RiemannFacade;
Expand All @@ -68,7 +66,6 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand All @@ -88,7 +85,6 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.zookeeper.data.ACL;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -192,10 +188,6 @@ public MasterService(final MasterConfig config,
riemannFacade, metrics.getZooKeeperMetrics());
final ZooKeeperClientProvider zkClientProvider = new ZooKeeperClientProvider(
zooKeeperClient, modelReporter);
final KafkaClientProvider kafkaClientProvider = new KafkaClientProvider(
config.getKafkaBrokers());
final GooglePubSubProvider googlePubSubProvider = new GooglePubSubProvider(
config.getPubsubPrefixes());

// Create state directory, if necessary
final Path stateDirectory = config.getStateDirectory().toAbsolutePath().normalize();
Expand All @@ -208,21 +200,10 @@ public MasterService(final MasterConfig config,
}
}

final ImmutableList.Builder<EventSender> eventSenders = ImmutableList.builder();

// Make a KafkaProducer for events that can be serialized to an array of bytes,
// and wrap it in our KafkaSender.
final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();
if (kafkaProducer.isPresent()) {
eventSenders.add(new KafkaSender(kafkaProducer));
}

// GooglePubsub senders
eventSenders.addAll(googlePubSubProvider.senders());
final List<EventSender> eventSenders = new EventSenderFactory(config, false).get();

final ZooKeeperMasterModel model =
new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders.build());
new ZooKeeperMasterModel(zkClientProvider, config.getName(), eventSenders);

final ZooKeeperHealthChecker zooKeeperHealthChecker = new ZooKeeperHealthChecker(
zooKeeperClient, Paths.statusMasters(), riemannFacade, TimeUnit.MINUTES, 2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.helios.servicescommon;

import io.dropwizard.Configuration;

import java.util.List;

public class CommonConfiguration<C extends CommonConfiguration<C>> extends Configuration {

private List<String> kafkaBrokers;
private List<String> pubsubPrefixes;

public List<String> getKafkaBrokers() {
return kafkaBrokers;
}

@SuppressWarnings("unchecked")
public C setKafkaBrokers(List<String> kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return (C) this;
}

public List<String> getPubsubPrefixes() {
return pubsubPrefixes;
}

@SuppressWarnings("unchecked")
public C setPubsubPrefixes(List<String> pubsubPrefixes) {
this.pubsubPrefixes = pubsubPrefixes;
return (C) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,12 @@
package com.spotify.helios.servicescommon;

public interface EventSender {

void send(String topic, byte[] message);

/**
* Tests if the sender is healthy. Provides a way for callers to filter out unhealthy event
* senders to avoid sending events via unhealthy senders.
*/
boolean isHealthy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.helios.servicescommon;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class EventSenderFactory implements Supplier<List<EventSender>> {

private static final Logger log = LoggerFactory.getLogger(EventSenderFactory.class);

private final CommonConfiguration<?> config;
private final boolean performHealthchecks;

public EventSenderFactory(final CommonConfiguration<?> config,
final boolean performHealthchecks) {
this.config = config;
this.performHealthchecks = performHealthchecks;
}

@Override
public List<EventSender> get() {
final List<EventSender> senders = new ArrayList<>();

final KafkaClientProvider kafkaClientProvider =
new KafkaClientProvider(config.getKafkaBrokers());

final Optional<KafkaProducer<String, byte[]>> kafkaProducer =
kafkaClientProvider.getDefaultProducer();

if (kafkaProducer.isPresent()) {
senders.add(new KafkaSender(kafkaProducer));
}

final GooglePubSubProvider googlePubSubProvider =
new GooglePubSubProvider(config.getPubsubPrefixes());

senders.addAll(googlePubSubProvider.senders());

if (performHealthchecks) {
// filter out any senders that fail the healthcheck
senders.removeIf(sender -> !sender.isHealthy());
}

log.info("health eventSenders: {}", senders);

return senders;
}
}
Loading