Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consumer progress #296

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (consumer *ShardConsumerWorker) consume() {
consumer.updateStatus(false)
return
}
err := consumer.nextFetchTask()
consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0)
hasProgress, err := consumer.nextFetchTask()
consumer.updateStatus(err == nil && hasProgress)
}()
case PROCESSING:
go func() {
Expand Down
22 changes: 12 additions & 10 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
if err != nil {
return "", err
}
if checkpoint != "" && err == nil {
if checkpoint != "" {
Copy link
Collaborator Author

@crimson-gao crimson-gao Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove redundant err check

consumer.consumerCheckPointTracker.initCheckPoint(checkpoint)
return checkpoint, nil
}
Expand Down Expand Up @@ -45,23 +45,23 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
return "", errors.New("CursorPositionError")
}

func (consumer *ShardConsumerWorker) nextFetchTask() error {
func (consumer *ShardConsumerWorker) nextFetchTask() (hasProgress bool, err error) {
// update last fetch time, for control fetch frequency
consumer.lastFetchTime = time.Now()

logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
cursor := consumer.nextFetchCursor
logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, cursor)
if err != nil {
return err
return false, err
}
// set cursors user to decide whether to save according to the execution of `process`
consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
consumer.lastFetchLogGroupList = logGroup
consumer.nextFetchCursor = pullLogMeta.NextCursor
consumer.lastFetchRawSize = pullLogMeta.RawSize
consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
consumer.lastFetchGroupCount = pullLogMeta.Count
if consumer.client.option.Query != "" {
consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery
consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery
consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.DataCountBeforeQuery
if consumer.lastFetchRawSizeBeforeQuery == -1 {
consumer.lastFetchRawSizeBeforeQuery = 0
}
Expand All @@ -74,13 +74,15 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
"shardId", consumer.shardId,
"fetch log count", consumer.lastFetchGroupCount,
)
if consumer.lastFetchGroupCount == 0 {

// if cursor == nextCursor, no progress is needed
if cursor == pullLogMeta.NextCursor {
consumer.lastFetchLogGroupList = nil
// may no new data can be pulled, no process func can trigger checkpoint saving
consumer.saveCheckPointIfNeeded()
return false, nil
}

return nil
return true, nil
}

func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) {
Expand Down
36 changes: 30 additions & 6 deletions consumer/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ func TestStartAndStop(t *testing.T) {
CursorPosition: BEGIN_CURSOR,
}

worker := InitConsumerWorker(option, process)
worker := InitConsumerWorkerWithCheckpointTracker(option, process)

worker.Start()
worker.StopAndWait()
}

func process(shardId int, logGroupList *sls.LogGroupList) string {
fmt.Printf("shardId %d processing works sucess", shardId)
return ""
// process is the consume callback used by the worker tests: it logs how many
// log groups were received for the shard, force-saves the checkpoint, and
// returns no rollback cursor and no error.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
	// Fixed typo in the progress message ("sucess" -> "success").
	fmt.Printf("shardId %d processing works success, logGroupSize: %d\n", shardId, len(logGroupList.LogGroups))
	// force=true: persist the checkpoint immediately rather than waiting for
	// the auto-commit interval. Error is intentionally ignored in this test helper.
	checkpointTracker.SaveCheckPoint(true)
	return "", nil
}

func TestStartAndStopCredentialsProvider(t *testing.T) {
Expand All @@ -46,12 +47,35 @@ func TestStartAndStopCredentialsProvider(t *testing.T) {
ConsumerName: "test-consumer-1",
// This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// Could be "begin", "end", "specific time format in time stamp", it's log receiving time.
CursorPosition: BEGIN_CURSOR,
CursorPosition: BEGIN_CURSOR,
AutoCommitDisabled: false,
}

worker := InitConsumerWorker(option, process)
worker := InitConsumerWorkerWithCheckpointTracker(option, process)

worker.Start()
time.Sleep(time.Second * 20)
worker.StopAndWait()
}

// TestConsumerQueryNoData is an integration test: it runs a consumer with a
// server-side query that may filter out all pulled data, to verify the worker
// still makes progress (saves checkpoints) when no rows match.
// Connection parameters come from the LOG_TEST_* environment variables.
func TestConsumerQueryNoData(t *testing.T) {
	option := LogHubConfig{
		Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),
		CredentialsProvider: sls.NewStaticCredentialsProvider(
			os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
			os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""),
		Project:           os.Getenv("LOG_TEST_PROJECT"),
		Logstore:          os.Getenv("LOG_TEST_LOGSTORE"),
		ConsumerGroupName: "test-consumer",
		ConsumerName:      "test-consumer-1",
		CursorPosition:    END_CURSOR,
		Query:             "* | where \"request_method\" = 'GET'",
	}

	worker := InitConsumerWorkerWithCheckpointTracker(option, process)

	worker.Start()
	// Let the consumer run briefly, matching the 20s used by the other
	// integration tests. The previous 2000s sleep exceeded `go test`'s
	// default 10-minute timeout and would panic the test binary.
	time.Sleep(time.Second * 20)
	worker.StopAndWait()
}
132 changes: 57 additions & 75 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) {
// GetLogsBytesWithQuery gets logs binary data from the shard specified by shardId according to cursor and endCursor.
// The logGroupMaxCount is the max number of logGroups that can be returned.
// The nextCursor is the next cursor that can be used to read logs next time.
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
Expand All @@ -514,102 +514,84 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullL

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
return nil, nil, err
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
return nil, nil, err
}
pullLogMeta = &PullLogMeta{}
pullLogMeta.Netflow = len(buf)
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
if IsDebugLevelMatched(1) {
level.Error(Logger).Log("msg", string(dump))
}
return
return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err)
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
}
v, ok := r.Header["X-Log-Compresstype"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-log-compresstype' header")
return
netflow := len(buf)

nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor")
if err != nil {
return nil, nil, err
}
var compressType = Compress_None
if v[0] == "lz4" {
compressType = Compress_LZ4
} else if v[0] == "zstd" {
compressType = Compress_ZSTD
} else {
err = fmt.Errorf("unexpected compress type:%v", compressType)
return
rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize")
if err != nil {
return nil, nil, err
}

v, ok = r.Header["X-Log-Cursor"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-log-cursor' header")
return
count, err := ParseHeaderInt(r, "X-Log-Count")
if err != nil {
return nil, nil, err
}
pullLogMeta.NextCursor = v[0]
pullLogMeta.RawSize, err = ParseHeaderInt(r, "X-Log-Bodyrawsize")
pullMeta := &PullLogMeta{
RawSize: rawSize,
NextCursor: nextCursor,
Netflow: netflow,
Count: count,
}
// If query is not nil, extract more headers
if plr.Query != "" {
pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize")
pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount")
pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines")
pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines")
pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines")
}
if rawSize == 0 {
return make([]byte, 0), pullMeta, nil
}

// decompress data
out := make([]byte, rawSize)
compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype")
if err != nil {
return
return nil, nil, err
}
if pullLogMeta.RawSize > 0 {
out = make([]byte, pullLogMeta.RawSize)
switch compressType {
case Compress_LZ4:
uncompressedSize := 0
if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
return
}
if uncompressedSize != pullLogMeta.RawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, pullLogMeta.RawSize)
}
case Compress_ZSTD:
out, err = slsZstdCompressor.Decompress(buf, out)
if err != nil {
return nil, nil, err
}
if len(out) != pullLogMeta.RawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), pullLogMeta.RawSize)
}
default:
return nil, nil, fmt.Errorf("unexpected compress type: %d", compressType)
switch compressType {
case "lz4":
uncompressedSize := 0
if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
return nil, nil, err
}
if uncompressedSize != rawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize)
}
case "zstd":
out, err = slsZstdCompressor.Decompress(buf, out)
if err != nil {
return nil, nil, err
}
if len(out) != rawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize)
}
default:
return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType)
}
// todo: add query meta
// If query is not nil, extract more headers
// if plr.Query != "" {
// pullLogMeta.RawSizeBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatasize")
// if err != nil {
// return
// }
// pullLogMeta.DataCountBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatacount")
// if err != nil {
// return
// }
// pullLogMeta.Lines, err = ParseHeaderInt(r, "X-Log-Resultlines")
// if err != nil {
// return
// }
// pullLogMeta.LinesBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatalines")
// if err != nil {
// return
// }
// pullLogMeta.FailedLines, err = ParseHeaderInt(r, "X-Log-Failedlines")
// if err != nil {
// return
// }
// }
return
return out, pullMeta, nil
}

// LogsBytesDecode decodes logs binary data returned by GetLogsBytes API
Expand Down
19 changes: 10 additions & 9 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
}

type PullLogMeta struct {
NextCursor string
Netflow int
RawSize int
RawDataCountBeforeQuery int
RawSizeBeforeQuery int
Lines int
LinesBeforeQuery int
FailedLines int
DataCountBeforeQuery int
NextCursor string
Netflow int
RawSize int
Count int
// these fields are presents only when query is set
RawSizeBeforeQuery int // processed raw size before query
Lines int // result lines after query
LinesBeforeQuery int // processed lines before query
FailedLines int // failed lines during query
DataCountBeforeQuery int // processed logGroup count before query
}

// GetHistogramsResponse defines response from GetHistograms call
Expand Down
8 changes: 8 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ func ParseHeaderInt(r *http.Response, headerName string) (int, error) {
}
return -1, fmt.Errorf("can't find '%s' header", strings.ToLower(headerName))
}

// parseHeaderString returns the first value of headerName in header, or an
// error when the header is absent or has no values.
// It uses Header.Values, which canonicalizes the key via
// textproto.CanonicalMIMEHeaderKey, so lookups succeed regardless of the
// caller's casing (e.g. "x-log-cursor" finds "X-Log-Cursor"); direct map
// indexing would silently miss non-canonical keys.
func parseHeaderString(header http.Header, headerName string) (string, error) {
	v := header.Values(headerName)
	if len(v) == 0 {
		return "", fmt.Errorf("can't find '%s' header", strings.ToLower(headerName))
	}
	return v[0], nil
}
Loading