Skip to content

Commit

Permalink
feat: support async-profiler feature
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengziyi committed Sep 30, 2024
1 parent e78dd7c commit 2283de3
Show file tree
Hide file tree
Showing 79 changed files with 4,873 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.skywalking.oap.server.network.trace.component.command;

import org.apache.skywalking.apm.network.common.v3.Command;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;

import java.util.List;
import java.util.Objects;

public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable<AsyncProfilerTaskCommand> {
public static final Deserializable<AsyncProfilerTaskCommand> DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0);
public static final String NAME = "AsyncProfileTaskQuery";

private final String taskId;
private final int duration;
private final String execArgs;
private final long createTime;

public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
List<String> events, String execArgs, long createTime) {
super(NAME, serialNumber);
this.taskId = taskId;
this.duration = duration;
this.createTime = createTime;
String comma = ",";
StringBuilder sb = new StringBuilder();
if (Objects.nonNull(events) && !events.isEmpty()) {
sb.append("event=")
.append(String.join(comma, events))
.append(comma);
}
if (execArgs != null && !execArgs.isEmpty()) {
sb.append(execArgs);
}
this.execArgs = sb.toString();
}

public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration,
String execArgs, long createTime) {
super(NAME, serialNumber);
this.taskId = taskId;
this.duration = duration;
this.execArgs = execArgs;
this.createTime = createTime;
}

@Override
public AsyncProfilerTaskCommand deserialize(Command command) {
final List<KeyStringValuePair> argsList = command.getArgsList();
String taskId = null;
int duration = 0;
String execArgs = null;
long createTime = 0;
String serialNumber = null;
for (final KeyStringValuePair pair : argsList) {
if ("SerialNumber".equals(pair.getKey())) {
serialNumber = pair.getValue();
} else if ("TaskId".equals(pair.getKey())) {
taskId = pair.getValue();
} else if ("Duration".equals(pair.getKey())) {
duration = Integer.parseInt(pair.getValue());
} else if ("ExecArgs".equals(pair.getKey())) {
execArgs = pair.getValue();
} else if ("CreateTime".equals(pair.getKey())) {
createTime = Long.parseLong(pair.getValue());
}
}
return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime);
}

@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
.addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs))
.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
return builder;
}

public String getTaskId() {
return taskId;
}

public int getDuration() {
return duration;
}

public String getExecArgs() {
return execArgs;
}

public long getCreateTime() {
return createTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public static BaseCommand deserialize(final Command command) {
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
} else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) {
return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command);
} else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) {
return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command);
}
throw new UnsupportedCommandException(command);
}
Expand Down
5 changes: 5 additions & 0 deletions oap-server/server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>library-module</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-async-profiler-jfr-parser</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.cache.AsyncProfilerTaskCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
Expand All @@ -33,6 +34,8 @@
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenuManagementService;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.profiling.asyncprofiler.AsyncProfilerMutationService;
import org.apache.skywalking.oap.server.core.profiling.asyncprofiler.AsyncProfilerQueryService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingMutationService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingQueryService;
import org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingMutationService;
Expand Down Expand Up @@ -100,6 +103,7 @@ public Class[] services() {
addOALService(classes);
addManagementService(classes);
addEBPFProfilingService(classes);
addAsyncProfilerService(classes);

classes.add(CommandService.class);
classes.add(HierarchyService.class);
Expand All @@ -124,6 +128,12 @@ private void addProfileService(List<Class> classes) {
classes.add(ProfileTaskCache.class);
}

private void addAsyncProfilerService(List<Class> classes) {
classes.add(AsyncProfilerMutationService.class);
classes.add(AsyncProfilerQueryService.class);
classes.add(AsyncProfilerTaskCache.class);
}

private void addOALService(List<Class> classes) {
classes.add(OALEngineLoaderService.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.AsyncProfilerTaskCache;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
Expand All @@ -58,6 +59,8 @@
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.profiling.asyncprofiler.AsyncProfilerMutationService;
import org.apache.skywalking.oap.server.core.profiling.asyncprofiler.AsyncProfilerQueryService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingMutationService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingQueryService;
import org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingMutationService;
Expand Down Expand Up @@ -309,6 +312,12 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));

this.registerServiceImplementation(
AsyncProfilerMutationService.class, new AsyncProfilerMutationService(getManager()));
this.registerServiceImplementation(
AsyncProfilerQueryService.class, new AsyncProfilerQueryService(getManager()));
this.registerServiceImplementation(
AsyncProfilerTaskCache.class, new AsyncProfilerTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(
EBPFProfilingMutationService.class, new EBPFProfilingMutationService(getManager()));
this.registerServiceImplementation(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.cache;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.query.type.AsyncProfilerTask;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.profiling.asyncprofiler.IAsyncProfilerTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class AsyncProfilerTaskCache implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProfilerTaskCache.class);

private final Cache<String, AsyncProfilerTask> serviceId2taskCache;

private final ModuleManager moduleManager;

private IAsyncProfilerTaskQueryDAO taskQueryDAO;

public AsyncProfilerTaskCache(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
this.moduleManager = moduleManager;
long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L;
int initialCapacitySize = (int) (initialSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : initialSize);

serviceId2taskCache = CacheBuilder.newBuilder()
.initialCapacity(initialCapacitySize)
.maximumSize(moduleConfig.getMaxSizeOfProfileTask())
// remove old profile task data
.expireAfterWrite(Duration.ofMinutes(1))
.build();
}

private IAsyncProfilerTaskQueryDAO getTaskQueryDAO() {
if (Objects.isNull(taskQueryDAO)) {
taskQueryDAO = moduleManager.find(StorageModule.NAME)
.provider()
.getService(IAsyncProfilerTaskQueryDAO.class);
}
return taskQueryDAO;
}

public AsyncProfilerTask getAsyncProfilerTask(String serviceId) {
return serviceId2taskCache.getIfPresent(serviceId);
}

public void saveTask(String serviceId, AsyncProfilerTask task) {
if (task == null) {
return ;
}

serviceId2taskCache.put(serviceId, task);
}

/**
* use for every db query, -5 start time
*/
public long getCacheStartTimeBucket() {
return TimeBucket.getMinuteTimeBucket(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
}

/**
* use for every db query, +5 end time(because use task start time to search)
*/
public long getCacheEndTimeBucket() {
return TimeBucket.getMinuteTimeBucket(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.profiling.asyncprofiler.storage.AsyncProfilerTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.AsyncProfilerTask;
import org.apache.skywalking.oap.server.core.storage.profiling.asyncprofiler.IAsyncProfilerTaskQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
Expand Down Expand Up @@ -61,6 +65,10 @@ private void update(ModuleDefineHolder moduleDefineHolder) {
if (!DisableRegister.INSTANCE.include(ProfileTaskRecord.INDEX_NAME)) {
updateProfileTask(moduleDefineHolder);
}

if (!DisableRegister.INSTANCE.include(AsyncProfilerTaskRecord.INDEX_NAME)) {
updateAsyncProfilerTask(moduleDefineHolder);
}
}

/**
Expand Down Expand Up @@ -113,4 +121,31 @@ private void updateProfileTask(ModuleDefineHolder moduleDefineHolder) {
log.warn("Unable to update profile task cache", e);
}
}

private void updateAsyncProfilerTask(ModuleDefineHolder moduleDefineHolder) {
AsyncProfilerTaskCache taskCache = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(AsyncProfilerTaskCache.class);
IAsyncProfilerTaskQueryDAO taskQueryDAO = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(IAsyncProfilerTaskQueryDAO.class);
try {
List<AsyncProfilerTask> taskList = taskQueryDAO.getTaskList(
null, taskCache.getCacheStartTimeBucket(), taskCache.getCacheEndTimeBucket(), null
);
if (CollectionUtils.isEmpty(taskList)) {
return;
}
// List<String> taskIds = taskList.stream().map(AsyncProfilerTask::getId).collect(Collectors.toList());
// Map<String, List<AsyncProfilerTaskLogRecord>> taskId2Log = taskCache.getTaskLogQueryDAO().getTaskLogByTaskId(taskIds);
for (AsyncProfilerTask task : taskList) {
taskCache.saveTask(task.getServiceId(), task);
}

} catch (IOException e) {
log.warn("Unable to update async profiler task cache", e);
}

return;
}
}
Loading

0 comments on commit 2283de3

Please sign in to comment.