Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correct parsing errors in ProcessorParseApsaraNative with large buffer input #1255

Merged
merged 12 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/common/JsonUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ std::string CompactJson(const std::string& inJson) {
case '\\':
if (++ch == inJson.end()) { // skip next char after escape char
--ch;
} else {
outJson << '\\';
}
break;
default:
Expand Down
12 changes: 6 additions & 6 deletions core/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,26 @@ class Config {
std::shared_ptr<LogFilterRule> mFilterRule;
bool mLocalStorage;
int mVersion;
bool mDiscardNoneUtf8;
bool mDiscardNoneUtf8 = false;
std::string mAliuid;
std::string mRegion;
std::string mStreamLogTag;
bool mDiscardUnmatch;
bool mDiscardUnmatch = false;
std::vector<std::string> mColumnKeys;
std::string mSeparator;
char mQuote;
// for delimiter log, accept logs without enough keys or not
// eg, keys -> [a, b, c], raw log "xx|yy", log -> [a->xx, b->yy]
bool mAcceptNoEnoughKeys;
bool mAutoExtend;
bool mAcceptNoEnoughKeys = false;
bool mAutoExtend = true;
std::string mTimeKey;
std::vector<std::string> mShardHashKey;
bool mTailExisted;
std::unordered_map<std::string, std::vector<SensitiveWordCastOption>> mSensitiveWordCastOptions;
bool mUploadRawLog; // true to update raw log to sls
bool mUploadRawLog = false; // true to update raw log to sls
bool mSimpleLogFlag;
bool mTimeZoneAdjust;
int mLogTimeZoneOffsetSecond;
int mLogTimeZoneOffsetSecond = 0;
int32_t mCreateTime; // create time of this config
int32_t
mMaxSendBytesPerSecond; // limit for logstore, not just this config. so if we have multi configs with different
Expand Down
113 changes: 96 additions & 17 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
return;
}

/*
* 处理单个日志事件。
* @param logPath - 日志文件的路径。
* @param e - 指向待处理日志事件的智能指针。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param timeStrCache - 缓存时间字符串,用于比较和更新。
* @return 如果事件被处理且保留,则返回true,如果事件被丢弃,则返回false。
*/
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache) {
if (!IsSupportedEvent(e)) {
return true;
Expand All @@ -79,6 +87,9 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
return true;
}
StringView buffer = sourceEvent.GetContent(mSourceKey);
if (buffer.size() == 0) {
return true;
}
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
Expand Down Expand Up @@ -148,13 +159,14 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
index = ParseApsaraBaseFields(buffer, sourceEvent);
bool sourceKeyOverwritten = mSourceKeyOverwritten;
bool rawLogTagOverwritten = false;
if (buffer.data()[index] != 0) {
do {
++index;
if (buffer.data()[index] == '\t' || buffer.data()[index] == '\0') {
int32_t length = buffer.size();
if (index < length) {
for (index = index + 1; index <= length; ++index) {
if (index == length || buffer.data()[index] == '\t') {
if (colon_index >= 0) {
StringView key(buffer.data() + beg_index, colon_index - beg_index);
AddLog(key, StringView(buffer.data() + colon_index + 1, index - colon_index - 1), sourceEvent);
StringView data(buffer.data() + colon_index + 1, index - colon_index - 1);
AddLog(key, data, sourceEvent);
if (key == mSourceKey) {
sourceKeyOverwritten = true;
}
Expand All @@ -167,7 +179,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
} else if (buffer.data()[index] == ':' && colon_index == -1) {
colon_index = index;
}
} while (buffer.data()[index]);
}
}
// TODO: deprecated
if (mAdjustApsaraMicroTimezone) {
Expand All @@ -189,14 +201,28 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
return true;
}

/*
* 解析Apsara格式日志的时间。
* @param buffer - 包含日志数据的字符串视图。
* @param timeStr - 解析后的时间字符串。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param microTime - 解析出的微秒时间戳。
* @return 解析出的时间戳(秒),如果解析失败,则返回0。
*/
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime) {
if (buffer[0] != '[') {
return 0;
}
if (buffer[1] == '1') // for normal time, e.g 1378882630, starts with '1'
{
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%s", &lastLogTime, nanosecondLength);
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
Expand All @@ -207,14 +233,20 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// test other date format case
{
if (IsPrefixString(buffer.data() + 1, timeStr) == true) {
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
}
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
Expand All @@ -227,6 +259,12 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
}

/*
* 检查字符串是否包含指定的前缀。
* @param all - 完整的字符串。
* @param prefix - 要检查的前缀。
* @return 如果字符串以指定前缀开头,则返回true;否则返回false。
*/
bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringView& prefix) {
if (prefix.size() == 0)
return false;
Expand All @@ -239,13 +277,20 @@ bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringVie
return true;
}

static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
/*
* 查找Apsara格式日志的基础字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndexArray - 字段开始索引的数组。
* @param endIndexArray - 字段结束索引的数组。
* @return 解析到的基础字段数量。
*/
static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
int32_t baseFieldNum = 0;
for (int32_t i = 0; buffer[i] != 0; i++) {
for (size_t i = 0; i < buffer.size(); i++) {
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\0' || buffer[i + 1] == '\n') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
endIndexArray[baseFieldNum] = i;
baseFieldNum++;
}
Expand All @@ -260,7 +305,14 @@ static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int
return baseFieldNum;
}

static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为日志级别字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是日志级别,则返回true;否则返回false。
*/
static bool IsFieldLevel(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > 'Z' || buffer[i] < 'A') {
return false;
Expand All @@ -269,7 +321,14 @@ static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endInde
return true;
}

static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为线程ID字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是线程ID,则返回true;否则返回false。
*/
static bool IsFieldThread(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > '9' || buffer[i] < '0') {
return false;
Expand All @@ -278,7 +337,14 @@ static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endInd
return true;
}

static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为文件和行号字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是文件和行号,则返回true;否则返回false。
*/
static bool IsFieldFileLine(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == '/' || buffer[i] == '.') {
return true;
Expand All @@ -287,7 +353,14 @@ static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endI
return false;
}

static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 查找冒号字符的索引。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 冒号字符的索引,如果未找到,则返回endIndex。
*/
static int32_t FindColonIndex(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == ':') {
return i;
Expand All @@ -296,7 +369,13 @@ static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t en
return endIndex;
}

int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent) {
/*
* 解析Apsara日志的基础字段并添加到日志事件中。
* @param buffer - 包含日志数据的字符串视图。
* @param sourceEvent - 引用到日志事件对象,用于添加解析出的字段。
* @return 返回处理完基础字段后的索引位置。
*/
int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent) {
int32_t beginIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t endIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t baseFieldNum = FindBaseFields(buffer, beginIndexArray, endIndexArray);
Expand Down
2 changes: 1 addition & 1 deletion core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ProcessorParseApsaraNative : public Processor {
time_t ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
int32_t GetApsaraLogMicroTime(StringView& buffer);
bool IsPrefixString(const char* all, const StringView& prefix);
int32_t ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

std::string mSourceKey;
std::string mRawLogTag;
Expand Down
Loading
Loading