Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microprofile Fault Tolerance changes #12

Merged
merged 4 commits into from
Sep 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public void beforeBeanDiscovery(@Observes BeforeBeanDiscovery beforeBeanDiscover
beforeBeanDiscovery.addInterceptorBinding(bindingType);
AnnotatedType<FaultToleranceInterceptor> interceptorType = beanManager.createAnnotatedType(FaultToleranceInterceptor.class);
beforeBeanDiscovery.addAnnotatedType(interceptorType);
AnnotatedType<FaultToleranceInterceptor.ExecutorCleanup> executorCleanup = beanManager.createAnnotatedType(FaultToleranceInterceptor.ExecutorCleanup.class);
beforeBeanDiscovery.addAnnotatedType(executorCleanup);
}

public <T> void processAnnotatedType(@Observes @WithAnnotations({ Asynchronous.class, Fallback.class, Timeout.class, CircuitBreaker.class, Retry.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import javax.annotation.PreDestroy;
import javax.annotation.Priority;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
Expand All @@ -32,6 +34,8 @@
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.faulttolerance.cdi.config.AsynchronousConfig;
import com.ibm.ws.microprofile.faulttolerance.cdi.config.BulkheadConfig;
Expand All @@ -58,6 +62,11 @@ public class FaultToleranceInterceptor {

private final ConcurrentHashMap<Method, AggregatedFTPolicy> policyCache = new ConcurrentHashMap<>();

@Inject
public FaultToleranceInterceptor(ExecutorCleanup executorCleanup) {
executorCleanup.setPolicies(policyCache.values());
}

@AroundInvoke
public Object executeFT(InvocationContext context) throws Throwable {

Expand Down Expand Up @@ -214,7 +223,12 @@ private Object execute(InvocationContext invocationContext, AggregatedFTPolicy a
};

Executor<Future<Object>> async = (Executor<Future<Object>>) executor;
result = async.execute(callable, executionContext);
try {
result = async.execute(callable, executionContext);
} catch (ExecutionException e) {
throw e.getCause();
}

} else {

Callable<Object> callable = () -> {
Expand All @@ -235,11 +249,25 @@ private Object execute(InvocationContext invocationContext, AggregatedFTPolicy a
return result;
}

@PreDestroy
public void cleanUpExecutors(InvocationContext ctx) throws Exception {
ctx.proceed();
policyCache.forEach((k, v) -> {
v.close();
});
@Dependent
public static class ExecutorCleanup {
private static final TraceComponent tc = Tr.register(ExecutorCleanup.class);

private Collection<AggregatedFTPolicy> policies;

public void setPolicies(Collection<AggregatedFTPolicy> policies) {
this.policies = policies;
}

@PreDestroy
public void cleanUpExecutors() {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Cleaning up executors");
}

policies.forEach((e) -> {
e.close();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,33 @@ public void testCBFailureThresholdWithException() throws Exception {
"SUCCESS");
}

@Test
public void testCBAsync() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/circuitbreaker?testMethod=testCBAsync",
"SUCCESS");
}

@Test
public void testCBAsyncFallback() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/circuitbreaker?testMethod=testCBAsyncFallback",
"SUCCESS");
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
return SHARED_SERVER;
}
@BeforeClass
public static void setUp() throws Exception {
if (!SHARED_SERVER.getLibertyServer().isStarted()) {
SHARED_SERVER.getLibertyServer().startServer();
}
}

@BeforeClass
public static void setUp() throws Exception {
if (!SHARED_SERVER.getLibertyServer().isStarted()) {
SHARED_SERVER.getLibertyServer().startServer();
}

}

@AfterClass
public static void tearDown() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.ClassRule;
import org.junit.Test;

import com.ibm.websphere.simplicity.RemoteFile;
import com.ibm.ws.fat.util.LoggingTest;
import com.ibm.ws.fat.util.SharedServer;
import com.ibm.ws.fat.util.browser.WebBrowser;
Expand Down Expand Up @@ -72,6 +73,23 @@ public void testRetryDurationZero() throws Exception {
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/retry?testMethod=testRetryDurationZero", "SUCCESS");
}

/**
* Not really related to retry but it's easiest to test it here
*/
@Test
public void testExecutorsClose() throws Exception {

RemoteFile traceLog = SHARED_SERVER.getLibertyServer().getMostRecentTraceFile();
SHARED_SERVER.getLibertyServer().setMarkToEndOfLog(traceLog);

// This calls a RequestScoped bean which only has fault tolerance annotations on the method
// This should cause executors to get cleaned up
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/retry?testMethod=testRetryAbortOn", "SUCCESS");

SHARED_SERVER.getLibertyServer().waitForStringInLog("Cleaning up executors", traceLog);
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public void testTimeoutZero() throws Exception {
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testTimeoutZero", "SUCCESS");
}

@Test
public void testNonInterruptableTimeout() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testNonInterruptableTimeout", "SUCCESS");
}

@Test
public void testNonInterruptableDoesntTimeout() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testNonInterruptableDoesntTimeout", "SUCCESS");
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testAsyncDisabled(HttpServletRequest request, HttpServletResponse re
long duration = end - start;

// Ensure that this method was executed synchronously
assertThat("Call duration", duration, greaterThan(TestConstants.WORK_TIME));
assertThat("Call duration", duration, greaterThan(TestConstants.WORK_TIME - TestConstants.TEST_TWEAK_TIME_UNIT));
assertThat("Call result", future.get(), is(notNullValue()));
assertThat("Call result", future.get().getData(), equalTo(AsyncBean.CONNECT_A_DATA));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.ibm.ws.microprofile.faulttolerance_fat.cdi;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/*******************************************************************************
* Copyright (c) 2017 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
Expand All @@ -12,6 +16,8 @@
*******************************************************************************/

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.inject.Inject;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -106,6 +112,44 @@ public void testCBFailureThresholdWithException(HttpServletRequest request,
}
}

public void testCBAsync() throws Exception {
for (int i = 0; i < 3; i++) {
try {
bean.serviceC().get();
fail("Exception not thrown");
} catch (ExecutionException e) {
//assertThat("Execution exception cause", e.getCause(), instanceOf(ConnectException.class));
}
}

// Circuit should now be open

try {
bean.serviceC();
fail("Exception not thrown");
} catch (CircuitBreakerOpenException e) {
// Expected
}
}

public void testCBAsyncFallback() throws Exception {
for (int i = 0; i < 3; i++) {
assertThat(bean.serviceD().get(), is("serviceDFallback"));
}

assertThat(bean.getExecutionCounterD(), is(3));

// Circuit should now be open
Future<String> future = bean.serviceD();
String result = future.get();

// CB is open, expect to fall back
assertThat(result, is("serviceDFallback"));

// However, we don't expect the call to have reached the serviceD method
assertThat(bean.getExecutionCounterD(), is(3));
}

/**
* This test should only pass if MP_Fault_Tolerance_NonFallback_Enabled is set to false
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,27 @@ public void testTimeoutZero(HttpServletRequest request, HttpServletResponse resp
// No TimeoutException expected
}

public void testNonInterruptableTimeout() throws InterruptedException {
try {
bean.busyWait(1000); // Busy wait time is greater than timeout (=500)
fail("No exception thrown");
} catch (TimeoutException e) {
if (Thread.interrupted()) {
fail("Thread was in interrupted state upon return");
}
// This wait is to ensure our thread doesn't get interrupted later, after the method has finished
Thread.sleep(1000);
}
}

public void testNonInterruptableDoesntTimeout() throws Exception {
bean.busyWait(10); // Busy wait time is less than timeout (=500)

if (Thread.interrupted()) {
fail("Thread was in interrupted state upon return");
}

Thread.sleep(2000); // Wait to ensure that our thread isn't interrupted after the method has finished
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package com.ibm.ws.microprofile.faulttolerance_fat.cdi.beans;

import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import javax.enterprise.context.RequestScoped;

import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Timeout;

import com.ibm.ws.microprofile.faulttolerance_fat.util.ConnectException;
Expand All @@ -24,6 +28,7 @@ public class CircuitBreakerBean {

private int executionCounterA = 0;
private int executionCounterB = 0;
private int executionCounterD = 0;

@CircuitBreaker(delay = 1, delayUnit = ChronoUnit.SECONDS, requestVolumeThreshold = 3, failureRatio = 1.0)
@Timeout(value = 3, unit = ChronoUnit.SECONDS)
Expand Down Expand Up @@ -52,4 +57,26 @@ public String serviceB() throws ConnectException {
}
return "serviceB: " + executionCounterB;
}

@Asynchronous
@CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 1.0)
public Future<String> serviceC() throws ConnectException {
throw new ConnectException("serviceC");
}

@Asynchronous
@CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 1.0)
@Fallback(fallbackMethod = "serviceDFallback")
public Future<String> serviceD() throws ConnectException {
executionCounterD++;
throw new ConnectException("serviceD: " + executionCounterD);
}

public Future<String> serviceDFallback() {
return CompletableFuture.completedFuture("serviceDFallback");
}

public int getExecutionCounterD() {
return executionCounterD;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*******************************************************************************/
package com.ibm.ws.microprofile.faulttolerance_fat.cdi.beans;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -91,4 +92,18 @@ public int getConnectDCalls() {
public void connectE() throws InterruptedException {
Thread.sleep(2000);
}

/**
* Used for testing timeout with workloads which are not interruptable
*
* @param milliseconds number of milliseconds to busy wait for
*/
@Timeout(500)
public void busyWait(int milliseconds) {
long duration = Duration.ofMillis(milliseconds).toNanos();
long start = System.nanoTime();
while (System.nanoTime() - start < duration) {
// Do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.microprofile.faulttolerance.spi.CircuitBreakerPolicy;

import net.jodah.failsafe.CircuitBreaker;
Expand All @@ -22,11 +24,9 @@
*/
public class CircuitBreakerImpl extends CircuitBreaker {

private final boolean async;
private volatile boolean nested = false;
private static final TraceComponent tc = Tr.register(CircuitBreakerImpl.class);

public CircuitBreakerImpl(CircuitBreakerPolicy policy, boolean async) {
this.async = async;
public CircuitBreakerImpl(CircuitBreakerPolicy policy) {

Duration delay = policy.getDelay();
Class<? extends Throwable>[] failOn = policy.getFailOn();
Expand All @@ -48,14 +48,4 @@ public CircuitBreakerImpl(CircuitBreakerPolicy policy, boolean async) {
withSuccessThreshold(successThreshold);
}

@Override
public void recordSuccess() {
if (!async || nested) {
super.recordSuccess();
}
}

public void setNested() {
this.nested = true;
}
}
Loading