Skip to content

Commit

Permalink
1. none unit services prioritize routing to instances within the curr…
Browse files Browse the repository at this point in the history
…ent space

2. fix liveless inbound error.
  • Loading branch information
hexiaofeng committed Oct 24, 2024
1 parent 3df272c commit 977a990
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.jd.live.agent.governance.rule.tag.TagCondition;

import java.util.List;
import java.util.Set;

/**
* Represents an endpoint in a distributed system, providing methods to access its properties and match against tag conditions.
Expand Down Expand Up @@ -204,6 +205,16 @@ default String getGroup() {
return getLabel(Constants.LABEL_SERVICE_GROUP, PolicyId.DEFAULT_GROUP);
}

/**
* Checks if the live space ID is null or empty.
*
* @return true if the live space ID is null or empty, false otherwise
*/
default boolean isLiveless() {
String spaceId = getLiveSpaceId();
return spaceId == null || spaceId.isEmpty();
}

/**
* Determines if the live space ID matches the specified live space ID.
*
Expand All @@ -224,6 +235,20 @@ default boolean isUnit(String unit) {
return unit != null && unit.equals(getUnit());
}

/**
* Determines if the unit matches the specified units.
*
* @param units The units to match.
* @return true if the unit matches, false otherwise.
*/
default boolean isUnit(Set<String> units) {
if (units == null || units.isEmpty()) {
return false;
}
String unit = getUnit();
return unit != null && units.contains(unit);
}

/**
* Determines if the live space and unit match the specified live space ID and unit.
*
Expand All @@ -235,6 +260,17 @@ default boolean isUnit(String liveSpaceId, String unit) {
return isLiveSpace(liveSpaceId) && isUnit(unit);
}

/**
* Determines if the live space and unit match the specified live space ID and units.
*
* @param liveSpaceId The live space ID to match.
* @param units The units to match.
* @return true if both the live space ID and unit match, false otherwise.
*/
default boolean isUnit(String liveSpaceId, Set<String> units) {
return isLiveSpace(liveSpaceId) && isUnit(units);
}

/**
* Determines if the cell matches the specified cell.
*
Expand Down Expand Up @@ -305,8 +341,9 @@ default boolean isCloud(String cloud) {
*/
default boolean isGroup(String group) {
String label = getGroup();
if (group == null) {
return PolicyId.DEFAULT_GROUP.equals(label);
// default group
if (group == null || group.isEmpty() || PolicyId.DEFAULT_GROUP.equals(group)) {
return label == null || label.isEmpty() || PolicyId.DEFAULT_GROUP.equals(label);
} else {
return group.equals(label);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class CellAction {
*/
private final String message;

public CellAction(CellActionType type) {
this(type, null);
}

/**
* Creates a new CellAction with the specified type and message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,50 @@ public void choose(Function<List<? extends Endpoint>, List<? extends Endpoint>>
}
}

/**
* Filters the list of endpoints based on the provided predicate.
*
* @param predicate The predicate to use for filtering.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate) {
filter(endpoints, predicate, -1, true);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate and maximum size.
*
* @param predicate The predicate to use for filtering.
* @param maxSize The maximum size of the list to return.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate, int maxSize) {
filter(endpoints, predicate, maxSize, true);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate and maximum size.
*
* @param predicate The predicate to use for filtering.
* @param maxSize The maximum size of the list to return.
* @param nullable Whether a null list is acceptable.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate, int maxSize, boolean nullable) {
filter(endpoints, predicate, maxSize, nullable);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate.
*
* @param predicate The predicate to use for filtering.
* @return The count of endpoints that matched the predicate.
*/
public int filter(Predicate<Endpoint> predicate) {
return filter(predicate, -1, false);
return filter(endpoints, predicate, -1, true);
}

/**
Expand All @@ -165,7 +201,7 @@ public int filter(Predicate<Endpoint> predicate) {
* @return The count of endpoints that matched the predicate.
*/
public int filter(Predicate<Endpoint> predicate, int maxSize) {
return filter(predicate, maxSize, false);
return filter(endpoints, predicate, maxSize, true);
}

/**
Expand All @@ -180,6 +216,16 @@ public int filter(Predicate<Endpoint> predicate, int maxSize, boolean nullable)
return filter(endpoints, predicate, maxSize, nullable);
}

/**
* Creates a new list containing elements from the original list that match the given predicate.
*
* @param predicate The predicate to use for filtering. If null, the method returns the original list.
* @return A new list containing the filtered endpoints. If the input list or predicate is null, returns the original list.
*/
public List<? extends Endpoint> tryCopy(Predicate<Endpoint> predicate) {
return tryCopy(endpoints, predicate, 0);
}

/**
* Creates a new list containing elements from the original list that match the given predicate,
* up to a specified maximum size.
Expand Down Expand Up @@ -243,7 +289,11 @@ public static <T extends Endpoint> int filter(List<T> endpoints, Predicate<Endpo

// Remove the remaining elements if any
if ((writeIndex > 0 || nullable) && writeIndex < size) {
endpoints.subList(writeIndex, size).clear();
if (writeIndex == 0) {
endpoints.clear();
} else {
endpoints.subList(writeIndex, size).clear();
}
}
return writeIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ public <T extends InboundRequest> void filter(InboundInvocation<T> invocation, I
}

protected <T extends InboundRequest> CellAction cellAction(InboundInvocation<T> invocation) {
LiveMetadata liveMetadata = invocation.getLiveMetadata();
Unit localUnit = liveMetadata.getLocalUnit();
Cell localCell = liveMetadata.getLocalCell();
String variable = liveMetadata.getVariable();
UnitRule unitRule = liveMetadata.getRule();
if (unitRule == null) {
return new CellAction(CellActionType.FORWARD, null);
LiveMetadata metadata = invocation.getLiveMetadata();
if (metadata.getLocalSpace() == null) {
// liveless
return new CellAction(CellActionType.FORWARD);
}
UnitRoute unitRoute = localUnit == null ? null : unitRule.getUnitRoute(localUnit.getCode());
UnitRule rule = metadata.getRule();
if (rule == null) {
return new CellAction(CellActionType.FORWARD);
}
Unit localUnit = metadata.getLocalUnit();
Cell localCell = metadata.getLocalCell();
String variable = metadata.getVariable();
UnitRoute unitRoute = localUnit == null ? null : rule.getUnitRoute(localUnit.getCode());
CellRoute cellRoute = null;
if (unitRoute != null) {
cellRoute = unitRoute.getCellRouteByVariable(variable);
Expand All @@ -85,7 +89,7 @@ protected <T extends InboundRequest> CellAction cellAction(InboundInvocation<T>
}
if (invocation.isAccessible(localCell) && (cellRoute == null
|| !cellRoute.isEmpty() && invocation.isAccessible(cellRoute.getAccessMode()))) {
return new CellAction(CellActionType.FORWARD, null);
return new CellAction(CellActionType.FORWARD);
}
return new CellAction(CellActionType.FAILOVER, invocation.getError(FAILOVER_CELL_NOT_ACCESSIBLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ private <T extends InboundRequest> UnitAction unitAction(InboundInvocation<T> in
UnitAction action = validateSpace(invocation);
if (action != null) {
return action;
} else if (metadata.getLocalSpace() == null) {
// liveless
return onLiveless(invocation);
} else if (rule == null) {
return onMissingRule(invocation);
} else if (unitPolicy == UnitPolicy.NONE) {
return onNone(invocation);
} else if (metadata.getLocalUnit() == null) {
return onMissingLocalUnit(invocation);
} else if (unitPolicy == UnitPolicy.NONE) {
return onNone(invocation);
} else if (unitPolicy == UnitPolicy.CENTER) {
return onCenter(invocation);
} else if (unitPolicy == UnitPolicy.PREFER_LOCAL_UNIT) {
Expand Down Expand Up @@ -113,6 +116,16 @@ private <T extends InboundRequest> UnitAction validateSpace(InboundInvocation<T>
return null;
}

/**
* Handles the case when no other options are enabled.
*
* @param invocation the inbound invocation
* @return a UnitAction indicating the action to take
*/
private <T extends InboundRequest> UnitAction onLiveless(InboundInvocation<T> invocation) {
return new UnitAction(UnitActionType.FORWARD);
}

/**
* Handles the case when the local unit is missing.
*
Expand Down Expand Up @@ -190,7 +203,7 @@ private <T extends InboundRequest> UnitAction onPreferLocal(InboundInvocation<T>
private <T extends InboundRequest> UnitAction onCenter(InboundInvocation<T> invocation) {
Unit local = invocation.getLiveMetadata().getLocalUnit();
if (local.getType() == UnitType.CENTER) {
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD, null) :
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD) :
new UnitAction(UnitActionType.REJECT, invocation.getError(FAILOVER_UNIT_NOT_ACCESSIBLE));
} else {
return new UnitAction(UnitActionType.FAILOVER_CENTER, invocation.getError(REJECT_UNIT_NOT_CENTER));
Expand All @@ -205,7 +218,7 @@ private <T extends InboundRequest> UnitAction onCenter(InboundInvocation<T> invo
*/
private <T extends InboundRequest> UnitAction onNone(InboundInvocation<T> invocation) {
Unit local = invocation.getLiveMetadata().getLocalUnit();
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD, null) :
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD) :
new UnitAction(UnitActionType.FAILOVER, invocation.getError(FAILOVER_UNIT_NOT_ACCESSIBLE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
String group = serviceMetadata.getServiceGroup();
RouteTarget target = invocation.getRouteTarget();
if (group != null && !group.isEmpty()) {
target.filter(endpoint -> endpoint.isGroup(group), -1, true);
// target group
target.filter(endpoint -> endpoint.isGroup(group));
} else if (serviceConfig != null && !serviceConfig.isServiceGroupOpen()) {
target.filter(endpoint -> endpoint.isGroup(null), -1, true);
// default group
target.filter(endpoint -> endpoint.isGroup(null));
}
chain.filter(invocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
* @return true if a match is found and the filter is applied, false otherwise.
*/
private <T extends OutboundRequest> boolean match(OutboundInvocation<T> invocation, RoutePolicy policy) {
RouteTarget target = invocation.getRouteTarget();
for (TagRule rule : policy.getTagRules()) {
if (rule.match(invocation)) {
TagDestination destination = RandomWeight.choose(rule.getDestinations(), TagDestination::getWeight);
if (destination != null) {
invocation.getRouteTarget().filter(destination::match);
target.filter(destination::match);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.config.ServiceConfig;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.instance.EndpointGroup;
import com.jd.live.agent.governance.instance.UnitGroup;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
Expand All @@ -45,7 +44,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

import static com.jd.live.agent.core.util.StringUtils.isEmpty;
import static com.jd.live.agent.governance.invoke.Invocation.*;

/**
Expand Down Expand Up @@ -106,30 +104,33 @@ private <T extends OutboundRequest> RouteTarget route(OutboundInvocation<T> invo
* @return The route target indicating the action to be taken (forward, reject, etc.).
*/
private RouteTarget routeNone(OutboundInvocation<?> invocation) {
RouteTarget target = invocation.getRouteTarget();
target.filter(e -> isEmpty(e.getLiveSpaceId()), 0, true);
List<? extends Endpoint> endpoints = target.getEndpoints();
UnitRule rule = invocation.getLiveMetadata().getRule();
LiveMetadata metadata = invocation.getLiveMetadata();
String targetSpaceId = metadata.getTargetSpaceId();
UnitRule rule = metadata.getRule();
List<UnitRoute> routes = rule == null ? null : rule.getUnitRoutes();
Set<String> units = getAvailableUnits(invocation, routes);
return RouteTarget.forward(invocation.getRouteTarget().filtrate(e -> e.isUnit(targetSpaceId, units) || e.isLiveless()));

}

/**
* Gets the available units for the given outbound invocation based on the provided routes.
*
* @param invocation the outbound invocation
* @param routes the list of unit routes
* @return a set of available unit codes, or null if no routes are provided or all units are inaccessible
*/
private Set<String> getAvailableUnits(OutboundInvocation<?> invocation, List<UnitRoute> routes) {
if (routes != null && !routes.isEmpty()) {
Set<String> availableUnits = new HashSet<>(routes.size());
for (UnitRoute route : routes) {
if (invocation.isAccessible(route.getUnit())) {
availableUnits.add(route.getCode());
}
}
if (availableUnits.isEmpty()) {
return RouteTarget.reject(invocation.getError(REJECT_UNIT_NOT_ACCESSIBLE));
} else if (routes.size() == 1) {
return RouteTarget.forward(endpoints, routes.get(0));
} else if (routes.size() != availableUnits.size()) {
RouteTarget.filter(endpoints, e -> availableUnits.contains(e.getUnit()));
if (availableUnits.size() == 1) {
return RouteTarget.forward(endpoints, rule.getUnitRoute(availableUnits.iterator().next()));
}
}
return availableUnits;
}
return RouteTarget.forward(endpoints);
return null;
}

/**
Expand All @@ -152,8 +153,7 @@ private RouteTarget routeCenter(OutboundInvocation<?> invocation) {
}
String targetSpaceId = metadata.getTargetSpaceId();
RouteTarget target = invocation.getRouteTarget();
target.filter(e -> e.isLiveSpace(targetSpaceId), 0, true);
return RouteTarget.forward(target.getEndpoints(), route);
return RouteTarget.forward(target.filtrate(e -> e.isLiveSpace(targetSpaceId)), route);
}

/**
Expand All @@ -174,8 +174,7 @@ private RouteTarget routeUnit(OutboundInvocation<?> invocation) {
}
String targetSpaceId = invocation.getLiveMetadata().getTargetSpaceId();
RouteTarget target = invocation.getRouteTarget();
target.filter(e -> e.isLiveSpace(targetSpaceId), 0, true);
return RouteTarget.forward(target.getEndpoints(), route);
return RouteTarget.forward(target.filtrate(e -> e.isLiveSpace(targetSpaceId)), route);
}

/**
Expand All @@ -186,9 +185,7 @@ private RouteTarget routeUnit(OutboundInvocation<?> invocation) {
*/
private RouteTarget routeLocal(OutboundInvocation<?> invocation) {
String targetSpaceId = invocation.getLiveMetadata().getTargetSpaceId();
RouteTarget routeTarget = invocation.getRouteTarget();
routeTarget.filter(e -> e.isLiveSpace(targetSpaceId), 0, true);
EndpointGroup group = new EndpointGroup(routeTarget.getEndpoints());
EndpointGroup group = new EndpointGroup(invocation.getRouteTarget().filtrate(e -> e.isLiveSpace(targetSpaceId)));
Election election = getPreferUnits(invocation, group);
List<Candidate> candidates = election.getCandidates();
if (election.isEmpty()) {
Expand Down
Loading

0 comments on commit 977a990

Please sign in to comment.