Skip to content

Commit

Permalink
fix: Correct parsing errors in ProcessorParseApsaraNative with large …
Browse files Browse the repository at this point in the history
…buffer input (alibaba#1255)

* fix 越界问题

* add comment

(cherry picked from commit ff0d88d)
  • Loading branch information
quzard committed Dec 8, 2023
1 parent 7ae34b3 commit 12e0fd3
Show file tree
Hide file tree
Showing 8 changed files with 675 additions and 131 deletions.
2 changes: 2 additions & 0 deletions core/common/JsonUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ string CompactJson(const string& inJson) {
case '\\':
if (++ch == inJson.end()) { // skip next char after escape char
--ch;
} else {
outJson << '\\';
}
break;
default:
Expand Down
113 changes: 96 additions & 17 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
return;
}

/*
* 处理单个日志事件。
* @param logPath - 日志文件的路径。
* @param e - 指向待处理日志事件的智能指针。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param timeStrCache - 缓存时间字符串,用于比较和更新。
* @return 如果事件被处理且保留,则返回true,如果事件被丢弃,则返回false。
*/
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
PipelineEventPtr& e,
LogtailTime& lastLogTime,
Expand All @@ -99,6 +107,9 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
}
mSourceKeyOverwritten = false;
StringView buffer = sourceEvent.GetContent(mSourceKey);
if (buffer.size() == 0) {
return true;
}
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
Expand Down Expand Up @@ -168,13 +179,14 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
int32_t colon_index = -1;
int32_t index = -1;
index = ParseApsaraBaseFields(buffer, sourceEvent);
if (buffer.data()[index] != 0) {
do {
++index;
if (buffer.data()[index] == '\t' || buffer.data()[index] == '\0') {
int32_t length = buffer.size();
if (index < length) {
for (index = index + 1; index <= length; ++index) {
if (index == length || buffer.data()[index] == '\t') {
if (colon_index >= 0) {
StringView key(buffer.data() + beg_index, colon_index - beg_index);
AddLog(key, StringView(buffer.data() + colon_index + 1, index - colon_index - 1), sourceEvent);
StringView data(buffer.data() + colon_index + 1, index - colon_index - 1);
AddLog(key, data, sourceEvent);
if (key == mSourceKey) {
mSourceKeyOverwritten = true;
}
Expand All @@ -184,7 +196,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
} else if (buffer.data()[index] == ':' && colon_index == -1) {
colon_index = index;
}
} while (buffer.data()[index]);
}
}
logTime_in_micro = (int64_t)logTime_in_micro - (int64_t)mLogTimeZoneOffsetSecond * (int64_t)1000000;
StringBuffer sb = sourceEvent.GetSourceBuffer()->AllocateStringBuffer(20);
Expand All @@ -203,6 +215,14 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
return true;
}

/*
* 解析Apsara格式日志的时间。
* @param buffer - 包含日志数据的字符串视图。
* @param timeStr - 解析后的时间字符串。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param microTime - 解析出的微秒时间戳。
* @return 解析出的时间戳(秒),如果解析失败,则返回0。
*/
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer,
StringView& timeStr,
LogtailTime& lastLogTime,
Expand All @@ -213,7 +233,13 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
if (buffer[1] == '1') // for normal time, e.g 1378882630, starts with '1'
{
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%s", &lastLogTime, nanosecondLength);
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
return 0;
Expand All @@ -223,14 +249,20 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// test other date format case
{
if (IsPrefixString(buffer.data() + 1, timeStr) == true) {
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
}
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
Expand All @@ -243,6 +275,12 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
}

/*
* 检查字符串是否包含指定的前缀。
* @param all - 完整的字符串。
* @param prefix - 要检查的前缀。
* @return 如果字符串以指定前缀开头,则返回true;否则返回false。
*/
bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringView& prefix) {
if (prefix.size() == 0)
return false;
Expand All @@ -255,13 +293,20 @@ bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringVie
return true;
}

static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
/*
* 查找Apsara格式日志的基础字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndexArray - 字段开始索引的数组。
* @param endIndexArray - 字段结束索引的数组。
* @return 解析到的基础字段数量。
*/
static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
int32_t baseFieldNum = 0;
for (int32_t i = 0; buffer[i] != 0; i++) {
for (size_t i = 0; i < buffer.size(); i++) {
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\0' || buffer[i + 1] == '\n') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
endIndexArray[baseFieldNum] = i;
baseFieldNum++;
}
Expand All @@ -276,7 +321,14 @@ static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int
return baseFieldNum;
}

static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为日志级别字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是日志级别,则返回true;否则返回false。
*/
static bool IsFieldLevel(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > 'Z' || buffer[i] < 'A') {
return false;
Expand All @@ -285,7 +337,14 @@ static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endInde
return true;
}

static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为线程ID字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是线程ID,则返回true;否则返回false。
*/
static bool IsFieldThread(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > '9' || buffer[i] < '0') {
return false;
Expand All @@ -294,7 +353,14 @@ static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endInd
return true;
}

static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为文件和行号字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是文件和行号,则返回true;否则返回false。
*/
static bool IsFieldFileLine(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == '/' || buffer[i] == '.') {
return true;
Expand All @@ -303,7 +369,14 @@ static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endI
return false;
}

static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 查找冒号字符的索引。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 冒号字符的索引,如果未找到,则返回endIndex。
*/
static int32_t FindColonIndex(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == ':') {
return i;
Expand All @@ -312,7 +385,13 @@ static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t en
return endIndex;
}

int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent) {
/*
* 解析Apsara日志的基础字段并添加到日志事件中。
* @param buffer - 包含日志数据的字符串视图。
* @param sourceEvent - 引用到日志事件对象,用于添加解析出的字段。
* @return 返回处理完基础字段后的索引位置。
*/
int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent) {
int32_t beginIndexArray[MAX_BASE_FIELD_NUM] = {0};
int32_t endIndexArray[MAX_BASE_FIELD_NUM] = {0};
int32_t baseFieldNum = FindBaseFields(buffer, beginIndexArray, endIndexArray);
Expand Down
2 changes: 1 addition & 1 deletion core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ProcessorParseApsaraNative : public Processor {
time_t
ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
bool IsPrefixString(const char* all, const StringView& prefix);
int32_t ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

int32_t mLogTimeZoneOffsetSecond = 0;
bool mSourceKeyOverwritten = false;
Expand Down
Loading

0 comments on commit 12e0fd3

Please sign in to comment.