Skip to content

Commit

Permalink
SHS-NG M3: Add initial listener implementation to collect app state.
Browse files Browse the repository at this point in the history
The initial listener is based on the existing JobProgressListener (and others),
and tries to mimin their behavior as much as possible. The change also includes
some minor code movement so that some types and methods from the initial history
server code code can be reused.

Note the code here is not 100% correct. This is meant as a building ground for
the UI integration in the next milestones. As different parts of the UI are
ported, fixes will be made to the different parts of this code to account
for the needed behavior.

I also added annotations to API types so that Jackson is able to correctly
deserialize options, sequences and maps that store primitive types.
  • Loading branch information
Marcelo Vanzin committed May 27, 2017
1 parent 2fa4f3f commit 19217ff
Show file tree
Hide file tree
Showing 13 changed files with 1,928 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ public KVTypeInfo(Class<?> type) throws Exception {
this.accessors = new HashMap<>();
this.indices = new HashMap<>();

for (Field f : type.getFields()) {
for (Field f : type.getDeclaredFields()) {
KVIndex idx = f.getAnnotation(KVIndex.class);
if (idx != null) {
checkIndex(idx, indices);
f.setAccessible(true);
indices.put(idx.value(), idx);
accessors.put(idx.value(), new FieldAccessor(f));
}
}

for (Method m : type.getMethods()) {
for (Method m : type.getDeclaredMethods()) {
KVIndex idx = m.getAnnotation(KVIndex.class);
if (idx != null) {
checkIndex(idx, indices);
Preconditions.checkArgument(m.getParameterTypes().length == 0,
"Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
m.setAccessible(true);
indices.put(idx.value(), idx);
accessors.put(idx.value(), new MethodAccessor(m));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
this.types = new ConcurrentHashMap<>();

Options options = new Options();
options.createIfMissing(!path.exists());
options.createIfMissing(true);
this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));

byte[] versionData = db().get(STORE_VERSION_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum StageStatus {
ACTIVE,
COMPLETE,
FAILED,
PENDING;
PENDING,
SKIPPED;

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.kvstore._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
Expand Down Expand Up @@ -698,18 +699,6 @@ private[history] object FsHistoryProvider {
private val CURRENT_VERSION = 1L
}

/**
* A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
* the API serializer.
*/
private class KVStoreScalaSerializer extends KVStoreSerializer {

mapper.registerModule(DefaultScalaModule)
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)

}

case class KVStoreMetadata(
val version: Long,
val logDir: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ package org.apache.spark.deploy.history

import java.util.concurrent.TimeUnit

import scala.annotation.meta.getter

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.kvstore.KVIndex

private[spark] object config {

/** Use this to annotate constructor params to be used as KVStore indices. */
type KVIndexParam = KVIndex @getter

val DEFAULT_LOG_DIR = "file:/tmp/spark-events"

val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
Expand Down
Loading

0 comments on commit 19217ff

Please sign in to comment.