Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport]Fix timestamp parsing compatibility issue for apsara log #1285 #1302

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
Expand All @@ -244,6 +245,7 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
Expand All @@ -252,12 +254,21 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
// parse second part
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S"));
return 0;
}
// parse nanosecond part (optional)
if (*strptimeResult != '\0') {
strptimeResult = Strptime(strptimeResult + 1, "%f", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
}
}
// if the time is valid (strptime not return NULL), the date value size must be 19 ,like '2013-09-11 03:11:05'
timeStr = StringView(buffer.data() + 1, 19);
lastLogTime.tv_sec = lastLogTime.tv_sec - mLogTimeZoneOffsetSecond;
Expand Down
161 changes: 152 additions & 9 deletions core/unittest/processor/ProcessorParseApsaraNativeUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ProcessorParseApsaraNativeUnittest : public ::testing::Test {
void TestProcessEventKeepUnmatch();
void TestProcessEventDiscardUnmatch();
void TestMultipleLines();
void TestProcessEventMicrosecondUnmatch();

PipelineContext mContext;
};
Expand All @@ -57,6 +58,7 @@ UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestAddLog);
UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestProcessEventKeepUnmatch);
UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestProcessEventDiscardUnmatch);
UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestMultipleLines);
UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestProcessEventMicrosecondUnmatch);

void ProcessorParseApsaraNativeUnittest::TestMultipleLines() {
// 第一个contents 测试多行下的解析,第二个contents测试多行下time的解析
Expand Down Expand Up @@ -401,16 +403,43 @@ void ProcessorParseApsaraNativeUnittest::TestProcessWholeLinePart() {
processorInstance.Process(eventGroup);
// judge result
std::string outJson = eventGroup.ToJsonString();
APSARA_TEST_STREQ_FATAL("null", CompactJson(outJson).c_str());
std::string expectJson = R"({
"events": [
{
"contents": {
"/ilogtail/AppConfigBase.cpp": "100",
"AppConfigBase AppConfigBase": "success",
"__LEVEL__": "INFO",
"__THREAD__": "385658",
"log.file.offset":"0",
"microtime": "1693833300000000"
},
"timestamp": 1693833300,
"timestampNanosecond": 0,
"type": 1
},
{
"contents": {
"/ilogtail/AppConfigBase.cpp": "100",
"AppConfigBase AppConfigBase": "success",
"__THREAD__": "385658",
"log.file.offset":"0",
"microtime": "1693833360000000"
},
"timestamp": 1693833360,
"timestampNanosecond": 0,
"type": 1
}
]
})";
APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str());
// check observablity
int count = 3;
APSARA_TEST_EQUAL_FATAL(count, processor.GetContext().GetProcessProfile().parseFailures);
APSARA_TEST_EQUAL_FATAL(uint64_t(count), processorInstance.mProcInRecordsTotal->GetValue());
// discard unmatch, so output is 0
APSARA_TEST_EQUAL_FATAL(uint64_t(0), processorInstance.mProcOutRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(0), processor.mProcParseOutSizeBytes->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(count), processor.mProcDiscardRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(count), processor.mProcParseErrorTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(1, processor.GetContext().GetProcessProfile().parseFailures);
APSARA_TEST_EQUAL_FATAL(uint64_t(3), processorInstance.mProcInRecordsTotal->GetValue());
// only timestamp failed, so output is 2
APSARA_TEST_EQUAL_FATAL(uint64_t(2), processorInstance.mProcOutRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(1), processor.mProcDiscardRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(1), processor.mProcParseErrorTotal->GetValue());
}

void ProcessorParseApsaraNativeUnittest::TestProcessKeyOverwritten() {
Expand Down Expand Up @@ -817,6 +846,120 @@ void ProcessorParseApsaraNativeUnittest::TestProcessEventDiscardUnmatch() {
APSARA_TEST_EQUAL_FATAL(uint64_t(count), processor.mProcParseErrorTotal->GetValue());
}

void ProcessorParseApsaraNativeUnittest::TestProcessEventMicrosecondUnmatch() {
// make config
Config config;
config.mDiscardUnmatch = false;
config.mUploadRawLog = false;

// make events
auto sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
std::string inJson = R"({
"events" :
[
{
"contents" :
{
"content" : "[2023-09-04 13:15:04,862181] [INFO] [385658] /ilogtail/AppConfigBase.cpp:100 AppConfigBase AppConfigBase:success"
},
"timestamp" : 12345678901,
"type" : 1
},
{
"contents" :
{
"content" : "[2023-09-04 13:16:04] [INFO] [385658] /ilogtail/AppConfigBase.cpp:100 AppConfigBase AppConfigBase:success"
},
"timestamp" : 12345678901,
"type" : 1
},
{
"contents" :
{
"content" : "[2023-09-04 13:17:04,1] [INFO] [385658] /ilogtail/AppConfigBase.cpp:100 AppConfigBase AppConfigBase:success"
},
"timestamp" : 12345678901,
"type" : 1
},
{
"contents" :
{
"content" : "[2023-09-04 13:18:04"
},
"timestamp" : 12345678901,
"type" : 1
}
]
})";
eventGroup.FromJsonString(inJson);
// run function
std::string pluginId = "testID";
ProcessorParseApsaraNative* processor = new ProcessorParseApsaraNative;
ProcessorInstance processorInstance(processor, pluginId);
ComponentConfig componentConfig(pluginId, config);
APSARA_TEST_TRUE_FATAL(processorInstance.Init(componentConfig, mContext));
processorInstance.Process(eventGroup);

// judge result
std::string expectJson = R"({
"events": [
{
"contents": {
"/ilogtail/AppConfigBase.cpp": "100",
"AppConfigBase AppConfigBase": "success",
"__LEVEL__": "INFO",
"__THREAD__": "385658",
"microtime": "1693833304862181"
},
"timestamp": 1693833304,
"timestampNanosecond": 862181000,
"type": 1
},
{
"contents": {
"/ilogtail/AppConfigBase.cpp": "100",
"AppConfigBase AppConfigBase": "success",
"__LEVEL__": "INFO",
"__THREAD__": "385658",
"microtime": "1693833364000000"
},
"timestamp": 1693833364,
"timestampNanosecond": 0,
"type": 1
},
{
"contents": {
"/ilogtail/AppConfigBase.cpp": "100",
"AppConfigBase AppConfigBase": "success",
"__LEVEL__": "INFO",
"__THREAD__": "385658",
"microtime": "1693833424100000"
},
"timestamp": 1693833424,
"timestampNanosecond": 100000000,
"type": 1
},
{
"contents": {
"__raw_log__": "[2023-09-04 13:18:04",
"content":"[2023-09-04 13:18:04"
},
"timestamp": 12345678901,
"timestampNanosecond": 0,
"type": 1
}
]
})";
std::string outJson = eventGroup.ToJsonString();
APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str());
// check observablity
APSARA_TEST_EQUAL_FATAL(1, processor->GetContext().GetProcessProfile().parseFailures);
APSARA_TEST_EQUAL_FATAL(uint64_t(4), processorInstance.mProcInRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(4), processorInstance.mProcOutRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(0), processor->mProcDiscardRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(uint64_t(1), processor->mProcParseErrorTotal->GetValue());
}
} // namespace logtail

UNIT_TEST_MAIN
Loading