From e857b5cef79c333a946f38c028960d6102e158bd Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Fri, 8 Mar 2019 09:43:06 +0800 Subject: [PATCH] Reactive: add Sentinel Reactor adapter module (#545) * Add reactive adapter implementation for Project Reactor (Mono/Flux) including a reactor transformer and an experimental `ReactorSphU` * Add an `InheritableBaseSubscriber` that derives from the original BaseSubscriber of reactor-core * Add basic test cases for reactor adapter * Add Sentinel context enter support for EntryConfig in reactor transformer Signed-off-by: Eric Zhao --- sentinel-adapter/pom.xml | 2 + .../sentinel-reactor-adapter/pom.xml | 44 ++++ .../adapter/reactor/ContextConfig.java | 57 ++++ .../sentinel/adapter/reactor/EntryConfig.java | 77 ++++++ .../adapter/reactor/FluxSentinelOperator.java | 40 +++ .../reactor/InheritableBaseSubscriber.java | 243 ++++++++++++++++++ .../adapter/reactor/MonoSentinelOperator.java | 40 +++ .../sentinel/adapter/reactor/ReactorSphU.java | 72 ++++++ .../reactor/SentinelReactorConstants.java | 27 ++ .../reactor/SentinelReactorSubscriber.java | 166 ++++++++++++ .../reactor/SentinelReactorTransformer.java | 54 ++++ ...uxSentinelOperatorTestIntegrationTest.java | 75 ++++++ .../MonoSentinelOperatorIntegrationTest.java | 168 ++++++++++++ .../adapter/reactor/ReactorSphUTest.java | 74 ++++++ 14 files changed, 1139 insertions(+) create mode 100644 sentinel-adapter/sentinel-reactor-adapter/pom.xml create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java create mode 100644 sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java diff --git a/sentinel-adapter/pom.xml b/sentinel-adapter/pom.xml index a665969b63..39d7e496cc 100755 --- a/sentinel-adapter/pom.xml +++ b/sentinel-adapter/pom.xml @@ -19,6 +19,7 @@ sentinel-dubbo-adapter sentinel-grpc-adapter sentinel-zuul-adapter + sentinel-reactor-adapter @@ -38,6 +39,7 @@ sentinel-web-servlet ${project.version} + junit junit diff --git a/sentinel-adapter/sentinel-reactor-adapter/pom.xml b/sentinel-adapter/sentinel-reactor-adapter/pom.xml new file mode 100644 index 0000000000..cebb46e3f3 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/pom.xml @@ -0,0 +1,44 @@ + + + + sentinel-adapter + com.alibaba.csp + 1.5.0-SNAPSHOT + + 4.0.0 + + sentinel-reactor-adapter + + + 1.8 + 1.8 + 3.2.6.RELEASE + + + + + com.alibaba.csp + sentinel-core + + + io.projectreactor + reactor-core + ${reactor.version} + provided + + + + junit + junit + test + + + io.projectreactor + reactor-test + ${reactor.version} + test + + + \ No newline at end of file diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java new file mode 100644 index 0000000000..a53613ba5c --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java @@ -0,0 +1,57 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.StringUtil; + +/** + * @author Eric Zhao + */ +public class ContextConfig { + + private final String contextName; + private final String origin; + + public ContextConfig(String contextName) { + this(contextName, ""); + } + + public ContextConfig(String contextName, String origin) { + AssertUtil.assertNotBlank(contextName, "contextName cannot be blank"); + this.contextName = contextName; + if (StringUtil.isBlank(origin)) { + origin = ""; + } + this.origin = origin; + } + + public String getContextName() { + return contextName; + } + + public String getOrigin() { + return origin; + } + + @Override + public String toString() { + return "ContextConfig{" + + "contextName='" + contextName + '\'' + + ", origin='" + origin + '\'' + + '}'; + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java new file mode 100644 index 0000000000..2f7cbd0904 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java @@ -0,0 +1,77 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.util.AssertUtil; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public class EntryConfig { + + private final String resourceName; + private final EntryType entryType; + private final ContextConfig contextConfig; + + public EntryConfig(String resourceName) { + this(resourceName, EntryType.OUT); + } + + public EntryConfig(String resourceName, EntryType entryType) { + this(resourceName, entryType, null); + } + + public EntryConfig(String resourceName, EntryType entryType, ContextConfig contextConfig) { + checkParams(resourceName, entryType); + this.resourceName = resourceName; + this.entryType = entryType; + // Constructed ContextConfig should be valid here. Null is allowed here. + this.contextConfig = contextConfig; + } + + public String getResourceName() { + return resourceName; + } + + public EntryType getEntryType() { + return entryType; + } + + public ContextConfig getContextConfig() { + return contextConfig; + } + + public static void assertValid(EntryConfig config) { + AssertUtil.notNull(config, "entry config cannot be null"); + checkParams(config.resourceName, config.entryType); + } + + private static void checkParams(String resourceName, EntryType entryType) { + AssertUtil.assertNotBlank(resourceName, "resourceName cannot be blank"); + AssertUtil.notNull(entryType, "entryType cannot be null"); + } + + @Override + public String toString() { + return "EntryConfig{" + + "resourceName='" + resourceName + '\'' + + ", entryType=" + entryType + + ", contextConfig=" + contextConfig + + '}'; + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java new file mode 100644 index 0000000000..10587113c5 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java @@ -0,0 +1,40 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public class FluxSentinelOperator extends FluxOperator { + + private final EntryConfig entryConfig; + + public FluxSentinelOperator(Flux source, EntryConfig entryConfig) { + super(source); + EntryConfig.assertValid(entryConfig); + this.entryConfig = entryConfig; + } + + @Override + public void subscribe(CoreSubscriber actual) { + source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false)); + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java new file mode 100644 index 0000000000..2139ebebe0 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java @@ -0,0 +1,243 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Exceptions; +import reactor.core.publisher.Operators; +import reactor.core.publisher.SignalType; + +/** + *

+ * Copied from {@link reactor.core.publisher.BaseSubscriber} of reactor-core, + * but allow sub-classes to override {@code onSubscribe}, {@code onNext}, + * {@code onError} and {@code onComplete} method for customization. + *

+ *

This base subscriber also provides predicate for {@code onErrorDropped} hook as a workaround for Sentinel.

+ */ +abstract class InheritableBaseSubscriber implements CoreSubscriber, Subscription, Disposable { + + volatile Subscription subscription; + + static AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(InheritableBaseSubscriber.class, Subscription.class, + "subscription"); + + /** + * Return current {@link Subscription} + * + * @return current {@link Subscription} + */ + protected Subscription upstream() { + return subscription; + } + + @Override + public boolean isDisposed() { + return subscription == Operators.cancelledSubscription(); + } + + /** + * {@link Disposable#dispose() Dispose} the {@link Subscription} by + * {@link Subscription#cancel() cancelling} it. + */ + @Override + public void dispose() { + cancel(); + } + + /** + * Hook for further processing of onSubscribe's Subscription. Implement this method + * to call {@link #request(long)} as an initial request. Values other than the + * unbounded {@code Long.MAX_VALUE} imply that you'll also call request in + * {@link #hookOnNext(Object)}. + *

Defaults to request unbounded Long.MAX_VALUE as in {@link #requestUnbounded()} + * + * @param subscription the subscription to optionally process + */ + protected void hookOnSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + /** + * Hook for processing of onNext values. You can call {@link #request(long)} here + * to further request data from the source {@code org.reactivestreams.Publisher} if + * the {@link #hookOnSubscribe(Subscription) initial request} wasn't unbounded. + *

Defaults to doing nothing. + * + * @param value the emitted value to process + */ + protected void hookOnNext(T value) { + // NO-OP + } + + /** + * Optional hook for completion processing. Defaults to doing nothing. + */ + protected void hookOnComplete() { + // NO-OP + } + + /** + * Optional hook for error processing. Default is to call + * {@link Exceptions#errorCallbackNotImplemented(Throwable)}. + * + * @param throwable the error to process + */ + protected void hookOnError(Throwable throwable) { + throw Exceptions.errorCallbackNotImplemented(throwable); + } + + /** + * Optional hook executed when the subscription is cancelled by calling this + * Subscriber's {@link #cancel()} method. Defaults to doing nothing. + */ + protected void hookOnCancel() { + //NO-OP + } + + /** + * Optional hook executed after any of the termination events (onError, onComplete, + * cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)}, + * {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks + * fail. Defaults to doing nothing. A failure of the callback will be caught by + * {@code Operators#onErrorDropped(Throwable, reactor.util.context.Context)}. + * + * @param type the type of termination event that triggered the hook + * ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or + * {@link SignalType#CANCEL}) + */ + protected void hookFinally(SignalType type) { + //NO-OP + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.setOnce(S, this, s)) { + try { + hookOnSubscribe(s); + } catch (Throwable throwable) { + onError(Operators.onOperatorError(s, throwable, currentContext())); + } + } + } + + @Override + public void onNext(T value) { + Objects.requireNonNull(value, "onNext"); + try { + hookOnNext(value); + } catch (Throwable throwable) { + onError(Operators.onOperatorError(subscription, throwable, value, currentContext())); + } + } + + protected boolean shouldCallErrorDropHook() { + return true; + } + + @Override + public void onError(Throwable t) { + Objects.requireNonNull(t, "onError"); + + if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators + .cancelledSubscription()) { + // Already cancelled concurrently + + // Workaround for Sentinel BlockException: + // Here we add a predicate method to decide whether exception should be dropped implicitly + // or call the {@code onErrorDropped} hook. + if (shouldCallErrorDropHook()) { + Operators.onErrorDropped(t, currentContext()); + } + + return; + } + + try { + hookOnError(t); + } catch (Throwable e) { + e = Exceptions.addSuppressed(e, t); + Operators.onErrorDropped(e, currentContext()); + } finally { + safeHookFinally(SignalType.ON_ERROR); + } + } + + @Override + public void onComplete() { + if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators + .cancelledSubscription()) { + //we're sure it has not been concurrently cancelled + try { + hookOnComplete(); + } catch (Throwable throwable) { + //onError itself will short-circuit due to the CancelledSubscription being push above + hookOnError(Operators.onOperatorError(throwable, currentContext())); + } finally { + safeHookFinally(SignalType.ON_COMPLETE); + } + } + } + + @Override + public final void request(long n) { + if (Operators.validate(n)) { + Subscription s = this.subscription; + if (s != null) { + s.request(n); + } + } + } + + /** + * {@link #request(long) Request} an unbounded amount. + */ + public final void requestUnbounded() { + request(Long.MAX_VALUE); + } + + @Override + public final void cancel() { + if (Operators.terminate(S, this)) { + try { + hookOnCancel(); + } catch (Throwable throwable) { + hookOnError(Operators.onOperatorError(subscription, throwable, currentContext())); + } finally { + safeHookFinally(SignalType.CANCEL); + } + } + } + + void safeHookFinally(SignalType type) { + try { + hookFinally(type); + } catch (Throwable finallyFailure) { + Operators.onErrorDropped(finallyFailure, currentContext()); + } + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java new file mode 100644 index 0000000000..7ee353c297 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java @@ -0,0 +1,40 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoOperator; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public class MonoSentinelOperator extends MonoOperator { + + private final EntryConfig entryConfig; + + public MonoSentinelOperator(Mono source, EntryConfig entryConfig) { + super(source); + EntryConfig.assertValid(entryConfig); + this.entryConfig = entryConfig; + } + + @Override + public void subscribe(CoreSubscriber actual) { + source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true)); + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java new file mode 100644 index 0000000000..b872f0a8bb --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java @@ -0,0 +1,72 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.concurrent.atomic.AtomicReference; + +import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.Tracer; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slots.block.BlockException; + +import reactor.core.publisher.Mono; + +/** + * A {@link SphU} adapter with Project Reactor. + * + * @author Eric Zhao + * @since 1.5.0 + */ +public final class ReactorSphU { + + public static Mono entryWith(String resourceName, Mono actual) { + return entryWith(resourceName, EntryType.OUT, actual); + } + + public static Mono entryWith(String resourceName, EntryType entryType, Mono actual) { + final AtomicReference entryWrapper = new AtomicReference<>(null); + return Mono.defer(() -> { + try { + AsyncEntry entry = SphU.asyncEntry(resourceName, entryType); + entryWrapper.set(entry); + return actual.subscriberContext(context -> { + if (entry == null) { + return context; + } + Context sentinelContext = entry.getAsyncContext(); + if (sentinelContext == null) { + return context; + } + // TODO: check GC friendly? + return context.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, sentinelContext); + }).doOnSuccessOrError((o, t) -> { + if (entry != null && entryWrapper.compareAndSet(entry, null)) { + if (t != null) { + Tracer.traceContext(t, 1, entry.getAsyncContext()); + } + entry.exit(); + } + }); + } catch (BlockException ex) { + return Mono.error(ex); + } + }); + } + + private ReactorSphU() {} +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java new file mode 100644 index 0000000000..9819c7e044 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java @@ -0,0 +1,27 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public final class SentinelReactorConstants { + + public static final String SENTINEL_CONTEXT_KEY = "_sentinel_context"; + + private SentinelReactorConstants() {} +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java new file mode 100644 index 0000000000..d95c1f3325 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java @@ -0,0 +1,166 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.Tracer; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.util.function.Supplier; + +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.util.context.Context; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public class SentinelReactorSubscriber extends InheritableBaseSubscriber { + + private final EntryConfig entryConfig; + + private final CoreSubscriber actual; + private final boolean unary; + + private volatile AsyncEntry currentEntry; + private final AtomicBoolean entryExited = new AtomicBoolean(false); + + public SentinelReactorSubscriber(EntryConfig entryConfig, + CoreSubscriber actual, + boolean unary) { + checkEntryConfig(entryConfig); + this.entryConfig = entryConfig; + this.actual = actual; + this.unary = unary; + } + + private void checkEntryConfig(EntryConfig config) { + EntryConfig.assertValid(config); + } + + @Override + public Context currentContext() { + if (currentEntry == null || entryExited.get()) { + return actual.currentContext(); + } + com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext(); + if (sentinelContext == null) { + return actual.currentContext(); + } + return actual.currentContext() + .put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext()); + } + + private void doWithContextOrCurrent(Supplier> contextSupplier, + Runnable f) { + Optional contextOpt = contextSupplier.get(); + if (!contextOpt.isPresent()) { + // Provided context is absent, use current context. + f.run(); + } else { + // Run on provided context. + ContextUtil.runOnContext(contextOpt.get(), f); + } + } + + private void entryWhenSubscribed() { + ContextConfig sentinelContextConfig = entryConfig.getContextConfig(); + if (sentinelContextConfig != null) { + // If current we're already in a context, the context config won't work. + ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin()); + } + try { + AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName()); + this.currentEntry = entry; + actual.onSubscribe(this); + } catch (BlockException ex) { + // Mark as completed (exited) explicitly. + entryExited.set(true); + // Signal cancel and propagate the {@code BlockException}. + cancel(); + actual.onSubscribe(this); + actual.onError(ex); + } finally { + if (sentinelContextConfig != null) { + ContextUtil.exit(); + } + } + } + + @Override + protected void hookOnSubscribe(Subscription subscription) { + doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY), + this::entryWhenSubscribed); + } + + @Override + protected void hookOnNext(T value) { + if (isDisposed()) { + tryCompleteEntry(); + return; + } + doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext), + () -> actual.onNext(value)); + + if (unary) { + // For some cases of unary operator (Mono), we have to do this during onNext hook. + // e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete() + // the onComplete hook will not be executed so we'll need to complete the entry in advance. + tryCompleteEntry(); + } + } + + @Override + protected void hookOnComplete() { + tryCompleteEntry(); + actual.onComplete(); + } + + @Override + protected boolean shouldCallErrorDropHook() { + // When flow control triggered or stream terminated, the incoming + // deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook. + return !entryExited.get(); + } + + @Override + protected void hookOnError(Throwable t) { + if (currentEntry != null && currentEntry.getAsyncContext() != null) { + // Normal requests with non-BlockException will go through here. + Tracer.traceContext(t, 1, currentEntry.getAsyncContext()); + } + tryCompleteEntry(); + actual.onError(t); + } + + @Override + protected void hookOnCancel() { + + } + + private boolean tryCompleteEntry() { + if (currentEntry != null && entryExited.compareAndSet(false, true)) { + currentEntry.exit(); + return true; + } + return false; + } +} diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java new file mode 100644 index 0000000000..0b251dca34 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java @@ -0,0 +1,54 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.function.Function; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * A transformer that transforms given {@code Publisher} to a wrapped Sentinel reactor operator. + * + * @author Eric Zhao + * @since 1.5.0 + */ +public class SentinelReactorTransformer implements Function, Publisher> { + + private final EntryConfig entryConfig; + + public SentinelReactorTransformer(String resourceName) { + this(new EntryConfig(resourceName)); + } + + public SentinelReactorTransformer(EntryConfig entryConfig) { + EntryConfig.assertValid(entryConfig); + this.entryConfig = entryConfig; + } + + @Override + public Publisher apply(Publisher publisher) { + if (publisher instanceof Mono) { + return new MonoSentinelOperator<>((Mono) publisher, entryConfig); + } + if (publisher instanceof Flux) { + return new FluxSentinelOperator<>((Flux) publisher, entryConfig); + } + + throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName()); + } +} \ No newline at end of file diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java new file mode 100644 index 0000000000..7b144760ac --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java @@ -0,0 +1,75 @@ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.ArrayList; +import java.util.Collections; + +import com.alibaba.csp.sentinel.node.ClusterNode; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import static org.junit.Assert.*; + +/** + * @author Eric Zhao + */ +public class FluxSentinelOperatorTestIntegrationTest { + + @Test + public void testEmitMultipleValueSuccess() { + String resourceName = createResourceName("testEmitMultipleSuccess"); + StepVerifier.create(Flux.just(1, 2) + .map(e -> e * 2) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectNext(2) + .expectNext(4) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + } + + @Test + public void testEmitFluxError() { + String resourceName = createResourceName("testEmitFluxError"); + StepVerifier.create(Flux.error(new IllegalAccessException("oops")) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectError(IllegalAccessException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps()); + assertEquals(1, cn.totalException()); + } + + @Test + public void testEmitMultipleValuesWhenFlowControlTriggered() { + String resourceName = createResourceName("testEmitMultipleValuesWhenFlowControlTriggered"); + FlowRuleManager.loadRules(Collections.singletonList( + new FlowRule(resourceName).setCount(0) + )); + StepVerifier.create(Flux.just(1, 3, 5) + .map(e -> e * 2) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectError(BlockException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(0, cn.passQps(), 0.01); + assertEquals(1, cn.blockRequest()); + + FlowRuleManager.loadRules(new ArrayList<>()); + } + + private String createResourceName(String resourceName) { + return "reactor_test_flux_" + resourceName; + } +} \ No newline at end of file diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java new file mode 100644 index 0000000000..e682723a92 --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java @@ -0,0 +1,168 @@ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; + +import com.alibaba.csp.sentinel.Constants; +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.node.ClusterNode; +import com.alibaba.csp.sentinel.node.EntranceNode; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.junit.Assert.*; + +/** + * @author Eric Zhao + */ +public class MonoSentinelOperatorIntegrationTest { + + @Test + public void testTransformMonoWithSentinelContextEnter() { + String resourceName = createResourceName("testTransformMonoWithSentinelContextEnter"); + String contextName = "test_reactive_context"; + String origin = "originA"; + FlowRuleManager.loadRules(Collections.singletonList( + new FlowRule(resourceName).setCount(0).setLimitApp(origin).as(FlowRule.class) + )); + StepVerifier.create(Mono.just(2) + .transform(new SentinelReactorTransformer<>( + // Customized context with origin. + new EntryConfig(resourceName, EntryType.OUT, new ContextConfig(contextName, origin)))) + ) + .expectError(BlockException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(0, cn.passQps(), 0.01); + assertEquals(1, cn.blockRequest()); + assertTrue(Constants.ROOT.getChildList() + .stream() + .filter(node -> node instanceof EntranceNode) + .map(e -> (EntranceNode)e) + .anyMatch(e -> e.getId().getName().equals(contextName)) + ); + + FlowRuleManager.loadRules(new ArrayList<>()); + } + + @Test + public void testFluxToMonoNextThenCancelSuccess() { + String resourceName = createResourceName("testFluxToMonoNextThenCancelSuccess"); + StepVerifier.create(Flux.range(1, 10) + .map(e -> e * 2) + .next() + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectNext(2) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + } + + @Test + public void testEmitSingleLongTimeRt() { + String resourceName = createResourceName("testEmitSingleLongTimeRt"); + StepVerifier.create(Mono.just(2) + .delayElement(Duration.ofMillis(1000)) + .map(e -> e * 2) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectNext(4) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1000, cn.avgRt(), 20); + } + + @Test + public void testEmitEmptySuccess() { + String resourceName = createResourceName("testEmitEmptySuccess"); + StepVerifier.create(Mono.empty() + .transform(new SentinelReactorTransformer<>(resourceName))) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + } + + @Test + public void testEmitSingleSuccess() { + String resourceName = createResourceName("testEmitSingleSuccess"); + StepVerifier.create(Mono.just(1) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectNext(1) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + } + + @Test + public void testEmitSingleValueWhenFlowControlTriggered() { + String resourceName = createResourceName("testEmitSingleValueWhenFlowControlTriggered"); + FlowRuleManager.loadRules(Collections.singletonList( + new FlowRule(resourceName).setCount(0) + )); + StepVerifier.create(Mono.just(1) + .map(e -> e * 2) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectError(BlockException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(0, cn.passQps(), 0.01); + assertEquals(1, cn.blockRequest()); + + FlowRuleManager.loadRules(new ArrayList<>()); + } + + @Test + public void testEmitExceptionWhenFlowControlTriggered() { + String resourceName = createResourceName("testEmitExceptionWhenFlowControlTriggered"); + FlowRuleManager.loadRules(Collections.singletonList( + new FlowRule(resourceName).setCount(0) + )); + StepVerifier.create(Mono.error(new IllegalStateException("some")) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectError(BlockException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(0, cn.passQps(), 0.01); + assertEquals(1, cn.blockRequest()); + + FlowRuleManager.loadRules(new ArrayList<>()); + } + + @Test + public void testEmitSingleError() { + String resourceName = createResourceName("testEmitSingleError"); + StepVerifier.create(Mono.error(new IllegalStateException()) + .transform(new SentinelReactorTransformer<>(resourceName))) + .expectError(IllegalStateException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.totalException()); + } + + private String createResourceName(String resourceName) { + return "reactor_test_mono_" + resourceName; + } +} \ No newline at end of file diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java new file mode 100644 index 0000000000..0f4e52341f --- /dev/null +++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java @@ -0,0 +1,74 @@ +package com.alibaba.csp.sentinel.adapter.reactor; + +import java.util.ArrayList; +import java.util.Collections; + +import com.alibaba.csp.sentinel.node.ClusterNode; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; + +import org.junit.Test; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import static org.junit.Assert.*; + +/** + * @author Eric Zhao + */ +public class ReactorSphUTest { + + @Test + public void testReactorEntryNormalWhenFlowControlTriggered() { + String resourceName = createResourceName("testReactorEntryNormalWhenFlowControlTriggered"); + FlowRuleManager.loadRules(Collections.singletonList( + new FlowRule(resourceName).setCount(0) + )); + StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60)) + .subscribeOn(Schedulers.elastic()) + .map(e -> e * 3)) + .expectError(BlockException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(0, cn.passQps(), 0.01); + assertEquals(1, cn.blockRequest()); + + FlowRuleManager.loadRules(new ArrayList<>()); + } + + @Test + public void testReactorEntryWithCommon() { + String resourceName = createResourceName("testReactorEntryWithCommon"); + StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60)) + .subscribeOn(Schedulers.elastic()) + .map(e -> e * 3)) + .expectNext(180) + .verifyComplete(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + } + + @Test + public void testReactorEntryWithBizException() { + String resourceName = createResourceName("testReactorEntryWithBizException"); + StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.error(new IllegalStateException()))) + .expectError(IllegalStateException.class) + .verify(); + + ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName); + assertNotNull(cn); + assertEquals(1, cn.passQps(), 0.01); + assertEquals(1, cn.totalException()); + } + + private String createResourceName(String resourceName) { + return "reactor_test_SphU_" + resourceName; + } +} \ No newline at end of file