Skip to content

Commit

Permalink
Support --ignore-sleeping with async-profiler (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirYwell authored Nov 2, 2024
1 parent 41a1117 commit ddb0097
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ private void profilerStart(SparkPlatform platform, CommandSender sender, Command
}

boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping");
boolean ignoreNative = arguments.boolFlag("ignore-native");
boolean forceJavaSampler = arguments.boolFlag("force-java-sampler");

Set<String> threads = arguments.stringFlag("thread");
Expand Down Expand Up @@ -239,7 +238,6 @@ private void profilerStart(SparkPlatform platform, CommandSender sender, Command
}
builder.samplingInterval(interval);
builder.ignoreSleeping(ignoreSleeping);
builder.ignoreNative(ignoreNative);
builder.forceJavaSampler(forceJavaSampler);
builder.allocLiveOnly(allocLiveOnly);
if (ticksOver != -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Level;

/**
* Builds {@link Sampler} instances.
Expand All @@ -39,8 +40,7 @@ public class SamplerBuilder {
private SamplerMode mode = SamplerMode.EXECUTION;
private double samplingInterval = -1;
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
private boolean forceJavaSampler = false;
private boolean allocLiveOnly = false;
private long autoEndTime = -1;
private boolean background = false;
Expand Down Expand Up @@ -97,13 +97,8 @@ public SamplerBuilder ignoreSleeping(boolean ignoreSleeping) {
return this;
}

public SamplerBuilder ignoreNative(boolean ignoreNative) {
this.ignoreNative = ignoreNative;
return this;
}

public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) {
this.useAsyncProfiler = !forceJavaSampler;
this.forceJavaSampler = forceJavaSampler;
return this;
}

Expand All @@ -117,32 +112,41 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationExceptio
throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
}

boolean canUseAsyncProfiler = AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);

if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
if (this.mode == SamplerMode.ALLOCATION) {
if (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform)) {
throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
}
if (this.ignoreSleeping) {
platform.getPlugin().log(Level.WARNING, "Ignoring sleeping threads is not supported in allocation profiling mode. Sleeping threads will be included in the results.");
}
if (onlyTicksOverMode) {
platform.getPlugin().log(Level.WARNING, "'Only-ticks-over' is not supported in allocation profiling mode.");
}
}

if (onlyTicksOverMode || this.forceJavaSampler) {
canUseAsyncProfiler = false;
}

int interval = (int) (this.mode == SamplerMode.EXECUTION ?
this.samplingInterval * 1000d : // convert to microseconds
this.samplingInterval
);

SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background);
SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background, this.ignoreSleeping);

Sampler sampler;
if (this.mode == SamplerMode.ALLOCATION) {
sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
} else if (canUseAsyncProfiler) {
sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
} else if (onlyTicksOverMode) {
sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
sampler = new JavaSampler(platform, settings, this.tickHook, this.ticksOver);
} else {
sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative);
sampler = new JavaSampler(platform, settings);
}

sampler.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ public class SamplerSettings {
private final ThreadGrouper threadGrouper;
private final long autoEndTime;
private final boolean runningInBackground;
private final boolean ignoreSleeping;

public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground) {
public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground, boolean ignoreSleeping) {
this.interval = interval;
this.threadDumper = threadDumper;
this.threadGrouper = threadGrouper;
this.autoEndTime = autoEndTime;
this.runningInBackground = runningInBackground;
this.ignoreSleeping = ignoreSleeping;
}

public int interval() {
Expand All @@ -58,4 +60,8 @@ public long autoEndTime() {
public boolean runningInBackground() {
return this.runningInBackground;
}

public boolean ignoreSleeping() {
return this.ignoreSleeping;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ public abstract class AbstractDataAggregator implements DataAggregator {
/** The instance used to group threads together */
protected final ThreadGrouper threadGrouper;

protected AbstractDataAggregator(ThreadGrouper threadGrouper) {
/** If sleeping threads should be ignored */
protected final boolean ignoreSleeping;

protected AbstractDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping) {
this.threadGrouper = threadGrouper;
this.ignoreSleeping = ignoreSleeping;
}

protected ThreadNode getNode(String group) {
Expand All @@ -65,4 +69,13 @@ public List<ThreadNode> exportData() {
}
return data;
}

protected static boolean isSleeping(String clazz, String method) {
// java.lang.Thread.yield()
// jdk.internal.misc.Unsafe.park()
// sun.misc.Unsafe.park()
return (clazz.equals("java.lang.Thread") && method.equals("yield")) ||
(clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) ||
(clazz.equals("sun.misc.Unsafe") && method.equals("park"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
private static final StackTraceNode.Describer<AsyncStackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) ->
new StackTraceNode.AsyncDescription(element.getClassName(), element.getMethodName(), element.getMethodDescription());

protected AsyncDataAggregator(ThreadGrouper threadGrouper) {
super(threadGrouper);
protected AsyncDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping) {
super(threadGrouper, ignoreSleeping);
}

@Override
Expand All @@ -48,6 +48,9 @@ public SamplerMetadata.DataAggregator getMetadata() {
}

public void insertData(ProfileSegment element, int window) {
if (this.ignoreSleeping && isSleeping(element)) {
return;
}
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window);
Expand All @@ -56,4 +59,24 @@ public void insertData(ProfileSegment element, int window) {
}
}

private static boolean isSleeping(ProfileSegment element) {
// thread states written by async-profiler:
// https://github.com/async-profiler/async-profiler/blob/116504c9f75721911b2f561e29eda065c224caf6/src/flightRecorder.cpp#L1017-L1023
String threadState = element.getThreadState();
if (threadState.equals("STATE_SLEEPING")) {
return true;
}

// async-profiler includes native frames - let's check more than just the top frame
AsyncStackTraceElement[] stackTrace = element.getStackTrace();
for (int i = 0; i < Math.min(3, stackTrace.length); i++) {
String clazz = stackTrace[i].getClassName();
String method = stackTrace[i].getMethodName();
if (isSleeping(clazz, method)) {
return true;
}
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleColl
super(platform, settings);
this.sampleCollector = collector;
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("spark-async-sampler-worker-thread")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
*/
public class ProfileSegment {

private static final String UNKNOWN_THREAD_STATE = "<unknown>";

/** The native thread id (does not correspond to Thread#getId) */
private final int nativeThreadId;
/** The name of the thread */
Expand All @@ -39,12 +41,15 @@ public class ProfileSegment {
private final AsyncStackTraceElement[] stackTrace;
/** The time spent executing this segment in microseconds */
private final long value;
/** The state of the thread. {@value #UNKNOWN_THREAD_STATE} if state is unknown */
private final String threadState;

public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) {
private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState) {
this.nativeThreadId = nativeThreadId;
this.threadName = threadName;
this.stackTrace = stackTrace;
this.value = value;
this.threadState = threadState;
}

public int getNativeThreadId() {
Expand All @@ -63,6 +68,10 @@ public long getValue() {
return this.value;
}

public String getThreadState() {
return this.threadState;
}

public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
int len = stackTrace != null ? stackTrace.methods.length : 0;
Expand All @@ -71,8 +80,13 @@ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event samp
for (int i = 0; i < len; i++) {
stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
}
String threadState = UNKNOWN_THREAD_STATE;
if (sample instanceof JfrReader.ExecutionSample) {
JfrReader.ExecutionSample executionSample = (JfrReader.ExecutionSample) sample;
threadState = reader.threadStates.get(executionSample.threadState);
}

return new ProfileSegment(sample.tid, threadName, stack, value);
return new ProfileSegment(sample.tid, threadName, stack, value, threadState);
}

private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,10 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
/** The interval to wait between sampling, in microseconds */
protected final int interval;

/** If sleeping threads should be ignored */
private final boolean ignoreSleeping;

/** If threads executing native code should be ignored */
private final boolean ignoreNative;

public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
super(threadGrouper);
public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping) {
super(threadGrouper, ignoreSleeping);
this.workerPool = workerPool;
this.interval = interval;
this.ignoreSleeping = ignoreSleeping;
this.ignoreNative = ignoreNative;
}

/**
Expand All @@ -74,9 +66,6 @@ protected void writeData(ThreadInfo threadInfo, int window) {
if (this.ignoreSleeping && isSleeping(threadInfo)) {
return;
}
if (this.ignoreNative && threadInfo.isInNative()) {
return;
}

try {
ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
Expand Down Expand Up @@ -113,12 +102,7 @@ static boolean isSleeping(ThreadInfo thread) {
String clazz = call.getClassName();
String method = call.getMethodName();

// java.lang.Thread.yield()
// jdk.internal.misc.Unsafe.park()
// sun.misc.Unsafe.park()
return (clazz.equals("java.lang.Thread") && method.equals("yield")) ||
(clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) ||
(clazz.equals("sun.misc.Unsafe") && method.equals("park"));
return isSleeping(clazz, method);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The last window that was profiled */
private final AtomicInteger lastWindow = new AtomicInteger();

public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative) {
public JavaSampler(SparkPlatform platform, SamplerSettings settings) {
super(platform, settings);
this.dataAggregator = new SimpleDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative);
this.dataAggregator = new SimpleJavaDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), settings.ignoreSleeping());
}

public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
public JavaSampler(SparkPlatform platform, SamplerSettings settings, TickHook tickHook, int tickLengthThreshold) {
super(platform, settings);
this.dataAggregator = new TickedDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
this.dataAggregator = new TickedJavaDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), settings.ignoreSleeping(), tickHook, tickLengthThreshold);
}

@Override
Expand All @@ -89,9 +89,9 @@ public void start() {

TickHook tickHook = this.platform.getTickHook();
if (tickHook != null) {
if (this.dataAggregator instanceof TickedDataAggregator) {
if (this.dataAggregator instanceof TickedJavaDataAggregator) {
WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook);
((TickedDataAggregator) this.dataAggregator).setTickCounter(counter);
((TickedJavaDataAggregator) this.dataAggregator).setTickCounter(counter);
} else {
this.windowStatisticsCollector.startCountingTicks(tickHook);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
/**
* Basic implementation of {@link DataAggregator}.
*/
public class SimpleDataAggregator extends JavaDataAggregator {
public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
public class SimpleJavaDataAggregator extends JavaDataAggregator {
public SimpleJavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping) {
super(workerPool, threadGrouper, interval, ignoreSleeping);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
* which exceed a certain threshold in duration.
*/
public class TickedDataAggregator extends JavaDataAggregator {
public class TickedJavaDataAggregator extends JavaDataAggregator {

/** Used to monitor the current "tick" of the server */
private final TickHook tickHook;
Expand All @@ -59,8 +59,8 @@ public class TickedDataAggregator extends JavaDataAggregator {
// guards currentData
private final Object mutex = new Object();

public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
public TickedJavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, TickHook tickHook, int tickLengthThreshold) {
super(workerPool, threadGrouper, interval, ignoreSleeping);
this.tickHook = tickHook;
this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
// 50 millis in a tick, plus 10 so we have a bit of room to go over
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import me.lucko.spark.common.monitor.tick.TickStatistics;
import me.lucko.spark.common.platform.world.AsyncWorldInfoProvider;
import me.lucko.spark.common.platform.world.WorldInfoProvider;
import me.lucko.spark.common.sampler.java.TickedJavaDataAggregator;
import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.proto.SparkProtos;

Expand Down Expand Up @@ -287,7 +288,7 @@ public int getCountedTicksThisWindowAndReset() {
* Counts the number of ticks in a window according to the number of times
* {@link #increment()} is called.
*
* Used by the {@link me.lucko.spark.common.sampler.java.TickedDataAggregator}.
* Used by the {@link TickedJavaDataAggregator}.
*/
public static final class ExplicitTickCounter extends BaseTickCounter {
private final AtomicInteger counted = new AtomicInteger();
Expand Down

0 comments on commit ddb0097

Please sign in to comment.