Skip to content

Commit

Permalink
perf: 作业执行结果回调日志级别优化 TencentBlueKing#3097
Browse files Browse the repository at this point in the history
  • Loading branch information
liuliaozhong committed Sep 13, 2024
1 parent 80d35a2 commit 45ae89f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public class CommonMetricNames {
* 统计调用 GSE API整个过程,含反序列化
*/
public static final String ESB_GSE_API = "job.client.gse.api";

/**
* 统计任务执行结束回调HTTP的请求状态
*/
public static final String TASK_CALLBACK_HTTP_STATUS = "job.client.task.callback.http.status";
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public class CommonMetricTags {
* 蓝鲸应用 ID
*/
public static final String KEY_APP_CODE = "app_code";

/**
* HTTP请求状态
*/
public static final String KEY_HTTP_STATUS = "http_status";
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class HttpConPoolUtil {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 响应
*/
public static String post(String url, String content, String contentType) {
public static HttpResponse post(String url, String content, String contentType) {
return post(url, CHARSET, content, contentType);
}

Expand All @@ -105,15 +105,11 @@ public static String post(String url, String content, String contentType) {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 响应
*/
public static String post(String url, String charset, String content, String contentType) {
public static HttpResponse post(String url, String charset, String content, String contentType) {
try {
byte[] resp = post(url, content.getBytes(charset), contentType);
if (null == resp) {
return null;
}
return new String(resp, charset);
return post(url, content.getBytes(charset), contentType);
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand All @@ -125,7 +121,7 @@ public static String post(String url, String charset, String content, String con
* @param content 提交的内容字符串
* @return 响应
*/
public static String post(String url, String content) {
public static HttpResponse post(String url, String content) {
return post(url, CHARSET, content, "application/x-www-form-urlencoded");
}

Expand All @@ -137,7 +133,7 @@ public static String post(String url, String content) {
* @param headers 自定义请求头
* @return 响应
*/
public static String post(String url, String content, Header... headers) {
public static HttpResponse post(String url, String content, Header... headers) {
return post(url, CHARSET, content, headers);
}

Expand All @@ -150,15 +146,11 @@ public static String post(String url, String content, Header... headers) {
* @param headers 自定义请求头
* @return 响应
*/
public static String post(String url, String charset, String content, Header... headers) {
public static HttpResponse post(String url, String charset, String content, Header... headers) {
try {
byte[] resp = post(url, new ByteArrayEntity(content.getBytes(charset)), headers);
if (null == resp) {
return null;
}
return new String(resp, charset);
return post(url, new ByteArrayEntity(content.getBytes(charset)), headers);
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand All @@ -171,7 +163,7 @@ public static String post(String url, String charset, String content, Header...
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 返回字节数组
*/
public static byte[] post(String url, byte[] content, String contentType) {
public static HttpResponse post(String url, byte[] content, String contentType) {
return post(url, new ByteArrayEntity(content), contentType);
}

Expand All @@ -183,30 +175,30 @@ public static byte[] post(String url, byte[] content, String contentType) {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 返回字节数组
*/
public static byte[] post(String url, HttpEntity requestEntity, String contentType) {
public static HttpResponse post(String url, HttpEntity requestEntity, String contentType) {
return post(url, requestEntity,
new BasicHeader("Content-Type", contentType == null ? "application/x-www-form-urlencoded" : contentType));
new BasicHeader("Content-Type",
contentType == null ? "application/x-www-form-urlencoded" : contentType));
}

public static byte[] post(String url, HttpEntity requestEntity, Header... headers) {
public static HttpResponse post(String url, HttpEntity requestEntity, Header... headers) {
HttpPost post = new HttpPost(url);
// 设置为长连接,服务端判断有此参数就不关闭连接。
post.setHeader("Connection", "Keep-Alive");
post.setHeaders(headers);
post.setEntity(requestEntity);
try (CloseableHttpResponse httpResponse = HTTP_CLIENT.execute(post)) {
int statusCode = httpResponse.getStatusLine().getStatusCode();
try (CloseableHttpResponse response = HTTP_CLIENT.execute(post)) {
int statusCode = response.getStatusLine().getStatusCode();
log.info("Post url: {}, statusCode: {}", url, statusCode);
if (statusCode != HttpStatus.SC_OK) {
String errorMsg = buildErrorMessage("Post", url, statusCode,
httpResponse.getStatusLine().getReasonPhrase());
log.error(errorMsg);
throw new InternalException(errorMsg, ErrorCode.API_ERROR);
response.getStatusLine().getReasonPhrase());
log.warn(errorMsg);
}
HttpEntity entity = httpResponse.getEntity();
return EntityUtils.toByteArray(entity);
String entity = response.getEntity() != null ? EntityUtils.toString(response.getEntity()) : null;
return new HttpResponse(statusCode, entity, response.getAllHeaders());
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@

package com.tencent.bk.job.execute.engine.listener;

import com.tencent.bk.job.common.metrics.CommonMetricNames;
import com.tencent.bk.job.common.metrics.CommonMetricTags;
import com.tencent.bk.job.common.util.http.HttpConPoolUtil;
import com.tencent.bk.job.common.util.http.HttpResponse;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.engine.model.JobCallbackDTO;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.springframework.stereotype.Component;

import java.net.MalformedURLException;
Expand All @@ -39,40 +44,80 @@
@Component
@Slf4j
public class CallbackListener {
private final MeterRegistry meterRegistry;

public CallbackListener(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

/**
* 处理回调请求
*/
public void handleMessage(JobCallbackDTO callbackDTO) {
long taskInstanceId = callbackDTO.getId();
String callbackUrl = callbackDTO.getCallbackUrl();

try {
log.info("Handle callback, taskInstanceId: {}, msg: {}", taskInstanceId, callbackDTO);
String callbackUrl = callbackDTO.getCallbackUrl();
try {
new URL(callbackUrl);
} catch (MalformedURLException var5) {
log.warn("Callback fail, bad url: {}", callbackUrl);
return;
validateUrl(callbackUrl);

HttpResponse response = callbackRequest(callbackUrl, callbackDTO);

// 回调状态码不是200,重试一次
if (response.getStatusCode() != HttpStatus.SC_OK) {
log.warn("Callback failed, retrying. taskInstanceId: {}, statusCode: {}",
taskInstanceId, response.getStatusCode());
response = callbackRequest(callbackUrl, callbackDTO);
}
log.info("Final callback {}, taskInstanceId: {}, statusCode: {}, result: {}",
response.getStatusCode() == HttpStatus.SC_OK ? "success" : "fail",
taskInstanceId,
response.getStatusCode(),
response.getEntity());
} catch (MalformedURLException e) {
log.warn("Invalid callback URL: "+callbackUrl, e);
recordCallbackMetrics("unknown");
}
}

/**
* 校验URL是否合法
*/
private void validateUrl(String callbackUrl) throws MalformedURLException {
new URL(callbackUrl);
}

/**
* 执行回调请求
*/
private HttpResponse callbackRequest(String callbackUrl, JobCallbackDTO callbackDTO) {
try {
callbackDTO.setCallbackUrl(null);
try {
// TODO 需要优化,返回application/json
try {
String rst = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
log.info("Callback success, taskInstanceId: {}, result: {}", taskInstanceId, rst);
} catch (Throwable e) { //出错重试一次
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
log.warn(errorMsg, e);
String rst = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
log.info("Retry callback success, taskInstanceId: {}, result: {}", taskInstanceId, rst);
}
} catch (Throwable e) {
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
log.warn(errorMsg, e);
}
HttpResponse response = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
recordCallbackMetrics(response.getStatusCode());
return response;
} catch (Throwable e) {
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
String errorMsg = String.format("Callback request failed, taskInstanceId: %s, url: %s",
callbackDTO.getId(),
callbackUrl);
log.warn(errorMsg, e);
recordCallbackMetrics("unknown");
return new HttpResponse(500, null, null);
}
}

/**
* 记录回调请求的监控指标
*/
private void recordCallbackMetrics(int statusCode) {
recordCallbackMetrics(String.valueOf(statusCode));
}

private void recordCallbackMetrics(String statusCode) {
meterRegistry.counter(
CommonMetricNames.TASK_CALLBACK_HTTP_STATUS,
CommonMetricTags.KEY_HTTP_STATUS,
statusCode
).increment();
}
}

0 comments on commit 45ae89f

Please sign in to comment.