Skip to content

Commit

Permalink
Merge pull request #121
Browse files Browse the repository at this point in the history
1. none unit services prioritize routing to instances within the current space 2. fix liveless inbound error.
  • Loading branch information
chenzhiguo authored Oct 24, 2024
2 parents 3df272c + 5841ea8 commit f5dd188
Show file tree
Hide file tree
Showing 22 changed files with 261 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void labelRegistry(BiConsumer<String, String> consumer) {
labelInstance(consumer);
if (location != null) {
labelZone(consumer);
LabelLiveSpace(consumer);
labelLiveSpace(consumer);
labelLane(consumer);
}
labelService(consumer);
Expand All @@ -176,7 +176,7 @@ public void labelSync(BiConsumer<String, String> consumer) {
labelInstance(consumer);
if (location != null) {
labelZone(consumer);
LabelLiveSpace(consumer);
labelLiveSpace(consumer);
labelLane(consumer);
accept(consumer, Constants.LABEL_INSTANCE_IP, location.getIp());
}
Expand Down Expand Up @@ -228,8 +228,8 @@ private void labelZone(BiConsumer<String, String> consumer) {
*
* @param consumer the consumer to use for labeling
*/
private void LabelLiveSpace(BiConsumer<String, String> consumer) {
if (location.getLiveSpaceId() != null && !location.getLiveSpaceId().isEmpty()) {
private void labelLiveSpace(BiConsumer<String, String> consumer) {
if (!location.isLiveless()) {
accept(consumer, Constants.LABEL_LIVE_SPACE_ID, location.getLiveSpaceId());
accept(consumer, Constants.LABEL_RULE_ID, location.getUnitRuleId());
accept(consumer, Constants.LABEL_UNIT, location.getUnit());
Expand All @@ -243,7 +243,7 @@ private void LabelLiveSpace(BiConsumer<String, String> consumer) {
* @param consumer the consumer to use for labeling
*/
private void labelLane(BiConsumer<String, String> consumer) {
if (location.getLaneSpaceId() != null && !location.getLaneSpaceId().isEmpty()) {
if (!location.isLaneless()) {
accept(consumer, Constants.LABEL_LANE_SPACE_ID, location.getLaneSpaceId());
accept(consumer, Constants.LABEL_LANE, location.getLane());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,13 @@ public boolean inLane(String lane) {
public boolean inLane(String spaceId, String lane) {
return inLaneSpace(spaceId) && inLane(lane);
}

public boolean isLiveless() {
return liveSpaceId == null || liveSpaceId.isEmpty();
}

public boolean isLaneless() {
return laneSpaceId == null || laneSpaceId.isEmpty();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.jd.live.agent.governance.rule.tag.TagCondition;

import java.util.List;
import java.util.Set;

/**
* Represents an endpoint in a distributed system, providing methods to access its properties and match against tag conditions.
Expand Down Expand Up @@ -204,6 +205,16 @@ default String getGroup() {
return getLabel(Constants.LABEL_SERVICE_GROUP, PolicyId.DEFAULT_GROUP);
}

/**
* Checks if the live space ID is null or empty.
*
* @return true if the live space ID is null or empty, false otherwise
*/
default boolean isLiveless() {
String spaceId = getLiveSpaceId();
return spaceId == null || spaceId.isEmpty();
}

/**
* Determines if the live space ID matches the specified live space ID.
*
Expand All @@ -224,6 +235,20 @@ default boolean isUnit(String unit) {
return unit != null && unit.equals(getUnit());
}

/**
* Determines if the unit matches the specified units.
*
* @param units The units to match.
* @return true if the unit matches, false otherwise.
*/
default boolean isUnit(Set<String> units) {
if (units == null || units.isEmpty()) {
return false;
}
String unit = getUnit();
return unit != null && units.contains(unit);
}

/**
* Determines if the live space and unit match the specified live space ID and unit.
*
Expand All @@ -235,6 +260,17 @@ default boolean isUnit(String liveSpaceId, String unit) {
return isLiveSpace(liveSpaceId) && isUnit(unit);
}

/**
* Determines if the live space and unit match the specified live space ID and units.
*
* @param liveSpaceId The live space ID to match.
* @param units The units to match.
* @return true if both the live space ID and unit match, false otherwise.
*/
default boolean isUnit(String liveSpaceId, Set<String> units) {
return isLiveSpace(liveSpaceId) && isUnit(units);
}

/**
* Determines if the cell matches the specified cell.
*
Expand Down Expand Up @@ -305,8 +341,9 @@ default boolean isCloud(String cloud) {
*/
default boolean isGroup(String group) {
String label = getGroup();
if (group == null) {
return PolicyId.DEFAULT_GROUP.equals(label);
// default group
if (group == null || group.isEmpty() || PolicyId.DEFAULT_GROUP.equals(group)) {
return label == null || label.isEmpty() || PolicyId.DEFAULT_GROUP.equals(label);
} else {
return group.equals(label);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class CellAction {
*/
private final String message;

public CellAction(CellActionType type) {
this(type, null);
}

/**
* Creates a new CellAction with the specified type and message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,50 @@ public void choose(Function<List<? extends Endpoint>, List<? extends Endpoint>>
}
}

/**
* Filters the list of endpoints based on the provided predicate.
*
* @param predicate The predicate to use for filtering.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate) {
filter(endpoints, predicate, -1, true);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate and maximum size.
*
* @param predicate The predicate to use for filtering.
* @param maxSize The maximum size of the list to return.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate, int maxSize) {
filter(endpoints, predicate, maxSize, true);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate and maximum size.
*
* @param predicate The predicate to use for filtering.
* @param maxSize The maximum size of the list to return.
* @param nullable Whether a null list is acceptable.
* @return The filtered list of endpoints.
*/
public List<? extends Endpoint> filtrate(Predicate<Endpoint> predicate, int maxSize, boolean nullable) {
filter(endpoints, predicate, maxSize, nullable);
return endpoints;
}

/**
* Filters the list of endpoints based on the provided predicate.
*
* @param predicate The predicate to use for filtering.
* @return The count of endpoints that matched the predicate.
*/
public int filter(Predicate<Endpoint> predicate) {
return filter(predicate, -1, false);
return filter(endpoints, predicate, -1, true);
}

/**
Expand All @@ -165,7 +201,7 @@ public int filter(Predicate<Endpoint> predicate) {
* @return The count of endpoints that matched the predicate.
*/
public int filter(Predicate<Endpoint> predicate, int maxSize) {
return filter(predicate, maxSize, false);
return filter(endpoints, predicate, maxSize, true);
}

/**
Expand All @@ -180,6 +216,16 @@ public int filter(Predicate<Endpoint> predicate, int maxSize, boolean nullable)
return filter(endpoints, predicate, maxSize, nullable);
}

/**
* Creates a new list containing elements from the original list that match the given predicate.
*
* @param predicate The predicate to use for filtering. If null, the method returns the original list.
* @return A new list containing the filtered endpoints. If the input list or predicate is null, returns the original list.
*/
public List<? extends Endpoint> tryCopy(Predicate<Endpoint> predicate) {
return tryCopy(endpoints, predicate, 0);
}

/**
* Creates a new list containing elements from the original list that match the given predicate,
* up to a specified maximum size.
Expand Down Expand Up @@ -243,7 +289,11 @@ public static <T extends Endpoint> int filter(List<T> endpoints, Predicate<Endpo

// Remove the remaining elements if any
if ((writeIndex > 0 || nullable) && writeIndex < size) {
endpoints.subList(writeIndex, size).clear();
if (writeIndex == 0) {
endpoints.clear();
} else {
endpoints.subList(writeIndex, size).clear();
}
}
return writeIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ public <T extends InboundRequest> void filter(InboundInvocation<T> invocation, I
}

protected <T extends InboundRequest> CellAction cellAction(InboundInvocation<T> invocation) {
LiveMetadata liveMetadata = invocation.getLiveMetadata();
Unit localUnit = liveMetadata.getLocalUnit();
Cell localCell = liveMetadata.getLocalCell();
String variable = liveMetadata.getVariable();
UnitRule unitRule = liveMetadata.getRule();
if (unitRule == null) {
return new CellAction(CellActionType.FORWARD, null);
LiveMetadata metadata = invocation.getLiveMetadata();
if (metadata.isLocalLiveless()) {
// liveless
return new CellAction(CellActionType.FORWARD);
}
UnitRoute unitRoute = localUnit == null ? null : unitRule.getUnitRoute(localUnit.getCode());
UnitRule rule = metadata.getRule();
if (rule == null) {
return new CellAction(CellActionType.FORWARD);
}
Unit localUnit = metadata.getLocalUnit();
Cell localCell = metadata.getLocalCell();
String variable = metadata.getVariable();
UnitRoute unitRoute = localUnit == null ? null : rule.getUnitRoute(localUnit.getCode());
CellRoute cellRoute = null;
if (unitRoute != null) {
cellRoute = unitRoute.getCellRouteByVariable(variable);
Expand All @@ -85,7 +89,7 @@ protected <T extends InboundRequest> CellAction cellAction(InboundInvocation<T>
}
if (invocation.isAccessible(localCell) && (cellRoute == null
|| !cellRoute.isEmpty() && invocation.isAccessible(cellRoute.getAccessMode()))) {
return new CellAction(CellActionType.FORWARD, null);
return new CellAction(CellActionType.FORWARD);
}
return new CellAction(CellActionType.FAILOVER, invocation.getError(FAILOVER_CELL_NOT_ACCESSIBLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ private <T extends InboundRequest> UnitAction unitAction(InboundInvocation<T> in
UnitAction action = validateSpace(invocation);
if (action != null) {
return action;
} else if (metadata.isLocalLiveless()) {
// liveless
return onLiveless(invocation);
} else if (rule == null) {
return onMissingRule(invocation);
} else if (unitPolicy == UnitPolicy.NONE) {
return onNone(invocation);
} else if (metadata.getLocalUnit() == null) {
return onMissingLocalUnit(invocation);
} else if (unitPolicy == UnitPolicy.NONE) {
return onNone(invocation);
} else if (unitPolicy == UnitPolicy.CENTER) {
return onCenter(invocation);
} else if (unitPolicy == UnitPolicy.PREFER_LOCAL_UNIT) {
Expand Down Expand Up @@ -113,6 +116,16 @@ private <T extends InboundRequest> UnitAction validateSpace(InboundInvocation<T>
return null;
}

/**
* Handles the case when no other options are enabled.
*
* @param invocation the inbound invocation
* @return a UnitAction indicating the action to take
*/
private <T extends InboundRequest> UnitAction onLiveless(InboundInvocation<T> invocation) {
return new UnitAction(UnitActionType.FORWARD);
}

/**
* Handles the case when the local unit is missing.
*
Expand Down Expand Up @@ -190,7 +203,7 @@ private <T extends InboundRequest> UnitAction onPreferLocal(InboundInvocation<T>
private <T extends InboundRequest> UnitAction onCenter(InboundInvocation<T> invocation) {
Unit local = invocation.getLiveMetadata().getLocalUnit();
if (local.getType() == UnitType.CENTER) {
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD, null) :
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD) :
new UnitAction(UnitActionType.REJECT, invocation.getError(FAILOVER_UNIT_NOT_ACCESSIBLE));
} else {
return new UnitAction(UnitActionType.FAILOVER_CENTER, invocation.getError(REJECT_UNIT_NOT_CENTER));
Expand All @@ -205,7 +218,7 @@ private <T extends InboundRequest> UnitAction onCenter(InboundInvocation<T> invo
*/
private <T extends InboundRequest> UnitAction onNone(InboundInvocation<T> invocation) {
Unit local = invocation.getLiveMetadata().getLocalUnit();
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD, null) :
return invocation.isAccessible(local) ? new UnitAction(UnitActionType.FORWARD) :
new UnitAction(UnitActionType.FAILOVER, invocation.getError(FAILOVER_UNIT_NOT_ACCESSIBLE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
String group = serviceMetadata.getServiceGroup();
RouteTarget target = invocation.getRouteTarget();
if (group != null && !group.isEmpty()) {
target.filter(endpoint -> endpoint.isGroup(group), -1, true);
// target group
target.filter(endpoint -> endpoint.isGroup(group));
} else if (serviceConfig != null && !serviceConfig.isServiceGroupOpen()) {
target.filter(endpoint -> endpoint.isGroup(null), -1, true);
// default group
target.filter(endpoint -> endpoint.isGroup(null));
}
chain.filter(invocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
* @return true if a match is found and the filter is applied, false otherwise.
*/
private <T extends OutboundRequest> boolean match(OutboundInvocation<T> invocation, RoutePolicy policy) {
RouteTarget target = invocation.getRouteTarget();
for (TagRule rule : policy.getTagRules()) {
if (rule.match(invocation)) {
TagDestination destination = RandomWeight.choose(rule.getDestinations(), TagDestination::getWeight);
if (destination != null) {
invocation.getRouteTarget().filter(destination::match);
target.filter(destination::match);
}
return true;
}
Expand Down
Loading

0 comments on commit f5dd188

Please sign in to comment.