From 9c8f4e761951bfac0ad50e3bffe92824e5497a6d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=97=B6=E6=97=A0=E4=B8=A4=E4=B8=B6?= <442367943@qq.com>
Date: Tue, 21 Aug 2018 13:26:50 +0800
Subject: [PATCH 01/12] HashedWheelTimer (#1973)
* * HashedWheelTimer to check timeout future
* when the future is done, do not set it is timeout
---
.../dubbo/common/timer/HashedWheelTimer.java | 805 ++++++++++++++++++
.../apache/dubbo/common/timer/Timeout.java | 55 ++
.../org/apache/dubbo/common/timer/Timer.java | 48 ++
.../apache/dubbo/common/timer/TimerTask.java | 34 +
.../dubbo/common/utils/StringUtils.java | 14 +
.../common/timer/HashedWheelTimerTest.java | 72 ++
.../exchange/support/DefaultFuture.java | 82 +-
.../support/header/HeaderExchangeChannel.java | 2 +-
.../exchange/support/DefaultFutureTest.java | 124 +++
.../rpc/protocol/thrift/ThriftCodecTest.java | 4 +-
10 files changed, 1202 insertions(+), 38 deletions(-)
create mode 100644 dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
create mode 100644 dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timeout.java
create mode 100644 dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java
create mode 100644 dubbo-common/src/main/java/org/apache/dubbo/common/timer/TimerTask.java
create mode 100644 dubbo-common/src/test/java/org/apache/dubbo/common/timer/HashedWheelTimerTest.java
create mode 100644 dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
new file mode 100644
index 00000000000..db5c43fee97
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
@@ -0,0 +1,805 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.common.timer;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link Timer} optimized for approximated I/O timeout scheduling.
+ *
+ *
Tick Duration
+ *
+ * As described with 'approximated', this timer does not execute the scheduled
+ * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
+ * check if there are any {@link TimerTask}s behind the schedule and execute
+ * them.
+ *
+ * You can increase or decrease the accuracy of the execution timing by
+ * specifying smaller or larger tick duration in the constructor. In most
+ * network applications, I/O timeout does not need to be accurate. Therefore,
+ * the default tick duration is 100 milliseconds and you will not need to try
+ * different configurations in most cases.
+ *
+ *
Ticks per Wheel (Wheel Size)
+ *
+ * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
+ * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
+ * function is 'dead line of the task'. The default number of ticks per wheel
+ * (i.e. the size of the wheel) is 512. You could specify a larger value
+ * if you are going to schedule a lot of timeouts.
+ *
+ *
Do not create many instances.
+ *
+ * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
+ * started. Therefore, you should make sure to create only one instance and
+ * share it across your application. One of the common mistakes, that makes
+ * your application unresponsive, is to create a new instance for every connection.
+ *
+ *
Implementation Details
+ *
+ * {@link HashedWheelTimer} is based on
+ * George Varghese and
+ * Tony Lauck's paper,
+ * 'Hashed
+ * and Hierarchical Timing Wheels: data structures to efficiently implement a
+ * timer facility'. More comprehensive slides are located
+ * here.
+ */
+public class HashedWheelTimer implements Timer {
+
+ /**
+ * may be in spi?
+ */
+ public static final String NAME = "hased";
+
+ private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);
+
+ private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
+ private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
+ private static final int INSTANCE_COUNT_LIMIT = 64;
+ private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
+
+ private final Worker worker = new Worker();
+ private final Thread workerThread;
+
+ private static final int WORKER_STATE_INIT = 0;
+ private static final int WORKER_STATE_STARTED = 1;
+ private static final int WORKER_STATE_SHUTDOWN = 2;
+
+ /**
+ * 0 - init, 1 - started, 2 - shut down
+ */
+ @SuppressWarnings({"unused", "FieldMayBeFinal"})
+ private volatile int workerState;
+
+ private final long tickDuration;
+ private final HashedWheelBucket[] wheel;
+ private final int mask;
+ private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
+ private final Queue timeouts = new ArrayBlockingQueue(1024);
+ private final Queue cancelledTimeouts = new ArrayBlockingQueue(1024);
+ private final AtomicLong pendingTimeouts = new AtomicLong(0);
+ private final long maxPendingTimeouts;
+
+ private volatile long startTime;
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}), default tick duration, and
+ * default number of ticks per wheel.
+ */
+ public HashedWheelTimer() {
+ this(Executors.defaultThreadFactory());
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}) and default number of ticks
+ * per wheel.
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit);
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}).
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
+ }
+
+ /**
+ * Creates a new timer with the default tick duration and default number of
+ * ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @throws NullPointerException if {@code threadFactory} is {@code null}
+ */
+ public HashedWheelTimer(ThreadFactory threadFactory) {
+ this(threadFactory, 100, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a new timer with the default number of ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+ this(threadFactory, tickDuration, unit, 512);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
+ * {@code newTimeout} will result in
+ * {@link java.util.concurrent.RejectedExecutionException}
+ * being thrown. No maximum pending timeouts limit is assumed if
+ * this value is 0 or negative.
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel,
+ long maxPendingTimeouts) {
+
+ if (threadFactory == null) {
+ throw new NullPointerException("threadFactory");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ if (tickDuration <= 0) {
+ throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
+ }
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
+ }
+
+ // Normalize ticksPerWheel to power of two and initialize the wheel.
+ wheel = createWheel(ticksPerWheel);
+ mask = wheel.length - 1;
+
+ // Convert tickDuration to nanos.
+ this.tickDuration = unit.toNanos(tickDuration);
+
+ // Prevent overflow.
+ if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
+ throw new IllegalArgumentException(String.format(
+ "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
+ tickDuration, Long.MAX_VALUE / wheel.length));
+ }
+ workerThread = threadFactory.newThread(worker);
+
+ this.maxPendingTimeouts = maxPendingTimeouts;
+
+ if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
+ WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
+ reportTooManyInstances();
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ super.finalize();
+ } finally {
+ // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
+ // we have not yet shutdown then we want to make sure we decrement the active instance count.
+ if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ }
+ }
+
+ private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel must be greater than 0: " + ticksPerWheel);
+ }
+ if (ticksPerWheel > 1073741824) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
+ }
+
+ ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
+ HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
+ for (int i = 0; i < wheel.length; i++) {
+ wheel[i] = new HashedWheelBucket();
+ }
+ return wheel;
+ }
+
+ private static int normalizeTicksPerWheel(int ticksPerWheel) {
+ int normalizedTicksPerWheel = 1;
+ while (normalizedTicksPerWheel < ticksPerWheel) {
+ normalizedTicksPerWheel <<= 1;
+ }
+ return normalizedTicksPerWheel;
+ }
+
+ /**
+ * Starts the background thread explicitly. The background thread will
+ * start automatically on demand even if you did not call this method.
+ *
+ * @throws IllegalStateException if this timer has been
+ * {@linkplain #stop() stopped} already
+ */
+ public void start() {
+ switch (WORKER_STATE_UPDATER.get(this)) {
+ case WORKER_STATE_INIT:
+ if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
+ workerThread.start();
+ }
+ break;
+ case WORKER_STATE_STARTED:
+ break;
+ case WORKER_STATE_SHUTDOWN:
+ throw new IllegalStateException("cannot be started once stopped");
+ default:
+ throw new Error("Invalid WorkerState");
+ }
+
+ // Wait until the startTime is initialized by the worker.
+ while (startTime == 0) {
+ try {
+ startTimeInitialized.await();
+ } catch (InterruptedException ignore) {
+ // Ignore - it will be ready very soon.
+ }
+ }
+ }
+
+ @Override
+ public Set stop() {
+ if (Thread.currentThread() == workerThread) {
+ throw new IllegalStateException(
+ HashedWheelTimer.class.getSimpleName() +
+ ".stop() cannot be called from " +
+ TimerTask.class.getSimpleName());
+ }
+
+ if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
+ // workerState can be 0 or 2 at this moment - let it always be 2.
+ if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+
+ return Collections.emptySet();
+ }
+
+ try {
+ boolean interrupted = false;
+ while (workerThread.isAlive()) {
+ workerThread.interrupt();
+ try {
+ workerThread.join(100);
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ return worker.unprocessedTimeouts();
+ }
+
+ @Override
+ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+ if (task == null) {
+ throw new NullPointerException("task");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+
+ long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
+
+ if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
+ pendingTimeouts.decrementAndGet();
+ throw new RejectedExecutionException("Number of pending timeouts ("
+ + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ + "timeouts (" + maxPendingTimeouts + ")");
+ }
+
+ start();
+
+ // Add the timeout to the timeout queue which will be processed on the next tick.
+ // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
+ long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
+
+ // Guard against overflow.
+ if (delay > 0 && deadline < 0) {
+ deadline = Long.MAX_VALUE;
+ }
+ HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
+ timeouts.add(timeout);
+ return timeout;
+ }
+
+ /**
+ * Returns the number of pending timeouts of this {@link Timer}.
+ */
+ public long pendingTimeouts() {
+ return pendingTimeouts.get();
+ }
+
+ private static void reportTooManyInstances() {
+ String resourceType = StringUtils.simpleClassName(HashedWheelTimer.class);
+ logger.error("You are creating too many " + resourceType + " instances. " +
+ resourceType + " is a shared resource that must be reused across the JVM," +
+ "so that only a few instances are created.");
+ }
+
+ private final class Worker implements Runnable {
+ private final Set unprocessedTimeouts = new HashSet();
+
+ private long tick;
+
+ @Override
+ public void run() {
+ // Initialize the startTime.
+ startTime = System.nanoTime();
+ if (startTime == 0) {
+ // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
+ startTime = 1;
+ }
+
+ // Notify the other threads waiting for the initialization at start().
+ startTimeInitialized.countDown();
+
+ do {
+ final long deadline = waitForNextTick();
+ if (deadline > 0) {
+ int idx = (int) (tick & mask);
+ processCancelledTasks();
+ HashedWheelBucket bucket =
+ wheel[idx];
+ transferTimeoutsToBuckets();
+ bucket.expireTimeouts(deadline);
+ tick++;
+ }
+ } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
+
+ // Fill the unprocessedTimeouts so we can return them from stop() method.
+ for (HashedWheelBucket bucket : wheel) {
+ bucket.clearTimeouts(unprocessedTimeouts);
+ }
+ for (; ; ) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ break;
+ }
+ if (!timeout.isCancelled()) {
+ unprocessedTimeouts.add(timeout);
+ }
+ }
+ processCancelledTasks();
+ }
+
+ private void transferTimeoutsToBuckets() {
+ // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
+ // adds new timeouts in a loop.
+ for (int i = 0; i < 100000; i++) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
+ // Was cancelled in the meantime.
+ continue;
+ }
+
+ long calculated = timeout.deadline / tickDuration;
+ timeout.remainingRounds = (calculated - tick) / wheel.length;
+
+ // Ensure we don't schedule for past.
+ final long ticks = Math.max(calculated, tick);
+ int stopIndex = (int) (ticks & mask);
+
+ HashedWheelBucket bucket = wheel[stopIndex];
+ bucket.addTimeout(timeout);
+ }
+ }
+
+ private void processCancelledTasks() {
+ for (; ; ) {
+ HashedWheelTimeout timeout = cancelledTimeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ try {
+ timeout.remove();
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown while process a cancellation task", t);
+ }
+ }
+ }
+ }
+
+ /**
+ * calculate goal nanoTime from startTime and current tick number,
+ * then wait until that goal has been reached.
+ *
+ * @return Long.MIN_VALUE if received a shutdown request,
+ * current time otherwise (with Long.MIN_VALUE changed by +1)
+ */
+ private long waitForNextTick() {
+ long deadline = tickDuration * (tick + 1);
+
+ for (; ; ) {
+ final long currentTime = System.nanoTime() - startTime;
+ long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
+
+ if (sleepTimeMs <= 0) {
+ if (currentTime == Long.MIN_VALUE) {
+ return -Long.MAX_VALUE;
+ } else {
+ return currentTime;
+ }
+ }
+ if (isWindows()) {
+ sleepTimeMs = sleepTimeMs / 10 * 10;
+ }
+
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException ignored) {
+ if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
+ return Long.MIN_VALUE;
+ }
+ }
+ }
+ }
+
+ Set unprocessedTimeouts() {
+ return Collections.unmodifiableSet(unprocessedTimeouts);
+ }
+ }
+
+ private static final class HashedWheelTimeout implements Timeout {
+
+ private static final int ST_INIT = 0;
+ private static final int ST_CANCELLED = 1;
+ private static final int ST_EXPIRED = 2;
+ private static final AtomicIntegerFieldUpdater STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
+
+ private final HashedWheelTimer timer;
+ private final TimerTask task;
+ private final long deadline;
+
+ @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
+ private volatile int state = ST_INIT;
+
+ /**
+ * RemainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
+ * HashedWheelTimeout will be added to the correct HashedWheelBucket.
+ */
+ long remainingRounds;
+
+ /**
+ * This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
+ * As only the workerThread will act on it there is no need for synchronization / volatile.
+ */
+ HashedWheelTimeout next;
+ HashedWheelTimeout prev;
+
+ /**
+ * The bucket to which the timeout was added
+ */
+ HashedWheelBucket bucket;
+
+ HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
+ this.timer = timer;
+ this.task = task;
+ this.deadline = deadline;
+ }
+
+ @Override
+ public Timer timer() {
+ return timer;
+ }
+
+ @Override
+ public TimerTask task() {
+ return task;
+ }
+
+ @Override
+ public boolean cancel() {
+ // only update the state it will be removed from HashedWheelBucket on next tick.
+ if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
+ return false;
+ }
+ // If a task should be canceled we put this to another queue which will be processed on each tick.
+ // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
+ // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
+ timer.cancelledTimeouts.add(this);
+ return true;
+ }
+
+ void remove() {
+ HashedWheelBucket bucket = this.bucket;
+ if (bucket != null) {
+ bucket.remove(this);
+ } else {
+ timer.pendingTimeouts.decrementAndGet();
+ }
+ }
+
+ public boolean compareAndSetState(int expected, int state) {
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ public int state() {
+ return state;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state() == ST_CANCELLED;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return state() == ST_EXPIRED;
+ }
+
+ public void expire() {
+ if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
+ return;
+ }
+
+ try {
+ task.run(this);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ final long currentTime = System.nanoTime();
+ long remaining = deadline - currentTime + timer.startTime;
+ String simpleClassName = StringUtils.simpleClassName(this.getClass());
+
+ StringBuilder buf = new StringBuilder(192)
+ .append(simpleClassName)
+ .append('(')
+ .append("deadline: ");
+ if (remaining > 0) {
+ buf.append(remaining)
+ .append(" ns later");
+ } else if (remaining < 0) {
+ buf.append(-remaining)
+ .append(" ns ago");
+ } else {
+ buf.append("now");
+ }
+
+ if (isCancelled()) {
+ buf.append(", cancelled");
+ }
+
+ return buf.append(", task: ")
+ .append(task())
+ .append(')')
+ .toString();
+ }
+ }
+
+ /**
+ * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
+ * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
+ * extra object creation is needed.
+ */
+ private static final class HashedWheelBucket {
+
+ /**
+ * Used for the linked-list datastructure
+ */
+ private HashedWheelTimeout head;
+ private HashedWheelTimeout tail;
+
+ /**
+ * Add {@link HashedWheelTimeout} to this bucket.
+ */
+ void addTimeout(HashedWheelTimeout timeout) {
+ assert timeout.bucket == null;
+ timeout.bucket = this;
+ if (head == null) {
+ head = tail = timeout;
+ } else {
+ tail.next = timeout;
+ timeout.prev = tail;
+ tail = timeout;
+ }
+ }
+
+ /**
+ * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
+ */
+ void expireTimeouts(long deadline) {
+ HashedWheelTimeout timeout = head;
+
+ // process all timeouts
+ while (timeout != null) {
+ HashedWheelTimeout next = timeout.next;
+ if (timeout.remainingRounds <= 0) {
+ next = remove(timeout);
+ if (timeout.deadline <= deadline) {
+ timeout.expire();
+ } else {
+ // The timeout was placed into a wrong slot. This should never happen.
+ throw new IllegalStateException(String.format(
+ "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
+ }
+ } else if (timeout.isCancelled()) {
+ next = remove(timeout);
+ } else {
+ timeout.remainingRounds--;
+ }
+ timeout = next;
+ }
+ }
+
+ public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
+ HashedWheelTimeout next = timeout.next;
+ // remove timeout that was either processed or cancelled by updating the linked-list
+ if (timeout.prev != null) {
+ timeout.prev.next = next;
+ }
+ if (timeout.next != null) {
+ timeout.next.prev = timeout.prev;
+ }
+
+ if (timeout == head) {
+ // if timeout is also the tail we need to adjust the entry too
+ if (timeout == tail) {
+ tail = null;
+ head = null;
+ } else {
+ head = next;
+ }
+ } else if (timeout == tail) {
+ // if the timeout is the tail modify the tail to be the prev node.
+ tail = timeout.prev;
+ }
+ // null out prev, next and bucket to allow for GC.
+ timeout.prev = null;
+ timeout.next = null;
+ timeout.bucket = null;
+ timeout.timer.pendingTimeouts.decrementAndGet();
+ return next;
+ }
+
+ /**
+ * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
+ */
+ void clearTimeouts(Set set) {
+ for (; ; ) {
+ HashedWheelTimeout timeout = pollTimeout();
+ if (timeout == null) {
+ return;
+ }
+ if (timeout.isExpired() || timeout.isCancelled()) {
+ continue;
+ }
+ set.add(timeout);
+ }
+ }
+
+ private HashedWheelTimeout pollTimeout() {
+ HashedWheelTimeout head = this.head;
+ if (head == null) {
+ return null;
+ }
+ HashedWheelTimeout next = head.next;
+ if (next == null) {
+ tail = this.head = null;
+ } else {
+ this.head = next;
+ next.prev = null;
+ }
+
+ // null out prev and next to allow for GC.
+ head.next = null;
+ head.prev = null;
+ head.bucket = null;
+ return head;
+ }
+ }
+
+ private boolean isWindows() {
+ return System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");
+ }
+}
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timeout.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timeout.java
new file mode 100644
index 00000000000..3d4c2318f2f
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timeout.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.common.timer;
+
+/**
+ * A handle associated with a {@link TimerTask} that is returned by a
+ * {@link Timer}.
+ */
+public interface Timeout {
+
+ /**
+ * Returns the {@link Timer} that created this handle.
+ */
+ Timer timer();
+
+ /**
+ * Returns the {@link TimerTask} which is associated with this handle.
+ */
+ TimerTask task();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been expired.
+ */
+ boolean isExpired();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been cancelled.
+ */
+ boolean isCancelled();
+
+ /**
+ * Attempts to cancel the {@link TimerTask} associated with this handle.
+ * If the task has been executed or cancelled already, it will return with
+ * no side effect.
+ *
+ * @return True if the cancellation completed successfully, otherwise false
+ */
+ boolean cancel();
+}
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java
new file mode 100644
index 00000000000..9e87059124f
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.common.timer;
+
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Schedules {@link TimerTask}s for one-time future execution in a background
+ * thread.
+ */
+public interface Timer {
+
+ /**
+ * Schedules the specified {@link TimerTask} for one-time execution after
+ * the specified delay.
+ *
+ * @return a handle which is associated with the specified task
+ * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
+ * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
+ * can cause instability in the system.
+ */
+ Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
+
+ /**
+ * Releases all resources acquired by this {@link Timer} and cancels all
+ * tasks which were scheduled but not executed yet.
+ *
+ * @return the handles associated with the tasks which were canceled by
+ * this method
+ */
+ Set stop();
+}
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/TimerTask.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/TimerTask.java
new file mode 100644
index 00000000000..ce818e1c6ce
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/TimerTask.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.common.timer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A task which is executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}.
+ */
+public interface TimerTask {
+
+ /**
+ * Executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
+ *
+ * @param timeout a handle which is associated with this task
+ */
+ void run(Timeout timeout) throws Exception;
+}
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
index fed75258b68..f403f2f55c1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
@@ -47,6 +47,8 @@ public final class StringUtils {
private static final Pattern INT_PATTERN = Pattern.compile("^\\d+$");
private static final int PAD_LIMIT = 8192;
+ private static final char PACKAGE_SEPARATOR_CHAR = '.';
+
private StringUtils() {
}
@@ -718,4 +720,16 @@ public static String toArgumentString(Object[] args) {
}
return buf.toString();
}
+
+ public static String simpleClassName(Class> clazz) {
+ if (clazz == null) {
+ throw new NullPointerException("clazz");
+ }
+ String className = clazz.getName();
+ final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR);
+ if (lastDotIdx > -1) {
+ return className.substring(lastDotIdx + 1);
+ }
+ return className;
+ }
}
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/timer/HashedWheelTimerTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/timer/HashedWheelTimerTest.java
new file mode 100644
index 00000000000..15bd5c52a75
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/timer/HashedWheelTimerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.timer;
+
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+public class HashedWheelTimerTest {
+
+ private class PrintTask implements TimerTask {
+
+ @Override
+ public void run(Timeout timeout) {
+ final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ System.out.println("task :" + LocalDateTime.now().format(formatter));
+ }
+ }
+
+ @Test
+ public void newTimeout() throws InterruptedException {
+ final Timer timer = newTimer();
+ for (int i = 0; i < 10; i++) {
+ timer.newTimeout(new PrintTask(), 1, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+ }
+ Thread.sleep(5000);
+ }
+
+ @Test
+ public void stop() throws InterruptedException {
+ final Timer timer = newTimer();
+ for (int i = 0; i < 10; i++) {
+ timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
+ Thread.sleep(100);
+ }
+ //stop timer
+ timer.stop();
+
+ try {
+ //this will throw a exception
+ timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Timer newTimer() {
+ return new HashedWheelTimer(
+ new NamedThreadFactory("dubbo-future-timeout", true),
+ 100,
+ TimeUnit.MILLISECONDS);
+ }
+}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 51e10dc9b56..ae9c95bd7e5 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -19,6 +19,11 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.timer.Timer;
+import org.apache.dubbo.common.timer.TimerTask;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.TimeoutException;
@@ -47,11 +52,10 @@ public class DefaultFuture implements ResponseFuture {
private static final Map FUTURES = new ConcurrentHashMap();
- static {
- Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
- th.setDaemon(true);
- th.start();
- }
+ public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
+ new NamedThreadFactory("dubbo-future-timeout", true),
+ 30,
+ TimeUnit.MILLISECONDS);
// invoke id.
private final long id;
@@ -65,7 +69,7 @@ public class DefaultFuture implements ResponseFuture {
private volatile Response response;
private volatile ResponseCallback callback;
- public DefaultFuture(Channel channel, Request request, int timeout) {
+ private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
@@ -75,6 +79,20 @@ public DefaultFuture(Channel channel, Request request, int timeout) {
CHANNELS.put(id, channel);
}
+ /**
+ * check time out of the future
+ */
+ private static void timeoutCheck(DefaultFuture future) {
+ TimeoutCheckTask task = new TimeoutCheckTask(future);
+ TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
+ final DefaultFuture defaultFuture = new DefaultFuture(channel, request, timeout);
+ timeoutCheck(defaultFuture);
+ return defaultFuture;
+ }
+
public static DefaultFuture getFuture(long id) {
return FUTURES.get(id);
}
@@ -174,6 +192,29 @@ public void setCallback(ResponseCallback callback) {
}
}
+ private static class TimeoutCheckTask implements TimerTask {
+
+ private DefaultFuture future;
+
+ TimeoutCheckTask(DefaultFuture future) {
+ this.future = future;
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ if (future == null || future.isDone()) {
+ return;
+ }
+ // create exception response.
+ Response timeoutResponse = new Response(future.getId());
+ // set timeout status.
+ timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+ timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+ // handle response.
+ DefaultFuture.received(future.getChannel(), timeoutResponse);
+ }
+ }
+
private void invokeCallback(ResponseCallback c) {
ResponseCallback callbackCopy = c;
if (callbackCopy == null) {
@@ -277,33 +318,4 @@ private String getTimeoutMessage(boolean scan) {
+ timeout + " ms, request: " + request + ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress();
}
-
- private static class RemotingInvocationTimeoutScan implements Runnable {
-
- @Override
- public void run() {
- while (true) {
- try {
- for (DefaultFuture future : FUTURES.values()) {
- if (future == null || future.isDone()) {
- continue;
- }
- if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
- // create exception response.
- Response timeoutResponse = new Response(future.getId());
- // set timeout status.
- timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
- timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
- // handle response.
- DefaultFuture.received(future.getChannel(), timeoutResponse);
- }
- }
- Thread.sleep(30);
- } catch (Throwable e) {
- logger.error("Exception when scan the timeout invocation of remoting.", e);
- }
- }
- }
- }
-
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index 34cef2bc4e8..73503cd4c2d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -111,7 +111,7 @@ public ResponseFuture request(Object request, int timeout) throws RemotingExcept
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
- DefaultFuture future = new DefaultFuture(channel, req, timeout);
+ DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
new file mode 100644
index 00000000000..d088a56f9f4
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.exchange.support;
+
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.TimeoutException;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.handler.MockedChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DefaultFutureTest {
+
+ private static final AtomicInteger index = new AtomicInteger();
+
+ @Test
+ public void newFuture() {
+ DefaultFuture future = defaultFuture(3000);
+ Assert.assertNotNull("new future return null", future);
+ }
+
+ @Test
+ public void isDone() {
+ DefaultFuture future = defaultFuture(3000);
+ Assert.assertTrue("init future is finished!", !future.isDone());
+
+ //cancel a future
+ future.cancel();
+ Assert.assertTrue("cancel a future failed!", future.isDone());
+ }
+
+ /**
+ * for example, it will print like this:
+ * before a future is create , time is : 2018-06-21 15:06:17
+ * after a future is timeout , time is : 2018-06-21 15:06:22
+ *
+ * The exception info print like:
+ * Sending request timeout in client-side by scan timer.
+ * start time: 2018-06-21 15:13:02.215, end time: 2018-06-21 15:13:07.231...
+ */
+ @Test
+ public void timeoutNotSend() throws Exception {
+ final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ System.out.println("before a future is create , time is : " + LocalDateTime.now().format(formatter));
+ // timeout after 5 seconds.
+ DefaultFuture f = defaultFuture(5000);
+ while (!f.isDone()) {
+ //spin
+ Thread.sleep(100);
+ }
+ System.out.println("after a future is timeout , time is : " + LocalDateTime.now().format(formatter));
+
+ // get operate will throw a timeout exception, because the future is timeout.
+ try {
+ f.get();
+ } catch (Exception e) {
+ Assert.assertTrue("catch exception is not timeout exception!", e instanceof TimeoutException);
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * for example, it will print like this:
+ * before a future is create , time is : 2018-06-21 15:11:31
+ * after a future is timeout , time is : 2018-06-21 15:11:36
+ *
+ * The exception info print like:
+ * Waiting server-side response timeout by scan timer.
+ * start time: 2018-06-21 15:12:38.337, end time: 2018-06-21 15:12:43.354...
+ */
+ @Test
+ public void timeoutSend() throws Exception {
+ final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ System.out.println("before a future is create , time is : " + LocalDateTime.now().format(formatter));
+ // timeout after 5 seconds.
+ Channel channel = new MockedChannel();
+ Request request = new Request(10);
+ DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000);
+ //mark the future is sent
+ DefaultFuture.sent(channel, request);
+ while (!f.isDone()) {
+ //spin
+ Thread.sleep(100);
+ }
+ System.out.println("after a future is timeout , time is : " + LocalDateTime.now().format(formatter));
+
+ // get operate will throw a timeout exception, because the future is timeout.
+ try {
+ f.get();
+ } catch (Exception e) {
+ Assert.assertTrue("catch exception is not timeout exception!", e instanceof TimeoutException);
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * mock a default future
+ */
+ private DefaultFuture defaultFuture(int timeout) {
+ Channel channel = new MockedChannel();
+ Request request = new Request(index.getAndIncrement());
+ return DefaultFuture.newFuture(channel, request, timeout);
+ }
+
+}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
index f5222c64579..0a372728c11 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
@@ -130,7 +130,7 @@ public void testDecodeReplyResponse() throws Exception {
Request request = createRequest();
- DefaultFuture future = new DefaultFuture(channel, request, 10);
+ DefaultFuture future = DefaultFuture.newFuture(channel, request, 10);
TMessage message = new TMessage("echoString", TMessageType.REPLY, ThriftCodec.getSeqId());
@@ -205,7 +205,7 @@ public void testDecodeExceptionResponse() throws Exception {
Request request = createRequest();
- DefaultFuture future = new DefaultFuture(channel, request, 10);
+ DefaultFuture future = DefaultFuture.newFuture(channel, request, 10);
TMessage message = new TMessage("echoString", TMessageType.EXCEPTION, ThriftCodec.getSeqId());
From 7500033b9d22e2162c5ab614d3233eb43aaeebd5 Mon Sep 17 00:00:00 2001
From: Ian Luo
Date: Wed, 22 Aug 2018 16:12:13 +0800
Subject: [PATCH 02/12] supplemental change for pull request 1973 (#2329)
* supplemental change for pull request 1973
* update LICENSE, to put all netty related together
---
LICENSE | 6 +++++-
.../dubbo/common/timer/HashedWheelTimer.java | 8 ++++----
.../apache/dubbo/common/utils/ClassHelper.java | 16 +++++++++++++++-
.../apache/dubbo/common/utils/StringUtils.java | 16 +---------------
4 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/LICENSE b/LICENSE
index 42e2705ba00..b85622380cf 100644
--- a/LICENSE
+++ b/LICENSE
@@ -217,11 +217,15 @@ This product bundles and repackages the following code in Google Guava 16.0.1, w
* com.google.common.util.concurrent.ListenableFuture
* com.google.common.util.concurrent.ListenableFutureTask
-For the package org.apache.dubbo.common.threadlocal:
+For the package org.apache.dubbo.common.threadlocal and org.apache.dubbo.common.timer:
This product contains a modified portion of 'Netty', an event-driven asynchronous network application framework also
under a "Apache License 2.0" license, see https://github.com/netty/netty/blob/4.1/LICENSE.txt:
* io.netty.util.concurrent.FastThreadLocal
* io.netty.util.internal.InternalThreadLocalMap
+ * io.netty.util.Timer
+ * io.netty.util.TimerTask
+ * io.netty.util.Timeout
+ * io.netty.util.HashedWheelTimer
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
index db5c43fee97..a54cc70b9b4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
@@ -18,7 +18,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.common.utils.ClassHelper;
import java.util.Queue;
import java.util.Set;
@@ -408,7 +408,7 @@ public long pendingTimeouts() {
}
private static void reportTooManyInstances() {
- String resourceType = StringUtils.simpleClassName(HashedWheelTimer.class);
+ String resourceType = ClassHelper.simpleClassName(HashedWheelTimer.class);
logger.error("You are creating too many " + resourceType + " instances. " +
resourceType + " is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
@@ -650,7 +650,7 @@ public void expire() {
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
- String simpleClassName = StringUtils.simpleClassName(this.getClass());
+ String simpleClassName = ClassHelper.simpleClassName(this.getClass());
StringBuilder buf = new StringBuilder(192)
.append(simpleClassName)
@@ -802,4 +802,4 @@ private HashedWheelTimeout pollTimeout() {
private boolean isWindows() {
return System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");
}
-}
\ No newline at end of file
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassHelper.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassHelper.java
index 8ecb8f4045a..e38752fcabf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassHelper.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassHelper.java
@@ -45,6 +45,8 @@ public class ClassHelper {
*/
private static final Map, Class>> primitiveWrapperTypeMap = new HashMap, Class>>(8);
+ private static final char PACKAGE_SEPARATOR_CHAR = '.';
+
static {
primitiveWrapperTypeMap.put(Boolean.class, boolean.class);
primitiveWrapperTypeMap.put(Byte.class, byte.class);
@@ -205,4 +207,16 @@ public static String toShortString(Object obj) {
return obj.getClass().getSimpleName() + "@" + System.identityHashCode(obj);
}
-}
\ No newline at end of file
+
+ public static String simpleClassName(Class> clazz) {
+ if (clazz == null) {
+ throw new NullPointerException("clazz");
+ }
+ String className = clazz.getName();
+ final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR);
+ if (lastDotIdx > -1) {
+ return className.substring(lastDotIdx + 1);
+ }
+ return className;
+ }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
index f403f2f55c1..ccc5a10ccc8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/StringUtils.java
@@ -47,8 +47,6 @@ public final class StringUtils {
private static final Pattern INT_PATTERN = Pattern.compile("^\\d+$");
private static final int PAD_LIMIT = 8192;
- private static final char PACKAGE_SEPARATOR_CHAR = '.';
-
private StringUtils() {
}
@@ -720,16 +718,4 @@ public static String toArgumentString(Object[] args) {
}
return buf.toString();
}
-
- public static String simpleClassName(Class> clazz) {
- if (clazz == null) {
- throw new NullPointerException("clazz");
- }
- String className = clazz.getName();
- final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR);
- if (lastDotIdx > -1) {
- return className.substring(lastDotIdx + 1);
- }
- return className;
- }
-}
\ No newline at end of file
+}
From fcd1af81fd2fd6ad55c78f281d7b29feb10a006a Mon Sep 17 00:00:00 2001
From: xujingfeng <250577914@qq.com>
Date: Wed, 22 Aug 2018 16:17:44 +0800
Subject: [PATCH 03/12] [Dubbo- support tag router feature] Add a new Router
implement -- TagRouter (#2228)
* tagRouter feature
* update dubbo.xsd
* remove reference router param
* add Unit Test
* rollback pom.xml for merge
* rollback pom.xml for merge
* fix checkstyle
* fix checkstyle
* fix unit test
* format import style
* add license&remove author info
* trigger again
---
.../rpc/cluster/router/tag/TagRouter.java | 109 +++++++++++
.../cluster/router/tag/TagRouterFactory.java | 32 ++++
...org.apache.dubbo.rpc.cluster.RouterFactory | 3 +-
.../rpc/cluster/router/tag/TagRouterTest.java | 169 ++++++++++++++++++
.../org/apache/dubbo/common/Constants.java | 4 +
.../dubbo/config/AbstractInterfaceConfig.java | 1 -
.../dubbo/config/AbstractReferenceConfig.java | 1 -
.../dubbo/config/AbstractServiceConfig.java | 10 ++
.../dubbo/config/annotation/Service.java | 5 +
.../dubbo/config/spring/AnnotationBean.java | 24 +--
.../main/resources/META-INF/compat/dubbo.xsd | 6 +
.../src/main/resources/META-INF/dubbo.xsd | 6 +
.../integration/RegistryDirectory.java | 8 +-
13 files changed, 360 insertions(+), 18 deletions(-)
create mode 100644 dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
create mode 100644 dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
create mode 100644 dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterTest.java
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
new file mode 100644
index 00000000000..38fd6166eff
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag;
+
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Router;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TagRouter
+ */
+public class TagRouter implements Router, Comparable {
+
+ private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);
+
+ private final int priority;
+ private final URL url;
+
+ public static final URL ROUTER_URL = new URL("tag", Constants.ANYHOST_VALUE, 0, Constants.ANY_VALUE).addParameters(Constants.RUNTIME_KEY, "true");
+
+ public TagRouter(URL url) {
+ this.url = url;
+ this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
+ }
+
+ public TagRouter() {
+ this.url = ROUTER_URL;
+ this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
+ }
+
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public List> route(List> invokers, URL url, Invocation invocation) throws RpcException {
+ // filter
+ List> result = new ArrayList<>();
+ try {
+ // Dynamic param
+ String tag = RpcContext.getContext().getAttachment(Constants.REQUEST_TAG_KEY);
+ // Tag request
+ if (!StringUtils.isEmpty(tag)) {
+ // Select tag invokers first
+ for (Invoker invoker : invokers) {
+ if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
+ result.add(invoker);
+ }
+ }
+ // If no invoker be selected, downgrade to normal invokers
+ if (result.isEmpty()) {
+ for (Invoker invoker : invokers) {
+ if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
+ result.add(invoker);
+ }
+ }
+ }
+ // Normal request
+ } else {
+ for (Invoker invoker : invokers) {
+ // Can't access tag invoker,only normal invoker should be selected
+ if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
+ result.add(invoker);
+ }
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ logger.error("Route by tag error,return all invokers.", e);
+ }
+ // Downgrade to all invokers
+ return invokers;
+ }
+
+ @Override
+ public int compareTo(Router o) {
+ if (o == null || o.getClass() != TagRouter.class) {
+ return 1;
+ }
+ TagRouter c = (TagRouter) o;
+ return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
new file mode 100644
index 00000000000..05ad427ce6e
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag;
+
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
+
+public class TagRouterFactory implements RouterFactory {
+
+ public static final String NAME = "tag";
+
+ @Override
+ public Router getRouter(URL url) {
+ return new TagRouter(url);
+ }
+}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
index 0ada9c3be9c..2d4717cfaa3 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -1,3 +1,4 @@
file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
-condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
\ No newline at end of file
+condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
+tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
\ No newline at end of file
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterTest.java
new file mode 100644
index 00000000000..839af44efe9
--- /dev/null
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
+import org.apache.dubbo.rpc.cluster.router.MockInvoker;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TagRouterTest {
+
+ private URL tagUrl = new URL("tag"
+ , Constants.ANYHOST_VALUE, 0
+ , Constants.ANY_VALUE)
+ .addParameters(
+ Constants.RUNTIME_KEY, "true"
+ );
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @Test
+ public void testRoute_matchTag() {
+
+ RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "red");
+
+ List> invokers = new ArrayList<>();
+ Invoker redInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
+ Invoker yellowInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
+ Invoker blueInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
+ Invoker defaultInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.4:20880/com.foo.BarService"));
+
+ invokers.add(redInvoker);
+ invokers.add(yellowInvoker);
+ invokers.add(blueInvoker);
+ invokers.add(defaultInvoker);
+
+ Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
+ List> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
+ Assert.assertTrue(filteredInvokers.contains(redInvoker));
+ Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
+ Assert.assertFalse(filteredInvokers.contains(blueInvoker));
+ Assert.assertFalse(filteredInvokers.contains(defaultInvoker));
+ }
+
+ @Test
+ public void testRoute_matchDefault() {
+
+ RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "");
+
+ List> invokers = new ArrayList<>();
+ Invoker redInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
+ Invoker yellowInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
+ Invoker blueInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
+ Invoker defaultInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.4:20880/com.foo.BarService"));
+
+ invokers.add(redInvoker);
+ invokers.add(yellowInvoker);
+ invokers.add(blueInvoker);
+ invokers.add(defaultInvoker);
+
+ Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
+ List> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
+ Assert.assertTrue(filteredInvokers.contains(defaultInvoker));
+ Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
+ Assert.assertFalse(filteredInvokers.contains(blueInvoker));
+ Assert.assertFalse(filteredInvokers.contains(redInvoker));
+ }
+
+ @Test
+ public void testRoute_requestWithTag_shouldDowngrade() {
+
+ RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "black");
+
+ List> invokers = new ArrayList<>();
+ Invoker redInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
+ Invoker yellowInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
+ Invoker blueInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
+ Invoker defaultInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.4:20880/com.foo.BarService"));
+
+ invokers.add(redInvoker);
+ invokers.add(yellowInvoker);
+ invokers.add(blueInvoker);
+ invokers.add(defaultInvoker);
+
+ Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
+ List> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
+ Assert.assertTrue(filteredInvokers.contains(defaultInvoker));
+ Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
+ Assert.assertFalse(filteredInvokers.contains(blueInvoker));
+ Assert.assertFalse(filteredInvokers.contains(redInvoker));
+ }
+
+ @Test
+ public void testRoute_requestWithoutTag_shouldNotDowngrade() {
+
+ RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "");
+
+ List> invokers = new ArrayList<>();
+ Invoker redInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
+ Invoker yellowInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
+ Invoker blueInvoker = new MockInvoker<>(URL.valueOf(
+ "dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
+
+ invokers.add(redInvoker);
+ invokers.add(yellowInvoker);
+ invokers.add(blueInvoker);
+
+ Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
+ List> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
+ Assert.assertEquals(0, filteredInvokers.size());
+ }
+
+ @Test
+ public void testRoute_createBySpi() {
+ URL zkProvider = URL.valueOf("zookeeper://10.20.3.1:20880/com.foo.BarService?router=tag");
+ String parameter = zkProvider.getParameter(Constants.ROUTER_KEY);
+ RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(parameter);
+ Router tagRouter = routerFactory.getRouter(zkProvider);
+ Assert.assertTrue(tagRouter instanceof TagRouter);
+ }
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index a1c8fac34c6..95069306907 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -641,6 +641,10 @@ public class Constants {
public static final String MULTICAST = "multicast";
+ public static final String TAG_KEY = "tag";
+
+ public static final String REQUEST_TAG_KEY = "request.tag";
+
/*
* private Constants(){ }
*/
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
index 91a4a1641d6..70ab62e4493 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
@@ -522,5 +522,4 @@ public String getScope() {
public void setScope(String scope) {
this.scope = scope;
}
-
}
\ No newline at end of file
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
index 08f3ad5e97b..552ccf1e4a7 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
@@ -197,5 +197,4 @@ public void setGroup(String group) {
checkKey("group", group);
this.group = group;
}
-
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
index f23341fff17..4da1ffa89db 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
@@ -73,6 +73,9 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
// serialization
private String serialization;
+ // provider tag
+ protected String tag;
+
public String getVersion() {
return version;
}
@@ -240,4 +243,11 @@ public void setSerialization(String serialization) {
this.serialization = serialization;
}
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/annotation/Service.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/annotation/Service.java
index bee838745e4..6f956f50c05 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/annotation/Service.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/annotation/Service.java
@@ -263,4 +263,9 @@
* Registry spring bean name
*/
String[] registry() default {};
+
+ /**
+ * Service tag name
+ */
+ String tag() default "";
}
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/AnnotationBean.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/AnnotationBean.java
index 61513547c15..7612c3302a3 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/AnnotationBean.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/AnnotationBean.java
@@ -33,7 +33,6 @@
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.dubbo.config.annotation.Service;
-
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
@@ -76,7 +75,7 @@ public String getPackage() {
public void setPackage(String annotationPackage) {
this.annotationPackage = annotationPackage;
this.annotationPackages = (annotationPackage == null || annotationPackage.length() == 0) ? null
- : Constants.COMMA_SPLIT_PATTERN.split(annotationPackage);
+ : Constants.COMMA_SPLIT_PATTERN.split(annotationPackage);
}
@Override
@@ -86,7 +85,7 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
- throws BeansException {
+ throws BeansException {
if (annotationPackage == null || annotationPackage.length() == 0) {
return;
}
@@ -135,7 +134,7 @@ public void destroy() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
- throws BeansException {
+ throws BeansException {
if (!isMatchPackage(bean)) {
return bean;
}
@@ -144,7 +143,7 @@ public Object postProcessAfterInitialization(Object bean, String beanName)
ServiceBean