Skip to content

Commit

Permalink
SHS-NG M4.4: Port JobsTab and StageTab to the new backend.
Browse files Browse the repository at this point in the history
This change is a little larger because there's a whole lot of logic
behind these pages, all really tied to internal types and listeners.
There's also a lot of code that was moved to the new module.

- Added missing StageData and ExecutorStageSummary fields which are
  used by the UI. Some json golden files needed to be updated to account
  for new fields.

- Save RDD graph data in the store. This tries to re-use existing types as
  much as possible, so that the code doesn't need to be re-written. So it's
  probably not very optimal.

- Some old classes (e.g. JobProgressListener) still remain, since they're used
  in other parts of the code; they're not used by the UI anymore, though, and
  will be cleaned up in a separate change.

- Save information about active pools in the disk store; this could potentially
  be avoided, since it's most probably not much data, but it makes it easier
  later to add this kind of information to the API and to history if wanted.

- Because the new store sorts things slightly differently from the previous
  code, some json golden files had some elements within them shuffled around.

- The retention unit test in UISeleniumSuite was disabled because the code
  to throw away old stages / tasks hasn't been added yet. It's less of a
  problem with the new store since it doesn't use memory, but it will be
  added later to avoid a similar issue with unbound disk space usage.

- The job description field in the API tries to follow the old behavior, which
  makes it be empty most of the time, even though there's information to fill it
  in. For stages, a new field was added to hold the description (which is basically
  the job description), so that the UI can be rendered in the old way.

- A new stage status ("SKIPPED") was added to account for the fact that the API
  couldn't represent that state before. Because of the way the new code tracks
  stages, they would end up showing up as "PENDING" in the UI.

TODO: add UIListener unit tests for the new fields.
  • Loading branch information
Marcelo Vanzin committed May 1, 2017
1 parent 6d8e1b2 commit f95d3cc
Show file tree
Hide file tree
Showing 51 changed files with 2,120 additions and 2,333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
} else {
this.end = null;
if (it.hasNext()) {
it.next();
// When descending, the caller may have set up the start of iteration at a non-existant
// entry that is guaranteed to be after the desired entry. For example, if you have a
// compound key (a, b) where b is a, integer, you may seek to the end of the elements that
// have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
// exist in the database. So need to check here whether the next value actually belongs to
// the set being returned by the iterator before advancing.
byte[] nextKey = it.peekNext().getKey();
if (compare(nextKey, indexKeyPrefix) <= 0) {
it.next();
}
}
}

Expand Down Expand Up @@ -258,4 +267,17 @@ private byte[] stitch(byte[]... comps) {
return dest;
}

private int compare(byte[] a, byte[] b) {
int diff = 0;
int minLen = Math.min(a.length, b.length);
for (int i = 0; i < minLen; i++) {
diff += (a[i] - b[i]);
if (diff != 0) {
return diff;
}
}

return a.length - b.length;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum StageStatus {
ACTIVE,
COMPLETE,
FAILED,
PENDING;
PENDING,
SKIPPED;

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _stateStore, _conf, listenerBus, _env.securityManager,
Some(SparkUI.create(Some(this), _stateStore, _conf, _env.securityManager,
appName, "", startTime))
} else {
// For tests, do not enable the UI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
return None
}

val path = uiStorePath(appId, attemptId)
if (!path.exists()) {
throw new IllegalStateException(
s"Application entry for $appId / $attemptId found, but UI not available.")
}

val conf = this.conf.clone()
val secManager = new SecurityManager(conf)

Expand All @@ -301,13 +295,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
attempt.adminAclsGroups.getOrElse(""))
secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))

val path = uiStorePath(appId, attemptId)
if (!path.exists()) {
throw new IllegalStateException(
s"Application entry for $appId / $attemptId found, but UI not available.")
}

val replayBus = new ReplayListenerBus()

// Create the UI under a lock so that a valid disk store is used, in case the update thread
// is writing a new disk store for the application (see replaceStore()).
val loadedUI = synchronized {
val ui = SparkUI.create(None, AppStateStore.loadStore(conf, path), conf, replayBus,
secManager,
val ui = SparkUI.create(None, AppStateStore.loadStore(conf, path), conf, secManager,
app.info.name,
HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
attempt.info.startTime.getTime())
Expand Down
Loading

0 comments on commit f95d3cc

Please sign in to comment.