Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add occupy mechanism for future buckets of sliding window to support "prioritized requests" #568

Merged
merged 7 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
}

@Override
public ClusterMetricBucket newEmptyBucket() {
public ClusterMetricBucket newEmptyBucket(long timeMillis) {
return new ClusterMetricBucket();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ClusterParameterLeapArray(int sampleCount, int intervalInMs, int maxCapac
}

@Override
public CacheMap<Object, C> newEmptyBucket() {
public CacheMap<Object, C> newEmptyBucket(long timeMillis) {
return new ConcurrentLinkedHashMapWrapper<>(maxCapacity);
}

Expand All @@ -48,5 +48,4 @@ protected WindowWrap<CacheMap<Object, C>> resetWindowTo(WindowWrap<CacheMap<Obje
return w;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author leyou
* @author Eric Zhao
*/
public interface Node extends DebugSupport {
public interface Node extends OccupySupport, DebugSupport {

/**
* Get incoming request per minute ({@code pass + block}).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.node;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public interface OccupySupport {

/**
* Try to occupy latter time windows' tokens. If occupy success, a value less than
* {@code occupyTimeout} in {@link OccupyTimeoutProperty} will be return.
*
* <p>
* Each time we occupy tokens of the future window, current thread should sleep for the
* corresponding time for smoothing QPS. We can't occupy tokens of the future with unlimited,
* the sleep time limit is {@code occupyTimeout} in {@link OccupyTimeoutProperty}.
* </p>
*
* @param currentTime current time millis.
* @param acquireCount tokens count to acquire.
* @param threshold qps threshold.
* @return time should sleep. Time >= {@code occupyTimeout} in {@link OccupyTimeoutProperty} means
* occupy fail, in this case, the request should be rejected immediately.
*/
long tryOccupyNext(long currentTime, int acquireCount, double threshold);

/**
* Get current waiting amount. Useful for debug.
*
* @return current waiting amount
*/
long waiting();

/**
* Add request that occupied.
*
* @param futureTime future timestamp that the acquireCount should be added on.
* @param acquireCount tokens count.
*/
void addWaitingRequest(long futureTime, int acquireCount);

/**
* Add occupied pass request, which represents pass requests that borrow the latter windows' token.
*
* @param acquireCount tokens count.
*/
void addOccupiedPass(int acquireCount);

/**
* Get current occupied pass QPS.
*
* @return current occupied pass QPS
*/
double occupiedPassQps();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.node;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.property.SimplePropertyListener;

/**
* @author jialiang.linjl
* @author Carpenter Lee
* @since 1.5.0
*/
public class OccupyTimeoutProperty {

/**
* <p>
* Max occupy timeout in milliseconds. Requests with priority can occupy tokens of the future statistic
* window, and {@code occupyTimeout} limit the max time length that can be occupied.
* </p>
* <p>
* Note that the timeout value should never be greeter than {@link IntervalProperty#INTERVAL}.
* </p>
* DO NOT MODIFY this value directly, use {@link #updateTimeout(int)},
* otherwise the modification will not take effect.
*/
private static volatile int occupyTimeout = 500;

public static void register2Property(SentinelProperty<Integer> property) {
property.addListener(new SimplePropertyListener<Integer>() {
@Override
public void configUpdate(Integer value) {
if (value != null) {
updateTimeout(value);
}
}
});
}

public static int getOccupyTimeout() {
return occupyTimeout;
}

/**
* Update the timeout value.</br>
* Note that the time out should never greeter than {@link IntervalProperty#INTERVAL},
* or it will be ignored.
*
* @param newInterval new value.
*/
public static void updateTimeout(int newInterval) {
if (newInterval < 0) {
RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout);
return;
}
if (newInterval > IntervalProperty.INTERVAL) {
RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout
+ ", should <= " + IntervalProperty.INTERVAL);
return;
}
if (newInterval != occupyTimeout) {
occupyTimeout = newInterval;
}
RecordLog.info("[OccupyTimeoutProperty] occupyTimeout updated to: " + occupyTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class StatisticNode implements Node {
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
* The counter for thread count.
Expand All @@ -116,7 +116,7 @@ public Map<Long, MetricNode> metrics() {
// The fetch operation is thread-safe under a single-thread scheduler pool.
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<Long, MetricNode>();
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
Expand All @@ -137,7 +137,7 @@ private boolean isNodeInTime(MetricNode node, long currentTime) {

private boolean isValidMetricNode(MetricNode node) {
return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0
|| node.getExceptionQps() > 0 || node.getRt() > 0;
|| node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0;
}

@Override
Expand All @@ -151,11 +151,6 @@ public long totalRequest() {
return totalRequest;
}

@Override
public long totalPass() {
return rollingCounterInMinute.pass();
}

@Override
public long blockRequest() {
return rollingCounterInMinute.block();
Expand Down Expand Up @@ -201,6 +196,11 @@ public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

@Override
public long totalPass() {
return rollingCounterInMinute.pass();
}

@Override
public double successQps() {
return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
Expand All @@ -211,6 +211,11 @@ public double maxSuccessQps() {
return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
}

@Override
public double occupiedPassQps() {
return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec();
}

@Override
public double avgRt() {
long successCount = rollingCounterInSecond.success();
Expand Down Expand Up @@ -256,7 +261,6 @@ public void increaseBlockQps(int count) {
public void increaseExceptionQps(int count) {
rollingCounterInSecond.addException(count);
rollingCounterInMinute.addException(count);

}

@Override
Expand All @@ -271,6 +275,57 @@ public void decreaseThreadNum() {

@Override
public void debug() {
rollingCounterInSecond.debugQps();
rollingCounterInSecond.debug();
}

@Override
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}

int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}

return OccupyTimeoutProperty.getOccupyTimeout();
}

@Override
public long waiting() {
return rollingCounterInSecond.waiting();
}

@Override
public void addWaitingRequest(long futureTime, int acquireCount) {
rollingCounterInSecond.addWaiting(futureTime, acquireCount);
}

@Override
public void addOccupiedPass(int acquireCount) {
rollingCounterInMinute.addOccupiedPass(acquireCount);
rollingCounterInMinute.addPass(acquireCount);
}
}
Loading