Use cluster view to replace list of workers #18451
Conversation
This branch depends on the other PR #18441; will rebase once that one gets in.
# Conflicts:
#	dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
#	dora/core/common/src/main/java/alluxio/membership/WorkerClusterView.java
@lucyge2022 @JiamingMai @jiacheliu3 Can you please take a look? Thanks. @jja725 I left the scheduler-related APIs unchanged, but I think they could also benefit from this refactor. Let me know what you think.
Set<WorkerIdentity> liveWorkerIds = parseWorkersFromEtcdKvPairs(
    mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
    .map(WorkerServiceEntity::getIdentity)
    .collect(Collectors.toSet());
Predicate<WorkerInfo> isLive = w -> liveWorkerIds.contains(w.getIdentity());
To figure out the liveness of a worker, I had to first get a set of all live workers, so that live and lost workers can be correctly differentiated. @lucyge2022 I'd appreciate your comment on this.
Yeah, you have to make separate calls to know the state of a worker.
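For illustration, here is a minimal, self-contained sketch (simplified types and hypothetical helpers, not the actual Alluxio classes) of the idea above: classify every registered worker as live or lost by membership in the set of live worker IDs obtained from service discovery.

```java
import java.util.*;
import java.util.stream.*;

// Sketch only: stand-in types, not Alluxio's WorkerInfo/WorkerIdentity.
class LivenessSketch {
  enum State { LIVE, LOST }
  record Worker(String id, String address) {}

  // Classify each registered worker by checking membership in the live-ID set.
  static Map<Worker, State> classify(Collection<Worker> allRegistered, Set<String> liveIds) {
    return allRegistered.stream()
        .collect(Collectors.toMap(
            w -> w,
            w -> liveIds.contains(w.id()) ? State.LIVE : State.LOST));
  }

  public static void main(String[] args) {
    Set<String> liveIds = Set.of("worker-1");
    List<Worker> all = List.of(new Worker("worker-1", "host1:29999"),
                               new Worker("worker-2", "host2:29999"));
    System.out.println(classify(all, liveIds));
  }
}
```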
- List<BlockWorkerInfo> liveWorkers = mMembershipManager.getLiveMembers().stream()
-     .map(w -> new BlockWorkerInfo(w.getIdentity(), w.getAddress(), w.getCapacityBytes(),
-         w.getUsedBytes(), true)).collect(toList());
- List<BlockWorkerInfo> lostWorkers = mMembershipManager.getFailedMembers().stream()
-     .map(w -> new BlockWorkerInfo(
-         w.getIdentity(), w.getAddress(), w.getCapacityBytes(), w.getUsedBytes(),
-         false)).collect(toList());
- // avoid duplicate elements in list
- return combineAllWorkers(liveWorkers, lostWorkers);
+ return mMembershipManager.getAllMembers();
I was confused why we were going to the trouble of combining a list of live workers with a list of lost workers to get the list of all workers, while all this time there is a `getAllMembers` method that does exactly this.

I think this change has a subtle difference: the `combineAllWorkers` method used to de-duplicate workers by their net address, while `mMembershipManager.getAllMembers` does so by checking the worker IDs. But workers shouldn't have conflicting net addresses either, so it's no big problem IMO.
Agreed we can use `mMembershipManager.getAllMembers` given the subtle difference. But the property that workers shouldn't have conflicting net addresses is not guaranteed IMO? I suggest we double-check that in the worker registration code (in a separate PR if necessary). At least if some workers are using duplicate IDs or addresses, we should check somewhere and log some warnings so that people can notice at all.
> workers shouldn't have conflicting net addresses is not guaranteed

Right. So there used to be some potential bugs due to conflicting worker addresses. But this is no longer relevant: depending on net addresses to differentiate workers is not reliable, hence the introduction of worker IDs.

> we double check that in the worker register code

I think Lucy did exactly that in #18454.
@jiacheliu3 @dbw9580 Actually, if a worker registers as workerid1, then removes its worker identity file and restarts, it will register as workerid2. If no one removes the workerid1 key from etcd, `getAllMembers` will contain both worker entities bearing the same address. But that's not something the code base should be guarding against; it's a deployment issue. `getAllMembers` can be thought of as all the distinct members forming the ring.
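Following the earlier suggestion about logging warnings, here is a hedged sketch (hypothetical helper with simplified types, not part of this PR) of how duplicate addresses across distinct worker IDs could be detected and surfaced:

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

// Sketch only: detect when two distinct worker IDs report the same network address,
// which can happen if a worker loses its identity file and re-registers under a new ID.
class DuplicateAddressCheck {
  record Member(String workerId, String address) {}

  static void warnOnDuplicateAddresses(List<Member> members, Consumer<String> log) {
    members.stream()
        .collect(Collectors.groupingBy(Member::address))
        .forEach((address, group) -> {
          if (group.size() > 1) {
            log.accept("Multiple worker IDs share address " + address + ": "
                + group.stream().map(Member::workerId).collect(Collectors.joining(", ")));
          }
        });
  }

  public static void main(String[] args) {
    warnOnDuplicateAddresses(List.of(
        new Member("workerid1", "host1:29999"),
        new Member("workerid2", "host1:29999")), System.out::println);
  }
}
```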
List<BlockWorkerInfo> blockWorkerInfoList = workerClusterView.stream()
    .map(w -> new BlockWorkerInfo(w.getIdentity(), w.getAddress(), w.getCapacityBytes(),
        w.getUsedBytes(), w.getState() == WorkerState.LIVE))
    .collect(Collectors.toList());
HASH_PROVIDER.refresh(blockWorkerInfoList, mNumVirtualNodes);
Refactoring `ConsistentHashProvider` to accept `WorkerClusterView` instead of `List<BlockWorkerInfo>` will be done in a separate PR: #18434
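For background on what `HASH_PROVIDER.refresh(..., mNumVirtualNodes)` does conceptually, here is a tiny consistent-hashing sketch with virtual nodes; it is a simplified illustration, not Alluxio's actual `ConsistentHashProvider` implementation:

```java
import java.util.*;

// Sketch only: a minimal consistent-hash ring. Each worker is inserted numVirtualNodes
// times; a key is routed to the first ring entry at or after its hash (wrapping around).
class ConsistentHashSketch {
  private final TreeMap<Integer, String> mRing = new TreeMap<>();

  void refresh(List<String> workers, int numVirtualNodes) {
    mRing.clear();
    for (String worker : workers) {
      for (int i = 0; i < numVirtualNodes; i++) {
        mRing.put((worker + "#" + i).hashCode(), worker);
      }
    }
  }

  String get(String key) {
    Map.Entry<Integer, String> e = mRing.ceilingEntry(key.hashCode());
    return (e != null ? e : mRing.firstEntry()).getValue();
  }

  public static void main(String[] args) {
    ConsistentHashSketch ring = new ConsistentHashSketch();
    ring.refresh(List.of("worker-1", "worker-2", "worker-3"), 100);
    System.out.println(ring.get("/path/to/file"));
  }
}
```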
 * @param fileId
 * @param count
 * @return a list of preferred workers
 * @throws ResourceExhaustedException if unable to return exactly #{count} workers
 */
- List<BlockWorkerInfo> getPreferredWorkers(List<BlockWorkerInfo> blockWorkerInfos,
+ List<BlockWorkerInfo> getPreferredWorkers(WorkerClusterView workers,
The return type is unchanged because the order of the returned workers is important.
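To illustrate why ordering matters, here is a small hypothetical client-side sketch (not actual Alluxio code) where earlier entries in the preferred list are tried before later ones:

```java
import java.util.List;

// Sketch only: the caller treats earlier entries as primary choices and later ones as fallbacks.
class PreferredWorkerFallback {
  static String readThroughPreferredWorkers(List<String> preferredWorkers) {
    for (String worker : preferredWorkers) {
      try {
        return readFrom(worker); // try the most-preferred worker first
      } catch (RuntimeException e) {
        // fall through to the next candidate, in order
      }
    }
    throw new IllegalStateException("all preferred workers failed");
  }

  // Hypothetical stand-in for a read against a worker.
  private static String readFrom(String worker) {
    if (worker.endsWith("-down")) {
      throw new RuntimeException("worker unavailable: " + worker);
    }
    return "data from " + worker;
  }

  public static void main(String[] args) {
    System.out.println(readThroughPreferredWorkers(List.of("worker-1-down", "worker-2")));
  }
}
```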
Mostly LGTM. Good to merge once the comments are looked at. Thanks for the work!
      || mWorkerRefreshPolicy.attempt()) {
    switch (type) {
      case ALL:
-       mWorkerInfoList.set(getAllWorkers());
+       mCachedWorkerClusterView.set(getAllWorkers());
Oh, interestingly, this same cache object is caching results for 3 types? How do we know if a get wants the same type as is cached?
Good catch! I think this is a potential bug. Since the only caller of this overloaded `getCachedWorkers(GetWorkerListType)` is `DoraCacheClient.getWorkerNetAddress`, and its argument `GetWorkerListType` is tied to `mEnableDynamicHashRing`, which is currently a runtime constant, this does not cause any visible bug.

IMO, since there are already `getLiveWorkers`, `getAllWorkers` and `getLostWorkers` methods on `FileSystemContext`, I'd propose to simply deprecate/remove this method.
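To make the pitfall concrete, here is a self-contained sketch with simplified types (not the real `FileSystemContext`) showing how a single cache slot shared across list types could serve results that were fetched for a different type:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: one AtomicReference caches whatever list type was fetched last, so a later
// call asking for a different type can be served mismatched results until the next refresh.
class SharedCachePitfall {
  enum GetWorkerListType { ALL, LIVE, LOST }

  private final AtomicReference<List<String>> mCached = new AtomicReference<>();
  private boolean mRefreshDue = true; // stand-in for a time-based refresh policy

  List<String> getCachedWorkers(GetWorkerListType type) {
    if (mCached.get() == null || mRefreshDue) {
      mCached.set(fetch(type)); // the cache does not remember which type it holds
      mRefreshDue = false;
    }
    return mCached.get();
  }

  private List<String> fetch(GetWorkerListType type) {
    return switch (type) {
      case ALL -> List.of("w1", "w2", "w3");
      case LIVE -> List.of("w1", "w2");
      case LOST -> List.of("w3");
    };
  }

  public static void main(String[] args) {
    SharedCachePitfall ctx = new SharedCachePitfall();
    System.out.println(ctx.getCachedWorkers(GetWorkerListType.ALL));  // [w1, w2, w3]
    System.out.println(ctx.getCachedWorkers(GetWorkerListType.LIVE)); // still [w1, w2, w3]
  }
}
```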
To be fixed in a separate PR.
good catch
# Conflicts:
#	dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
/**
 * The policy to refresh workers list.
 */
@GuardedBy("mCachedWorkerClusterView")
Why is this policy guarded by the worker list? There's no update to this policy? Can you remove this if it's a legacy thing?
`RefreshPolicy` objects are not thread safe, and `TimeoutRefresh.attempt` actually mutates its internal state, so access to it must be serialized across different threads. This policy object is only used in `getCachedWorkers`, so it's fine to guard it with `mCachedWorkerClusterView`'s lock.
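A sketch of the locking pattern described above, with simplified stand-ins for the real classes (the `TimeoutRefresh` here is an assumed minimal policy, not the actual Alluxio implementation):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: serialize access to a stateful refresh policy by synchronizing on the same
// object the field is annotated as guarded by.
class GuardedRefreshSketch {
  private final AtomicReference<List<String>> mCachedWorkerClusterView = new AtomicReference<>();
  private final TimeoutRefresh mWorkerRefreshPolicy = new TimeoutRefresh(30_000);

  List<String> getCachedWorkers() {
    synchronized (mCachedWorkerClusterView) {
      // attempt() mutates the policy's internal state, hence the external synchronization.
      if (mCachedWorkerClusterView.get() == null || mWorkerRefreshPolicy.attempt()) {
        mCachedWorkerClusterView.set(fetchAllWorkers());
      }
      return mCachedWorkerClusterView.get();
    }
  }

  private List<String> fetchAllWorkers() {
    return List.of("worker-1", "worker-2");
  }

  // Minimal timeout-based refresh policy: attempt() returns true (and records the time)
  // when the interval has elapsed since the last successful attempt.
  static class TimeoutRefresh {
    private final long mIntervalMs;
    private long mLastRefreshMs;

    TimeoutRefresh(long intervalMs) {
      mIntervalMs = intervalMs;
    }

    boolean attempt() {
      long now = System.currentTimeMillis();
      if (now - mLastRefreshMs >= mIntervalMs) {
        mLastRefreshMs = now;
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(new GuardedRefreshSketch().getCachedWorkers());
  }
}
```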
LGTM other than some minor comments
alluxio-bot, merge this please
merge failed:
alluxio-bot, merge this please
### What changes are proposed in this pull request?

Replace `List<BlockWorkerInfo>` with `WorkerClusterView` in APIs.

Important APIs that are changed:

1. `FileSystemContext.getCachedWorkers` now returns `WorkerClusterView`
2. `WorkerLocationPolicy.getPreferredWorkers` (as well as all its implementations) now accepts a `WorkerClusterView` as the first argument (but still returns `List<BlockWorkerInfo>` as the returned list must be ordered)

APIs that are using `List<BlockWorkerInfo>` (or `List<WorkerInfo>`) but *not* migrated to `WorkerClusterView`:

1. `alluxio.master.scheduler.WorkerProvider.getWorkerInfos` returns `List<WorkerInfo>`.
2. Job service related APIs, e.g. `alluxio.job.plan.PlanDefinition.selectExecutors`

Notable behavior change:

Now `EtcdMembershipManager` assigns the correct state (`LIVE` or `LOST`) for all workers in its `WorkerInfo` struct. Before this change, this information was not available and the state defaulted to `UNRECOGNIZED`.
### Why are the changes needed?

Allow more efficient indexing and filtering of workers by worker ID.
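As a rough illustration of the rationale (a hypothetical sketch, not the actual `WorkerClusterView` API), indexing workers by identity lets lookups and filters avoid scanning a flat list:

```java
import java.util.*;

// Sketch only: keep workers indexed by identity so lookups and filters are map-based.
class ClusterViewSketch {
  record Worker(String identity, String address, boolean live) {}

  private final Map<String, Worker> mWorkersById;

  ClusterViewSketch(Collection<Worker> workers) {
    Map<String, Worker> index = new HashMap<>();
    for (Worker w : workers) {
      index.put(w.identity(), w); // O(1) lookup by worker ID instead of a linear scan
    }
    mWorkersById = Collections.unmodifiableMap(index);
  }

  Optional<Worker> getById(String identity) {
    return Optional.ofNullable(mWorkersById.get(identity));
  }

  List<Worker> liveWorkers() {
    return mWorkersById.values().stream().filter(Worker::live).toList();
  }

  public static void main(String[] args) {
    ClusterViewSketch view = new ClusterViewSketch(List.of(
        new Worker("w1", "host1:29999", true),
        new Worker("w2", "host2:29999", false)));
    System.out.println(view.getById("w2"));
    System.out.println(view.liveWorkers());
  }
}
```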
### Does this PR introduce any user facing changes?

No.