Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correct parsing errors in ProcessorParseApsaraNative with large buffer input #1255

Merged
merged 12 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
*/

#include "processor/ProcessorParseApsaraNative.h"

#include <algorithm>

#include "app_config/AppConfig.h"
#include "common/Constants.h"
#include "models/LogEvent.h"
#include "app_config/AppConfig.h"
#include "monitor/MetricConstants.h"
#include "parser/LogParser.h" // for UNMATCH_LOG_KEY
#include "plugin/instance/ProcessorInstance.h"
#include "monitor/MetricConstants.h"
#include <algorithm>


namespace logtail {
const std::string ProcessorParseApsaraNative::sName = "processor_parse_apsara_native";
Expand Down Expand Up @@ -70,15 +71,18 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
return;
}

bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache) {
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
quzard marked this conversation as resolved.
Show resolved Hide resolved
PipelineEventPtr& e,
LogtailTime& lastLogTime,
StringView& timeStrCache) {
if (!IsSupportedEvent(e)) {
return true;
}
LogEvent& sourceEvent = e.Cast<LogEvent>();
if (!sourceEvent.HasContent(mSourceKey)) {
return true;
}
StringView buffer = sourceEvent.GetContent(mSourceKey);
auto buffer = sourceEvent.GetContent(mSourceKey);
quzard marked this conversation as resolved.
Show resolved Hide resolved
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
Expand All @@ -98,10 +102,10 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
}

GetContext().GetAlarm().SendAlarm(PARSE_TIME_FAIL_ALARM,
bufOut.to_string() + " $ " + ToString(logTime),
GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion());
bufOut.to_string() + " $ " + ToString(logTime),
GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion());
mProcParseErrorTotal->Add(1);
++(*mParseFailures);
if (!mDiscardUnmatch) {
Expand Down Expand Up @@ -130,7 +134,8 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
"logstore", GetContext().GetLogstoreName())("file", logPath));
}
GetContext().GetAlarm().SendAlarm(OUTDATED_LOG_ALARM,
std::string("logTime: ") + ToString(logTime) + ", log:" + bufOut.to_string(),
std::string("logTime: ") + ToString(logTime)
+ ", log:" + bufOut.to_string(),
GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion());
Expand All @@ -144,17 +149,18 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
sourceEvent.SetTimestamp(logTime, logTime_in_micro * 1000 % 1000000000);
int32_t beg_index = 0;
int32_t colon_index = -1;
int32_t index = -1;
size_t index = -1;
quzard marked this conversation as resolved.
Show resolved Hide resolved
index = ParseApsaraBaseFields(buffer, sourceEvent);
bool sourceKeyOverwritten = mSourceKeyOverwritten;
bool rawLogTagOverwritten = false;
if (buffer.data()[index] != 0) {
do {
++index;
quzard marked this conversation as resolved.
Show resolved Hide resolved
if (buffer.data()[index] == '\t' || buffer.data()[index] == '\0') {
if (buffer.data()[index] == '\t' || buffer.data()[index] == '\0' || index == buffer.size()) {
if (colon_index >= 0) {
StringView key(buffer.data() + beg_index, colon_index - beg_index);
AddLog(key, StringView(buffer.data() + colon_index + 1, index - colon_index - 1), sourceEvent);
StringView data(buffer.data() + colon_index + 1, index - colon_index - 1);
AddLog(key, data, sourceEvent);
if (key == mSourceKey) {
sourceKeyOverwritten = true;
}
Expand All @@ -167,7 +173,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
} else if (buffer.data()[index] == ':' && colon_index == -1) {
colon_index = index;
}
} while (buffer.data()[index]);
} while (index <= buffer.size());
quzard marked this conversation as resolved.
Show resolved Hide resolved
}
// TODO: deprecated
if (mAdjustApsaraMicroTimezone) {
Expand All @@ -189,7 +195,10 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
return true;
}

time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime) {
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer,
StringView& timeStr,
LogtailTime& lastLogTime,
int64_t& microTime) {
if (buffer[0] != '[') {
return 0;
}
Expand All @@ -198,8 +207,7 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%s", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
quzard marked this conversation as resolved.
Show resolved Hide resolved
return 0;
}
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
Expand Down Expand Up @@ -239,9 +247,9 @@ bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringVie
return true;
}

static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
int32_t baseFieldNum = 0;
for (int32_t i = 0; buffer[i] != 0; i++) {
for (size_t i = 0; i < buffer.size(); i++) {
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
Expand All @@ -260,7 +268,7 @@ static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int
return baseFieldNum;
}

static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
static bool IsFieldLevel(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > 'Z' || buffer[i] < 'A') {
return false;
Expand All @@ -269,7 +277,7 @@ static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endInde
return true;
}

static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
static bool IsFieldThread(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > '9' || buffer[i] < '0') {
return false;
Expand All @@ -278,7 +286,7 @@ static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endInd
return true;
}

static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
static bool IsFieldFileLine(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == '/' || buffer[i] == '.') {
return true;
Expand All @@ -287,7 +295,7 @@ static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endI
return false;
}

static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
static int32_t FindColonIndex(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == ':') {
return i;
Expand All @@ -296,7 +304,7 @@ static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t en
return endIndex;
}

int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent) {
int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent) {
int32_t beginIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t endIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t baseFieldNum = FindBaseFields(buffer, beginIndexArray, endIndexArray);
Expand All @@ -311,16 +319,20 @@ int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(StringView& buffer, Lo
endIndex = endIndexArray[i];
if ((findFieldBitMap & 0x1) == 0 && IsFieldLevel(buffer, beginIndex, endIndex)) {
findFieldBitMap |= 0x1;
AddLog(LogParser::SLS_KEY_LEVEL, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
AddLog(
LogParser::SLS_KEY_LEVEL, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
} else if ((findFieldBitMap & 0x10) == 0 && IsFieldThread(buffer, beginIndex, endIndex)) {
findFieldBitMap |= 0x10;
AddLog(LogParser::SLS_KEY_THREAD, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
AddLog(
LogParser::SLS_KEY_THREAD, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
} else if ((findFieldBitMap & 0x100) == 0 && IsFieldFileLine(buffer, beginIndex, endIndex)) {
findFieldBitMap |= 0x100;
int32_t colonIndex = FindColonIndex(buffer, beginIndex, endIndex);
AddLog(LogParser::SLS_KEY_FILE, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
if (colonIndex < endIndex) {
AddLog(LogParser::SLS_KEY_LINE, StringView(buffer.data() + colonIndex + 1, endIndex - colonIndex - 1), sourceEvent);
AddLog(LogParser::SLS_KEY_LINE,
StringView(buffer.data() + colonIndex + 1, endIndex - colonIndex - 1),
sourceEvent);
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
* limitations under the License.
*/

#include "plugin/interface/Processor.h"
#include <string>
#include <boost/regex.hpp>
#include <string>

#include "plugin/interface/Processor.h"

namespace logtail {

Expand All @@ -32,12 +33,14 @@ class ProcessorParseApsaraNative : public Processor {
bool IsSupportedEvent(const PipelineEventPtr& e) const override;

private:
bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache);
bool
ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache);
void AddLog(const StringView& key, const StringView& value, LogEvent& targetEvent);
time_t ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
time_t
ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
int32_t GetApsaraLogMicroTime(StringView& buffer);
bool IsPrefixString(const char* all, const StringView& prefix);
int32_t ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

std::string mSourceKey;
std::string mRawLogTag;
Expand Down
Loading
Loading