Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][proxy] Support disabling metrics endpoint #21031

Merged
merged 3 commits into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ zooKeeperCacheExpirySeconds=-1

### --- Metrics --- ###

# Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints
enableProxyStatsEndpoints=true
# Whether the '/metrics' endpoint requires authentication. Defaults to true
authenticateMetricsEndpoint=true
# Enable cache metrics data, default value is false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private int authenticationRefreshCheckSeconds = 60;

@FieldContext(
category = CATEGORY_HTTP,
doc = "Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints"
)
private boolean enableProxyStatsEndpoints = true;

@FieldContext(
category = CATEGORY_AUTHENTICATION,
doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,19 @@ public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
if (service != null) {
PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
if (metricsServlet != null) {
server.addServlet("/metrics", new ServletHolder(metricsServlet),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
if (config.isEnableProxyStatsEndpoints()) {
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
VipStatus.class);
server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service,
ProxyStats.class);
if (service != null) {
PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
if (metricsServlet != null) {
server.addServlet("/metrics", new ServletHolder(metricsServlet),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
}
}
}
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class);
server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class);

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,20 @@ public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<S

public void addServlet(String basePath, ServletHolder servletHolder,
List<Pair<String, Object>> attributes, boolean requireAuthentication) {
addServlet(basePath, servletHolder, attributes, requireAuthentication, true);
}

private void addServlet(String basePath, ServletHolder servletHolder,
List<Pair<String, Object>> attributes, boolean requireAuthentication, boolean checkForExistingPaths) {
popularServletParams(servletHolder, config);

Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
String.format("Cannot add servlet at %s, path %s already exists", basePath, existingPath.get()));
if (checkForExistingPaths) {
Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
String.format("Cannot add servlet at %s, path %s already exists", basePath,
existingPath.get()));
}
}
servletPaths.add(basePath);

Expand Down Expand Up @@ -237,11 +245,9 @@ public void addRestResource(String basePath, String attribute, Object attributeV
config.register(JsonMapperProvider.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(attribute, attributeValue);
handlers.add(context);
// This method has not historically checked for existing paths, so we don't check here either. The
// method call is added to reduce code duplication.
addServlet(basePath, servletHolder, Collections.singletonList(Pair.of(attribute, attributeValue)), true, false);
}

public int getExternalServicePort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.proxy.stats;

import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.Channel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand All @@ -27,7 +28,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -36,26 +40,35 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/")
@Api(value = "/proxy-stats", description = "Stats for proxy", tags = "proxy-stats", hidden = true)
@Produces(MediaType.APPLICATION_JSON)
public class ProxyStats {

private static final Logger log = LoggerFactory.getLogger(ProxyStats.class);
public static final String ATTRIBUTE_PULSAR_PROXY_NAME = "pulsar-proxy";

private ProxyService service;

@Context
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;

@GET
@Path("/connections")
@ApiOperation(value = "Proxy stats api to get info for live connections",
response = List.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") })
public List<ConnectionStats> metrics() {
throwIfNotSuperUser("metrics");
List<ConnectionStats> stats = new ArrayList<>();
proxyService().getClientCnxs().forEach(cnx -> {
if (cnx.getDirectProxyHandler() == null) {
Expand All @@ -76,7 +89,7 @@ public List<ConnectionStats> metrics() {
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy logging should be > 2 to capture topic stats"),
@ApiResponse(code = 503, message = "Proxy service is not initialized") })
public Map<String, TopicStats> topics() {

throwIfNotSuperUser("topics");
Optional<Integer> logLevel = proxyService().getConfiguration().getProxyLogLevel();
if (!logLevel.isPresent() || logLevel.get() < 2) {
throw new RestException(Status.PRECONDITION_FAILED, "Proxy doesn't have logging level 2");
Expand All @@ -90,6 +103,7 @@ public Map<String, TopicStats> topics() {
notes = "It only changes log-level in memory, change it config file to persist the change")
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
throwIfNotSuperUser("updateProxyLogLevel");
if (logLevel < 0 || logLevel > 2) {
throw new RestException(Status.PRECONDITION_FAILED, "Proxy log level can be only [0-2]");
}
Expand All @@ -100,6 +114,7 @@ public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
@Path("/logging")
@ApiOperation(hidden = true, value = "Get proxy logging")
public int getProxyLogLevel(@PathParam("logLevel") int logLevel) {
throwIfNotSuperUser("getProxyLogLevel");
return proxyService().getProxyLogLevel();
}

Expand All @@ -112,4 +127,26 @@ protected ProxyService proxyService() {
}
return service;
}

private void throwIfNotSuperUser(String action) {
if (proxyService().getConfiguration().isAuthorizationEnabled()) {
AuthenticationParameters authParams = AuthenticationParameters.builder()
.clientRole((String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName))
.clientAuthenticationDataSource((AuthenticationDataSource)
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName))
.build();
try {
if (authParams.getClientRole() == null
|| !proxyService().getAuthorizationService().isSuperUser(authParams).get(30, SECONDS)) {
log.error("Client with role [{}] is not authorized to {}", authParams.getClientRole(), action);
throw new org.apache.pulsar.common.util.RestException(Status.UNAUTHORIZED,
"Client is not authorized to perform operation");
}
} catch (ExecutionException | TimeoutException | InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} is a super user role ", 30,
authParams.getClientRole());
throw new org.apache.pulsar.common.util.RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
}
}