Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make WorkerInfo enum (and some small refactors) #18460

Merged
merged 7 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions dora/core/common/src/main/java/alluxio/grpc/GrpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import alluxio.wire.WorkerState;

import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
Expand Down Expand Up @@ -317,7 +318,8 @@ public static WorkerInfo fromProto(alluxio.grpc.WorkerInfo workerInfo) {
.setCapacityBytes(workerInfo.getCapacityBytes())
.setCapacityBytesOnTiers(workerInfo.getCapacityBytesOnTiers()).setId(workerInfo.getId())
.setLastContactSec(workerInfo.getLastContactSec())
.setStartTimeMs(workerInfo.getStartTimeMs()).setState(workerInfo.getState())
.setStartTimeMs(workerInfo.getStartTimeMs())
.setState(WorkerState.of(workerInfo.getState()))
.setUsedBytes(workerInfo.getUsedBytes())
.setUsedBytesOnTiers(workerInfo.getUsedBytesOnTiersMap())
.setVersion(workerInfo.getBuildVersion().getVersion())
Expand Down Expand Up @@ -569,7 +571,8 @@ public static alluxio.grpc.WorkerInfo toProto(WorkerInfo workerInfo) {
return alluxio.grpc.WorkerInfo.newBuilder().setId(workerInfo.getId())
.setIdentity(workerInfo.getIdentity().toProto())
.setAddress(toProto(workerInfo.getAddress()))
.setLastContactSec(workerInfo.getLastContactSec()).setState(workerInfo.getState())
.setLastContactSec(workerInfo.getLastContactSec())
.setState(workerInfo.getState().toString())
.setCapacityBytes(workerInfo.getCapacityBytes()).setUsedBytes(workerInfo.getUsedBytes())
.setStartTimeMs(workerInfo.getStartTimeMs())
.putAllCapacityBytesOnTiers(workerInfo.getCapacityBytesOnTiers())
Expand Down
32 changes: 0 additions & 32 deletions dora/core/common/src/main/java/alluxio/master/WorkerState.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public NodeInfo(WorkerInfo workerInfo) {
}
mWebPort = workerInfo.getAddress().getWebPort();
mLastContactSec = Integer.toString(workerInfo.getLastContactSec());
mWorkerState = workerInfo.getState();
mWorkerState = workerInfo.getState().toString();
mCapacityBytes = workerInfo.getCapacityBytes();
mUsedBytes = workerInfo.getUsedBytes();
if (mCapacityBytes != 0) {
Expand Down
39 changes: 36 additions & 3 deletions dora/core/common/src/main/java/alluxio/wire/WorkerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class WorkerInfo implements Serializable {
private WorkerIdentity mIdentity;
private WorkerNetAddress mAddress = new WorkerNetAddress();
private int mLastContactSec;
private String mState = "";
private WorkerState mState = WorkerState.UNRECOGNIZED;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to assign a default value for worker state. A UNRECOGNIZED state means either the code that creates the WorkerInfo object did not set its state explicitly, or the worker is unknown to the membership service (e.g. never registered).

private long mCapacityBytes;
private long mUsedBytes;
private long mStartTimeMs;
Expand All @@ -43,6 +43,39 @@ public final class WorkerInfo implements Serializable {
private String mVersion = "";
private String mRevision = "";

/**
* Creates a new, empty instance.
*/
public WorkerInfo() {
}

/**
* Copy constructor.
*
* @param copyFrom instance to copy from
*/
public WorkerInfo(WorkerInfo copyFrom) {
mId = copyFrom.mId;
mIdentity = copyFrom.mIdentity; // identity is immutable so ok to reuse
mAddress = copyFrom.mAddress != null
? new WorkerNetAddress(copyFrom.mAddress)
: null;
mLastContactSec = copyFrom.mLastContactSec;
mState = copyFrom.mState;
mCapacityBytes = copyFrom.mCapacityBytes;
mUsedBytes = copyFrom.mUsedBytes;
mStartTimeMs = copyFrom.mStartTimeMs;
mCapacityBytesOnTiers = copyFrom.mCapacityBytesOnTiers != null
? new HashMap<>(copyFrom.mCapacityBytesOnTiers)
: null;
mUsedBytesOnTiers = copyFrom.mUsedBytesOnTiers != null
? new HashMap<>(copyFrom.mUsedBytesOnTiers)
: null;
mBlockCount = copyFrom.mBlockCount;
mVersion = copyFrom.mVersion;
mRevision = copyFrom.mRevision;
}

/**
* @return the worker id
*/
Expand Down Expand Up @@ -83,7 +116,7 @@ public int getLastContactSec() {
* @return the worker state
*/
@ApiModelProperty(value = "Operation state of the worker", example = "In Service")
public String getState() {
public WorkerState getState() {
return mState;
}

Expand Down Expand Up @@ -184,7 +217,7 @@ public WorkerInfo setLastContactSec(int lastContactSec) {
* @param state the worker state to use
* @return the worker information
*/
public WorkerInfo setState(String state) {
public WorkerInfo setState(WorkerState state) {
Preconditions.checkNotNull(state, "state");
mState = state;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,24 @@ public final class WorkerNetAddress implements Serializable {
/**
* Creates a new instance of {@link WorkerNetAddress}.
*/
public WorkerNetAddress() {}
public WorkerNetAddress() {
}

/**
* Copy constructor.
*
* @param copyFrom instance to copy from
*/
public WorkerNetAddress(WorkerNetAddress copyFrom) {
mHost = copyFrom.mHost;
mContainerHost = copyFrom.mContainerHost;
mRpcPort = copyFrom.mRpcPort;
mDataPort = copyFrom.mDataPort;
mSecureRpcPort = copyFrom.mSecureRpcPort;
mNettyDataPort = copyFrom.mNettyDataPort;
mWebPort = copyFrom.mWebPort;
mDomainSocketPath = copyFrom.mDomainSocketPath;
}

/**
* @return the secure rpc port
Expand Down
60 changes: 60 additions & 0 deletions dora/core/common/src/main/java/alluxio/wire/WorkerState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.wire;

/***
* The worker state maintained by master.
*/
public enum WorkerState {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anchor: moved from dora/core/common/src/main/java/alluxio/master/WorkerState.java

LIVE("LIVE"),
LOST("LOST"),
DECOMMISSIONED("Decommissioned"),
DISABLED("Disabled"),
// an unknown worker which is not recognized by the cluster membership manager,
// e.g. a worker before it registers to the manager
UNRECOGNIZED("UNRECOGNIZED");

private final String mState;

WorkerState(String s) {
mState = s;
}

/**
* Converts from string to worker state.
*
* @param workerState string representation of worker state
* @return worker state
* @throws IllegalArgumentException if the state is unknown
*/
public static WorkerState of(String workerState) throws IllegalArgumentException {
switch (workerState) {
case "LIVE":
return LIVE;
case "LOST":
return LOST;
case "Decommissioned":
return DECOMMISSIONED;
case "Disabled":
return DISABLED;
case "UNRECOGNIZED":
return UNRECOGNIZED;
default:
throw new IllegalArgumentException("Unknown worker state: " + workerState);
}
}

@Override
public String toString() {
return mState;
}
}
41 changes: 40 additions & 1 deletion dora/core/common/src/test/java/alluxio/wire/WorkerInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@

package alluxio.wire;

import static org.junit.Assert.assertNotEquals;

import alluxio.Constants;
import alluxio.grpc.GrpcUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Defaults;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -51,6 +57,39 @@ public void lastContactSecComparator() {
Assert.assertTrue(compareLostWorkersWithTimes(1, -1) > 0);
}

@Test
public void copyConstructor() throws IllegalAccessException {
WorkerInfo original = new WorkerInfo()
.setId(1)
.setIdentity(WorkerIdentityTestUtils.ofLegacyId(1))
.setAddress(new WorkerNetAddress().setHost("host1"))
.setBlockCount(1)
.setCapacityBytes(1)
.setUsedBytes(1)
.setCapacityBytesOnTiers(ImmutableMap.of())
.setUsedBytesOnTiers(ImmutableMap.of())
.setLastContactSec(1)
.setStartTimeMs(1)
.setState(WorkerState.LIVE)
.setRevision("rev1")
.setVersion("ver1");
WorkerInfo copied = new WorkerInfo(original);
// copied instance should contain exactly the same content
checkEquality(original, copied);
// mutate any non-final field in the copy,
// and the change should not be reflected in the original
for (Field field : WorkerInfo.class.getDeclaredFields()) {
int fieldModifiers = field.getModifiers();
if (Modifier.isStatic(fieldModifiers) || Modifier.isFinal(fieldModifiers)) {
continue;
}
field.setAccessible(true);
// set fields in the copy to their default value
field.set(copied, Defaults.defaultValue(field.getType()));
Comment on lines +83 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, works for me

assertNotEquals(field.getName(), field.get(original), field.get(copied));
}
}

public void checkEquality(WorkerInfo a, WorkerInfo b) {
Assert.assertEquals(a.getId(), b.getId());
Assert.assertEquals(a.getAddress(), b.getAddress());
Expand Down Expand Up @@ -88,7 +127,7 @@ public static WorkerInfo createRandom() {
capacityBytesOnTiers.put(Constants.MEDIUM_MEM, capacityBytes);
Map<String, Long> usedBytesOnTiers = new HashMap<>();
usedBytesOnTiers.put(Constants.MEDIUM_MEM, usedBytes);
String state = random.nextInt(2) == 1 ? "In Service" : "Out of Service";
WorkerState state = random.nextInt(2) == 1 ? WorkerState.LIVE : WorkerState.LOST;
String version = String.format("%d.%d.%d", random.nextInt(10),
random.nextInt(20), random.nextInt(10));
String revision = DigestUtils.sha1Hex(RandomStringUtils.random(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@

package alluxio.wire;

import static org.junit.Assert.assertNotEquals;

import alluxio.grpc.GrpcUtils;
import alluxio.util.CommonUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Defaults;
import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Random;

public class WorkerNetAddressTest {
Expand All @@ -38,6 +43,34 @@ public void proto() {
checkEquality(workerNetAddress, other);
}

@Test
public void copyConstructor() throws IllegalAccessException {
WorkerNetAddress original = new WorkerNetAddress()
.setHost("host")
.setContainerHost("container")
.setRpcPort(1)
.setDataPort(1)
.setNettyDataPort(1)
.setSecureRpcPort(1)
.setWebPort(1)
.setDomainSocketPath("path");
WorkerNetAddress copied = new WorkerNetAddress(original);
// copied instance should contain exactly the same content
checkEquality(original, copied);
// mutate any non-final field in the copy,
// and the change should not be reflected in the original
for (Field field : WorkerNetAddress.class.getDeclaredFields()) {
int fieldModifiers = field.getModifiers();
if (Modifier.isStatic(fieldModifiers) || Modifier.isFinal(fieldModifiers)) {
continue;
}
field.setAccessible(true);
// set fields in the copy to their default value
field.set(copied, Defaults.defaultValue(field.getType()));
assertNotEquals(field.getName(), field.get(original), field.get(copied));
}
}

public void checkEquality(WorkerNetAddress a, WorkerNetAddress b) {
Assert.assertEquals(a.getHost(), b.getHost());
Assert.assertEquals(a.getRpcPort(), b.getRpcPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import alluxio.heartbeat.HeartbeatThread;
import alluxio.master.CoreMaster;
import alluxio.master.CoreMasterContext;
import alluxio.master.WorkerState;
import alluxio.master.block.meta.MasterWorkerInfo;
import alluxio.master.block.meta.WorkerMetaLockSection;
import alluxio.master.journal.JournalContext;
Expand Down Expand Up @@ -85,6 +84,7 @@
import alluxio.wire.RegisterLease;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import alluxio.wire.WorkerState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import alluxio.client.block.options.GetWorkerReportOptions.WorkerInfoField;
import alluxio.grpc.BuildVersion;
import alluxio.grpc.StorageList;
import alluxio.master.WorkerState;
import alluxio.master.block.DefaultBlockMaster;
import alluxio.resource.LockResource;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import alluxio.wire.WorkerState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -330,7 +330,7 @@ public WorkerInfo generateWorkerInfo(Set<WorkerInfoField> fieldRange, WorkerStat
info.setStartTimeMs(mMeta.mStartTimeMs);
break;
case STATE:
info.setState(workerState.toString());
info.setState(workerState);
break;
case WORKER_USED_BYTES:
info.setUsedBytes(mUsage.mUsedBytes);
Expand Down
Loading
Loading