Skip to content

Commit

Permalink
Stop filling monitoring queues when processor fails
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Bescos Gascon <[email protected]>
  • Loading branch information
jbescos committed Feb 18, 2021
1 parent fc07c4a commit ebe41ef
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,14 +20,14 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.ws.rs.ProcessingException;

import javax.annotation.Priority;
import javax.inject.Inject;
import javax.ws.rs.ProcessingException;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.server.internal.LocalizationMessages;
Expand Down Expand Up @@ -68,6 +68,8 @@ public final class MonitoringEventListener implements ApplicationEventListener {
private final Queue<Integer> responseStatuses = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private final Queue<RequestEvent> exceptionMapperEvents = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private volatile MonitoringStatisticsProcessor monitoringStatisticsProcessor;
// By default new events can arrive before MonitoringStatisticsProcessor is running.
private final AtomicBoolean processorFailed = new AtomicBoolean(false);

/**
* Time statistics.
Expand Down Expand Up @@ -185,6 +187,7 @@ public void onEvent(final ApplicationEvent event) {
case RELOAD_FINISHED:
case INITIALIZATION_FINISHED:
this.monitoringStatisticsProcessor = new MonitoringStatisticsProcessor(injectionManager, this);
processorFailed.set(false);
this.monitoringStatisticsProcessor.startMonitoringWorker();
break;
case DESTROY_FINISHED:
Expand Down Expand Up @@ -238,13 +241,13 @@ public void onEvent(final RequestEvent event) {
methodStats = new MethodStats(method, methodTimeStart, now - methodTimeStart);
break;
case EXCEPTION_MAPPING_FINISHED:
if (!exceptionMapperEvents.offer(event)) {
if (!offer(exceptionMapperEvents, event)) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_MAPPER());
}
break;
case FINISHED:
if (event.isResponseWritten()) {
if (!responseStatuses.offer(event.getContainerResponse().getStatus())) {
if (!offer(responseStatuses, event.getContainerResponse().getStatus())) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_RESPONSE());
}
}
Expand All @@ -264,8 +267,7 @@ public void onEvent(final RequestEvent event) {
}
sb.setLength(sb.length() - 1);
}

if (!requestQueuedItems.offer(new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
if (!offer(requestQueuedItems, new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
methodStats, sb.toString()))) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_REQUEST());
}
Expand All @@ -274,6 +276,21 @@ public void onEvent(final RequestEvent event) {
}
}

private <T> boolean offer(Queue<T> queue, T event) {
if (!processorFailed.get()) {
return queue.offer(event);
}
// Don't need to warn that the event was not queued because an Exception was thrown by MonitoringStatisticsProcessor
return true;
}

/**
* Invoked by {@link MonitoringStatisticsProcessor} when there is one exception consuming from queues.
*/
void processorFailed() {
processorFailed.set(true);
}

/**
* Get the exception mapper event queue.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.glassfish.jersey.server.ExtendedResourceContext;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener.RequestStats;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.model.ResourceModel;
import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void run() {
processResponseCodeEvents();
processExceptionMapperEvents();
} catch (final Throwable t) {
monitoringEventListener.processorFailed();
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
// rethrowing exception stops further task execution
throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
Expand All @@ -120,11 +122,9 @@ public void run() {
private void processExceptionMapperEvents() {
final Queue<RequestEvent> eventQueue = monitoringEventListener.getExceptionMapperEvents();
final FloodingLogger floodingLogger = new FloodingLogger(eventQueue);

while (!eventQueue.isEmpty()) {
RequestEvent event = null;
while ((event = eventQueue.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final RequestEvent event = eventQueue.remove();
final ExceptionMapperStatisticsImpl.Builder mapperStats = statisticsBuilder.getExceptionMapperStatisticsBuilder();

if (event.getExceptionMapper() != null) {
Expand All @@ -138,12 +138,9 @@ private void processExceptionMapperEvents() {
private void processRequestItems() {
final Queue<MonitoringEventListener.RequestStats> requestQueuedItems = monitoringEventListener.getRequestQueuedItems();
final FloodingLogger floodingLogger = new FloodingLogger(requestQueuedItems);

while (!requestQueuedItems.isEmpty()) {
RequestStats event = null;
while ((event = requestQueuedItems.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final MonitoringEventListener.RequestStats event = requestQueuedItems.remove();

final MonitoringEventListener.TimeStats requestStats = event.getRequestStats();
statisticsBuilder.addRequestExecution(requestStats.getStartTime(), requestStats.getDuration());

Expand All @@ -160,11 +157,9 @@ private void processRequestItems() {
private void processResponseCodeEvents() {
final Queue<Integer> responseEvents = monitoringEventListener.getResponseStatuses();
final FloodingLogger floodingLogger = new FloodingLogger(responseEvents);

while (!responseEvents.isEmpty()) {
Integer code = null;
while ((code = responseEvents.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final Integer code = responseEvents.remove();
statisticsBuilder.addResponseCode(code);
}

Expand Down
45 changes: 45 additions & 0 deletions tests/integration/jersey-4697/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
http://www.eclipse.org/legal/epl-2.0.
This Source Code may also be made available under the following Secondary
Licenses when the conditions for such availability set forth in the
Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
version 2 with the GNU Classpath Exception, which is available at
https://www.gnu.org/software/classpath/license.html.
SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>project</artifactId>
<groupId>org.glassfish.jersey.tests.integration</groupId>
<version>2.34-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>jersey-4697</artifactId>

<dependencies>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-bundle</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.tests.integration.jersey4697;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeDataSupport;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.ExceptionMapperMXBean;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class MonitoringEventListenerTest extends JerseyTest {

private static final long TIMEOUT = 60000; // 1 minute
private static final String MBEAN_EXCEPTION =
"org.glassfish.jersey:type=MonitoringEventListenerTest,subType=Global,exceptions=ExceptionMapper";

@Path("/example")
public static class ExampleResource {
@Inject
private InjectionManager injectionManager;
@GET
@Path("/error")
public Response error() {
throw new RuntimeException("Any exception to be counted in ExceptionMapper");
}
@GET
@Path("/poison")
public Response poison() {
MonitoringEventListener monitoringEventListener = listener();
RequestEvent requestEvent = mock(RequestEvent.class);
when(requestEvent.getType()).thenReturn(RequestEvent.Type.START);
RequestEventListener eventListener = monitoringEventListener.onRequest(requestEvent);
RequestEvent poisonEvent = mock(RequestEvent.class);
when(poisonEvent.getType()).thenReturn(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
when(poisonEvent.getExceptionMapper())
.thenThrow(new IllegalStateException("This causes the scheduler to stop working"));
eventListener.onEvent(poisonEvent);
return Response.ok().build();
}
@GET
@Path("/queueSize")
public Response queueSize() throws Exception {
MonitoringEventListener monitoringEventListener = listener();
Method method = MonitoringEventListener.class.getDeclaredMethod("getExceptionMapperEvents");
method.setAccessible(true);
Collection<?> queue = (Collection<?>) method.invoke(monitoringEventListener);
return Response.ok(queue.size()).build();
}
private MonitoringEventListener listener() {
Iterable<ApplicationEventListener> listeners =
Providers.getAllProviders(injectionManager, ApplicationEventListener.class);
for (ApplicationEventListener listener : listeners) {
if (listener instanceof MonitoringEventListener) {
return (MonitoringEventListener) listener;
}
}
throw new IllegalStateException("MonitoringEventListener was not found");
}
}

@Provider
public static class RuntimeExceptionMapper implements ExceptionMapper<RuntimeException> {
@Override
public Response toResponse(RuntimeException e) {
return Response.status(500).entity("RuntimeExceptionMapper: " + e.getMessage()).build();
}
}

@Override
protected Application configure() {
ResourceConfig resourceConfig = new ResourceConfig(ExampleResource.class);
// Need to map the exception to be counted by ExceptionMapper
resourceConfig.register(RuntimeExceptionMapper.class);
resourceConfig.property(ServerProperties.MONITORING_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, 1);
resourceConfig.setApplicationName("MonitoringEventListenerTest");
return resourceConfig;
}

@Test
public void exceptionInScheduler() throws Exception {
final Long ERRORS_BEFORE_FAIL = 10L;
// Send some requests to process some statistics.
request(ERRORS_BEFORE_FAIL);
// Give some time to the scheduler to collect data.
waitTillQueueIsEmpty();
// Make the scheduler to fail. No more statistics are collected.
makeFailure();
// Sending again requests
request(20);
// Explicit wait
Thread.sleep(1000);
// No new events should be accepted because scheduler is not working.
queueIsEmpty();
Long monitoredErrors = mappedErrorsFromJMX(MBEAN_EXCEPTION);
assertEquals(ERRORS_BEFORE_FAIL, monitoredErrors);
}

private void waitTillQueueIsEmpty() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
Response response = target("/example/queueSize").request().get();
int queueSize = response.readEntity(Integer.class);
if (queueSize == 0) {
latch.countDown();
}
}, 0, 10, TimeUnit.MILLISECONDS);
boolean wait = latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
assertTrue("The queue was not empty in the given time", wait);
scheduler.shutdown();
}

private void makeFailure() {
Response response = target("/example/poison").request().get();
assertEquals(200, response.getStatus());
}

private void queueIsEmpty() {
Response response = target("/example/queueSize").request().get();
assertEquals(200, response.getStatus());
assertEquals(Integer.valueOf(0), response.readEntity(Integer.class));
}

private Long mappedErrorsFromJMX(String name) throws Exception {
Long monitoredErrors = null;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(name);
ExceptionMapperMXBean bean = JMX.newMBeanProxy(mbs, objectName, ExceptionMapperMXBean.class);
Map<?, ?> counter = bean.getExceptionMapperCount();
CompositeDataSupport value = (CompositeDataSupport) counter.entrySet().iterator().next().getValue();
for (Object obj : value.values()) {
if (obj instanceof Long) {
// Messy way to get the errors, but generic types doesn't match and there is no nice way
monitoredErrors = (Long) obj;
break;
}
}
return monitoredErrors;
}

private void request(long requests) {
for (long i = 0; i < requests; i++) {
Response response = target("/example/error").request().get();
assertEquals(500, response.getStatus());
}
}
}
Loading

0 comments on commit ebe41ef

Please sign in to comment.