-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Use WAL Manager #13491
feat: Use WAL Manager #13491
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ import ( | |
"github.com/grafana/loki/v3/pkg/distributor/writefailures" | ||
"github.com/grafana/loki/v3/pkg/loghttp/push" | ||
"github.com/grafana/loki/v3/pkg/logproto" | ||
"github.com/grafana/loki/v3/pkg/storage/wal" | ||
"github.com/grafana/loki/v3/pkg/util/flagext" | ||
"github.com/grafana/loki/v3/pkg/validation" | ||
) | ||
|
@@ -130,21 +131,24 @@ func (s *stream) consumeChunk(_ context.Context, _ *logproto.Chunk) error { | |
|
||
func (s *stream) Push( | ||
ctx context.Context, | ||
wal *wal.Manager, | ||
entries []logproto.Entry, | ||
// Whether nor not to ingest all at once or not. It is a per-tenant configuration. | ||
rateLimitWholeStream bool, | ||
|
||
usageTracker push.UsageTracker, | ||
flushCtx *flushCtx, | ||
) (int, error) { | ||
) (int, *wal.AppendResult, error) { | ||
toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker) | ||
if rateLimitWholeStream && hasRateLimitErr(invalid) { | ||
return 0, errorForFailedEntries(s, invalid, len(entries)) | ||
return 0, nil, errorForFailedEntries(s, invalid, len(entries)) | ||
} | ||
|
||
bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx) | ||
bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
|
||
return bytesAdded, errorForFailedEntries(s, invalid, len(entries)) | ||
return bytesAdded, res, errorForFailedEntries(s, invalid, len(entries)) | ||
} | ||
|
||
func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error { | ||
|
@@ -195,7 +199,7 @@ func hasRateLimitErr(errs []entryWithError) bool { | |
return ok | ||
} | ||
|
||
func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int { | ||
func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) { | ||
if sp := opentracing.SpanFromContext(ctx); sp != nil { | ||
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString) | ||
defer sp.LogKV("event", "stream finished to store entries") | ||
|
@@ -213,9 +217,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, us | |
|
||
bytesAdded += len(entries[i].Line) | ||
} | ||
flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries) | ||
|
||
res, err := w.Append(wal.AppendRequest{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be more performant to use a pointer for Append here - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A lot of the time There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Benchmark for using the stack:
type AppendRequest struct {
TenantID string
Labels map[string]string
LabelsStr string
}
func doRequest(r AppendRequest) {
_ = r
}
func BenchmarkDoRequest(t *testing.B) {
labels := map[string]string{"foo": "bar"}
labelsStr := "{foo=\"bar\"}"
for i := 0; i < t.N; i++ {
doRequest(AppendRequest{
TenantID: "1",
Labels: labels,
LabelsStr: labelsStr,
})
}
} Benchmark for using the heap:
type AppendRequest struct {
TenantID string
Labels map[string]string
LabelsStr string
}
func doRequest(r *AppendRequest) {
_ = r
}
func BenchmarkDoRequest(t *testing.B) {
labels := map[string]string{"foo": "bar"}
labelsStr := "{foo=\"bar\"}"
for i := 0; i < t.N; i++ {
doRequest(&AppendRequest{
TenantID: "1",
Labels: labels,
LabelsStr: labelsStr,
})
}
} |
||
TenantID: s.tenant, | ||
Labels: s.labels, | ||
LabelsStr: s.labels.String(), | ||
Entries: entries, | ||
}) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker) | ||
return bytesAdded | ||
return bytesAdded, res, nil | ||
} | ||
|
||
func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could remove the err return from NextPending if we don't look at it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so too, I'll do it for both
NextPending
andPut
in another PR. 👍