Skip to content

Commit

Permalink
Introduce multitenant reserved label
Browse files Browse the repository at this point in the history
  • Loading branch information
vlvasilev committed Sep 27, 2021
1 parent 7e44425 commit 02047d3
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ It also adds additional configurations that aim to improve plugin's performance
| SendLogsToDefaultClientWhenClusterIsInDeletionState | Send log to the default URL when it is in deletion state | `true`
| SendLogsToDefaultClientWhenClusterIsInRestoreState | Send log to the default URL when it is in restoration state | `true`
| SendLogsToDefaultClientWhenClusterIsInMigrationState | Send log to the default URL when it is in migration state | `true`
| `__gardener_multitenant_id__` | A reserved label for multiple tenants separated by semicolon(e.g. "operator;user") | empty string

### Labels

Expand Down
10 changes: 9 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ func NewClient(cfg *config.Config, logger log.Logger) (types.LokiClient, error)
return newSortedClient(c, cfg.ClientConfig.NumberOfBatchIDs, logger)
}
} else {
ncf = NewPromtailClient
ncf = func(cfg client.Config, logger log.Logger) (types.LokiClient, error) {
c, err := NewPromtailClient(cfg, logger)
if err != nil {
return nil, err
}
return NewMultiTenantClientWrapper(c, false), nil
}
}

if cfg.ClientConfig.BufferConfig.Buffer {
Expand Down Expand Up @@ -105,9 +111,11 @@ func NewRemoveTenantIdClient(clientToWrap types.LokiClient) types.LokiClient {
}

func (c *removeTenantIdClient) Handle(ls model.LabelSet, t time.Time, s string) error {
//If `__tenant_id__` exist the log is dropped because we assume it was re-emitted
if _, ok := ls[client.ReservedLabelTenantID]; ok {
return nil
}
delete(ls, MultiTenantClientLabel)
return c.lokiclient.Handle(ls, t, s)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/client/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *FakeLokiClient) Handle(labels model.LabelSet, timestamp time.Time, line
return fmt.Errorf("client has been stopped")
}
c.Entries = append(c.Entries, Entry{
Labels: labels,
Labels: labels.Clone(),
Entry: logproto.Entry{Timestamp: timestamp, Line: line},
})
return nil
Expand Down
108 changes: 108 additions & 0 deletions pkg/client/multi_tenant_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"strings"
"time"

giterrors "github.com/pkg/errors"

"github.com/gardener/logging/pkg/types"

"github.com/grafana/loki/pkg/promtail/client"
"github.com/prometheus/common/model"
)

type multiTenantClient struct {
lokiclient types.LokiClient
copyLabelSet bool
}

const (
MultiTenantClientLabel = "__gardener_multitenant_id__"
MultiTenantClientsSeparator = ";"
)

// NewMultiTenantClientWrapper returns Loki client which support more than one tenant id specified
// under `_gardener_multitenamt_id__` labale separated with semicolumn.
func NewMultiTenantClientWrapper(clientToWrap types.LokiClient, copyLabelSet bool) types.LokiClient {
return &multiTenantClient{
lokiclient: clientToWrap,
copyLabelSet: copyLabelSet,
}
}

func (c *multiTenantClient) Handle(ls model.LabelSet, t time.Time, s string) error {
ids, ok := ls[MultiTenantClientLabel]
if !ok {
return c.lokiclient.Handle(ls, t, s)
}

tenants := getTenants(string(ids))
delete(ls, MultiTenantClientLabel)
if len(tenants) < 1 {
return c.lokiclient.Handle(ls, t, s)
}

var errs []error
for _, tenant := range tenants {
tmpLs := ls
if c.copyLabelSet {
tmpLs = ls.Clone()
}
tmpLs[client.ReservedLabelTenantID] = model.LabelValue(tenant)
err := c.lokiclient.Handle(tmpLs, t, s)
if err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
var combineErr error
for _, er := range errs {
combineErr = giterrors.Wrap(combineErr, er.Error())
}
return combineErr
}

return nil
}

func getTenants(rawIdsStr string) []string {
rawIdsStr = strings.TrimSpace(rawIdsStr)
multiTenantIDs := strings.Split(rawIdsStr, MultiTenantClientsSeparator)
numberOfEmptyTenants := 0
for idx, tenant := range multiTenantIDs {
tenant = strings.TrimSpace(tenant)
if tenant == "" {
numberOfEmptyTenants++
continue
}
multiTenantIDs[idx-numberOfEmptyTenants] = tenant
}

return multiTenantIDs[:len(multiTenantIDs)-numberOfEmptyTenants]
}

// Stop the client.
func (c *multiTenantClient) Stop() {
c.lokiclient.Stop()
}

// StopWait stops the client waiting all saved logs to be sent.
func (c *multiTenantClient) StopWait() {
c.lokiclient.StopWait()
}
145 changes: 145 additions & 0 deletions pkg/client/multi_tenant_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"time"

"github.com/gardener/logging/pkg/types"
"github.com/grafana/loki/pkg/promtail/client"
. "github.com/onsi/ginkgo"
ginkotable "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/prometheus/common/model"
)

var _ = Describe("Multi Tenant Client", func() {
var (
fakeClient *FakeLokiClient
mtc types.LokiClient
)

BeforeEach(func() {
fakeClient = &FakeLokiClient{}
mtc = NewMultiTenantClientWrapper(fakeClient, false)
})

Describe("#NewMultiTenantClientWrapper", func() {

It("Should return wrapper client which does not copy the label set before sending it to the wrapped client", func() {
mtc := NewMultiTenantClientWrapper(fakeClient, false)
Expect(mtc).NotTo(BeNil())
Expect(mtc.(*multiTenantClient).lokiclient).To(Equal(fakeClient))
Expect(mtc.(*multiTenantClient).copyLabelSet).To(BeFalse())
})

It("Should return wrapper client which copies the label set before sending it to the wrapped client", func() {
mtc := NewMultiTenantClientWrapper(fakeClient, true)
Expect(mtc).NotTo(BeNil())
Expect(mtc.(*multiTenantClient).lokiclient).To(Equal(fakeClient))
Expect(mtc.(*multiTenantClient).copyLabelSet).To(BeTrue())
})
})

type handleArgs struct {
ls model.LabelSet
t time.Time
s string
wantedTenants []model.LabelValue
}
ginkotable.DescribeTable("#Handle", func(args handleArgs) {
err := mtc.Handle(args.ls, args.t, args.s)
Expect(err).ToNot(HaveOccurred())
Expect(len(fakeClient.Entries) > 0).To(BeTrue())
var gotTenants []model.LabelValue
for _, entry := range fakeClient.Entries {
// __gardener_multitenant_id__ should be removed after Handle() call
_, ok := entry.Labels[MultiTenantClientLabel]
Expect(ok).To(BeFalse())
// Each tenant in the MultiTenantClientLabel should be transferred to __tenant_id__
if tenant, ok := entry.Labels[client.ReservedLabelTenantID]; ok {
gotTenants = append(gotTenants, tenant)
}
}
// Check if all multitenants are parsed and used for forwarding to the wrapped Handle func
Expect(len(args.wantedTenants)).To(Equal(len(gotTenants)))
for _, tenant := range args.wantedTenants {
Expect(tenant).To(BeElementOf(gotTenants))
}
// Sanity check if the timestamp and the messages re the same
for _, entry := range fakeClient.Entries {
Expect(entry.Timestamp).To(Equal(args.t))
Expect(entry.Line).To(Equal(args.s))
}
},
ginkotable.Entry("Handle record without reserved labels", handleArgs{
ls: model.LabelSet{"hostname": "test"},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{},
}),
ginkotable.Entry("Handle record without __gardener_multitenant_id__ reserved label", handleArgs{
ls: model.LabelSet{"hostname": "test", "__tenant_id__": "user"},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{"user"},
}),
ginkotable.Entry("Handle record with __gardener_multitenant_id__ reserved label. Separator \"; \"", handleArgs{
ls: model.LabelSet{"hostname": "test", "__gardener_multitenant_id__": "operator; user"},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{"operator", "user"},
}),
ginkotable.Entry("Handle record with __gardener_multitenant_id__ reserved label. Separator \";\"", handleArgs{
ls: model.LabelSet{"hostname": "test", "__gardener_multitenant_id__": "operator;user"},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{"operator", "user"},
}),
ginkotable.Entry("Handle record with __gardener_multitenant_id__ reserved label. Separator \" ; \" and leading and trailing spaces", handleArgs{
ls: model.LabelSet{"hostname": "test", "__gardener_multitenant_id__": " operator ; user "},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{"operator", "user"},
}),
ginkotable.Entry("Handle record with __gardener_multitenant_id__ and __tenant_id__ reserved labels.", handleArgs{
ls: model.LabelSet{"hostname": "test", "__tenant_id__": "pinokio", "__gardener_multitenant_id__": "operator; user"},
t: time.Now(),
s: "test1",
wantedTenants: []model.LabelValue{"operator", "user"},
}),
)

Describe("#Stop", func() {
It("should stop", func() {
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsGracefullyStopped).To(BeFalse())
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsStopped).To(BeFalse())
mtc.Stop()
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsGracefullyStopped).To(BeFalse())
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsStopped).To(BeTrue())
})
})

Describe("#StopWait", func() {
It("should stop", func() {
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsGracefullyStopped).To(BeFalse())
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsStopped).To(BeFalse())
mtc.StopWait()
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsGracefullyStopped).To(BeTrue())
Expect(mtc.(*multiTenantClient).lokiclient.(*FakeLokiClient).IsStopped).To(BeFalse())
})
})

})
1 change: 1 addition & 0 deletions pkg/client/sorted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func newSortedClient(cfg client.Config, numberOfBatchIds uint64, logger log.Logg
if err != nil {
return nil, err
}
lokiclient = NewMultiTenantClientWrapper(lokiclient, false)

c := &sortedClient{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
Expand Down

0 comments on commit 02047d3

Please sign in to comment.