From 5150c54af24fc95b771edbcb43925e9d8e8f1f6e Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 4 Feb 2020 19:26:05 +0100 Subject: [PATCH 01/19] [SIEM] New o365 input for Office 365 audit logs This input uses Microsoft's Office 365 Management API to fetch audit events. Relates #16196 --- x-pack/filebeat/include/list.go | 1 + x-pack/filebeat/input/o365audit/auth/auth.go | 16 + x-pack/filebeat/input/o365audit/auth/cert.go | 105 +++++ x-pack/filebeat/input/o365audit/config.go | 161 +++++++ .../filebeat/input/o365audit/contentblob.go | 112 +++++ .../input/o365audit/contentblob_test.go | 149 +++++++ x-pack/filebeat/input/o365audit/dates.go | 106 +++++ x-pack/filebeat/input/o365audit/input.go | 294 +++++++++++++ x-pack/filebeat/input/o365audit/listblobs.go | 270 ++++++++++++ .../input/o365audit/listblobs_test.go | 413 ++++++++++++++++++ x-pack/filebeat/input/o365audit/pagination.go | 65 +++ x-pack/filebeat/input/o365audit/poll/poll.go | 259 +++++++++++ x-pack/filebeat/input/o365audit/schema.go | 63 +++ x-pack/filebeat/input/o365audit/state.go | 158 +++++++ x-pack/filebeat/input/o365audit/state_test.go | 105 +++++ x-pack/filebeat/input/o365audit/subscribe.go | 81 ++++ 16 files changed, 2358 insertions(+) create mode 100644 x-pack/filebeat/input/o365audit/auth/auth.go create mode 100644 x-pack/filebeat/input/o365audit/auth/cert.go create mode 100644 x-pack/filebeat/input/o365audit/config.go create mode 100644 x-pack/filebeat/input/o365audit/contentblob.go create mode 100644 x-pack/filebeat/input/o365audit/contentblob_test.go create mode 100644 x-pack/filebeat/input/o365audit/dates.go create mode 100644 x-pack/filebeat/input/o365audit/input.go create mode 100644 x-pack/filebeat/input/o365audit/listblobs.go create mode 100644 x-pack/filebeat/input/o365audit/listblobs_test.go create mode 100644 x-pack/filebeat/input/o365audit/pagination.go create mode 100644 x-pack/filebeat/input/o365audit/poll/poll.go create mode 100644 x-pack/filebeat/input/o365audit/schema.go create mode 100644 x-pack/filebeat/input/o365audit/state.go create mode 100644 x-pack/filebeat/input/o365audit/state_test.go create mode 100644 x-pack/filebeat/input/o365audit/subscribe.go diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index eb5861b4f83..7970538c0c4 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -13,6 +13,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/s3" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/aws" diff --git a/x-pack/filebeat/input/o365audit/auth/auth.go b/x-pack/filebeat/input/o365audit/auth/auth.go new file mode 100644 index 00000000000..99d8a5c66d9 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/auth/auth.go @@ -0,0 +1,16 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package auth + +// TokenProvider is the interface that wraps an authentication mechanism and +// allows to obtain tokens. +type TokenProvider interface { + // Token returns a valid OAuth token, or an error. + Token() (string, error) + + // Renew must be called to re-authenticate against the oauth2 endpoint if + // when the API returns an Authentication error. + Renew() error +} diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go new file mode 100644 index 00000000000..053b5f85784 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package auth + +import ( + "crypto/rsa" + "crypto/x509" + "fmt" + "sync" + + "github.com/Azure/go-autorest/autorest/adal" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common/transport/tlscommon" +) + +type sptProviderFromCert struct { + sync.Mutex + certs tlscommon.CertificateConfig + applicationID string + endpoint string + resource string + tenantID string + spt *adal.ServicePrincipalToken +} + +// NewProviderFromCertificate returns a TokenProvider that uses certificate-based +// authentication. +func NewProviderFromCertificate( + endpoint, resource, applicationID, tenantID string, + conf tlscommon.CertificateConfig) (sptp TokenProvider, err error) { + provider := &sptProviderFromCert{ + certs: conf, + applicationID: applicationID, + resource: resource, + endpoint: endpoint, + tenantID: tenantID, + } + if provider.spt, err = provider.getServicePrincipalToken(tenantID); err != nil { + return nil, err + } + provider.spt.SetAutoRefresh(true) + return provider, nil +} + +// Token returns an oauth token that can be used for bearer authorization. +func (provider *sptProviderFromCert) Token() (string, error) { + provider.Mutex.Lock() + defer provider.Mutex.Unlock() + if err := provider.spt.EnsureFresh(); err != nil { + return "", errors.Wrap(err, "refreshing spt token") + } + token := provider.spt.Token() + return token.OAuthToken(), nil +} + +// Renew re-authenticates with the oauth2 endpoint to get a new Service Principal Token. +func (provider *sptProviderFromCert) Renew() error { + provider.Mutex.Lock() + defer provider.Mutex.Unlock() + return provider.spt.Refresh() +} + +func (provider *sptProviderFromCert) getServicePrincipalToken(tenantID string) (*adal.ServicePrincipalToken, error) { + cert, privKey, err := loadConfigCerts(provider.certs) + if err != nil { + return nil, errors.Wrap(err, "failed loading certificates") + } + oauth, err := adal.NewOAuthConfig(provider.endpoint, tenantID) + if err != nil { + return nil, errors.Wrap(err, "error generating OAuthConfig") + } + + return adal.NewServicePrincipalTokenFromCertificate( + *oauth, + provider.applicationID, + cert, + privKey, + provider.resource, + ) +} + +func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, key *rsa.PrivateKey, err error) { + tlsCert, err := tlscommon.LoadCertificate(&cfg) + if err != nil { + return nil, nil, errors.Wrapf(err, "error loading X509 certificate from '%s'", cfg.Certificate) + } + if len(tlsCert.Certificate) < 1 { + return nil, nil, fmt.Errorf("no certificates loaded from '%s'", cfg.Certificate) + } + cert, err = x509.ParseCertificate(tlsCert.Certificate[0]) + if err != nil { + return nil, nil, errors.Wrapf(err, "error parsing X509 certificate from '%s'", cfg.Certificate) + } + if tlsCert.PrivateKey == nil { + return nil, nil, fmt.Errorf("failed loading private key from '%s'", cfg.Key) + } + key, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) + if !ok { + return nil, nil, fmt.Errorf("private key at '%s' is not an RSA private key", cfg.Key) + } + return cert, key, nil +} diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go new file mode 100644 index 00000000000..52463c57042 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/config.go @@ -0,0 +1,161 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common/transport/tlscommon" +) + +// Config for the O365 audit API input. +type Config struct { + // CertificateConfig contains the authentication credentials (certificate). + CertificateConfig tlscommon.CertificateConfig `config:",inline"` + + // ApplicationID (aka. client ID) of the Azure application. + ApplicationID string `config:"application_id" validate:"required"` + + // TenantID (aka. Directory ID) is a list of tenants for which to fetch + // the audit logs. This can be a string or a list of strings. + TenantID interface{} `config:"tenant_id,replace" validate:"required"` + + // Content-Type is a list of content-types to fetch. + // This can be a string or a list of strings. + ContentType interface{} `config:"content_type,replace"` + + // API contains settings to adapt to changes on the API. + API APIConfig `config:"api"` + + tenants []string + contentTypes []string +} + +// APIConfig contains advanced settings that are only supposed to be changed +// to diagnose errors or to adapt to changes in the service. +type APIConfig struct { + + // AuthenticationEndpoint to authorize the Azure app. + AuthenticationEndpoint string `config:"authentication_endpoint"` + + // Resource to request authorization for. + Resource string `config:"resource"` + + // MaxRetention determines how far back the input will poll for events. + MaxRetention time.Duration `config:"max_retention" validate:"positive"` + + // AdjustClock controls whether the input will adapt its internal clock + // to the server's clock to compensate for clock differences when the API + // returns an error indicating that the times requests are out of bounds. + AdjustClock bool `config:"adjust_clock"` + + // AdjustClockMinDifference sets the minimum difference between clocks so + // that an adjust is considered. + AdjustClockMinDifference time.Duration `config:"adjust_clock_min_difference" validate:"positive"` + + // AdjustClockWarn controls whether a warning should be printed to the logs + // when a clock difference between the local clock and the server's clock + // is detected, as it can lead to event loss. + AdjustClockWarn bool `config:"adjust_clock_warn"` + + // ErrorRetryInterval sets the interval between retries in the case of + // errors performing a request. + ErrorRetryInterval time.Duration `config:"error_retry_interval" validate:"positive"` + + // LiveWindowSize defines the window of time [now-window, now) that will be + // used to poll for new events. If events are created outside of this window, + // they will be lost. + LiveWindowSize time.Duration `config:"live_window_size" validate:"positive"` + + // LiveWindowPollInterval determines how often the input should poll for new + // data once it has finished scanning for past events and reached the live + // window. + LiveWindowPollInterval time.Duration `config:"live_window_poll_interval" validate:"positive"` + + // MaxRequestsPerMinute sets the limit on the number of API requests that + // can be sent, per tenant. + MaxRequestsPerMinute int `config:"max_requests_per_minute" validate:"positive"` +} + +func defaultConfig() Config { + return Config{ + + // All documented content types. + ContentType: []string{ + "Audit.AzureActiveDirectory", + "Audit.Exchange", + "Audit.SharePoint", + "Audit.General", + "DLP.All", + }, + + API: APIConfig{ + // This is used to bootstrap the input for the first time + // as the API doesn't provide a way to query for the oldest record. + // Currently the API will err on queries older than this, use with care. + MaxRetention: 7 * timeDay, + + AuthenticationEndpoint: "https://login.microsoftonline.com/", + + Resource: "https://manage.office.com", + + AdjustClock: true, + + AdjustClockMinDifference: 5 * time.Minute, + + AdjustClockWarn: true, + + ErrorRetryInterval: 5 * time.Minute, + + LiveWindowPollInterval: time.Minute, + + LiveWindowSize: timeDay, + + // According to the docs this is the max requests that are allowed + // per tenant per minute. + MaxRequestsPerMinute: 2000, + }, + } +} + +// Validate checks that the configuration is correct. +func (c *Config) Validate() (err error) { + if err = c.CertificateConfig.Validate(); err != nil { + return err + } + if c.tenants, err = asStringList(c.TenantID); err != nil { + return errors.Wrap(err, "error validating tenant_id") + } + if c.contentTypes, err = asStringList(c.ContentType); err != nil { + return errors.Wrap(err, "error validating content_type") + } + return nil +} + +// A helper to allow defining a field either as a string or a list of strings. +func asStringList(value interface{}) (list []string, err error) { + switch v := value.(type) { + case string: + list = []string{v} + case []string: + list = v + case []interface{}: + list = make([]string, len(v)) + for idx, ival := range v { + str, ok := ival.(string) + if !ok { + return nil, fmt.Errorf("string value required. Found %v (type %T) at position %d", + ival, ival, idx+1) + } + list[idx] = str + } + default: + return nil, fmt.Errorf("array of strings required. Found %v (type %T)", value, value) + } + return list, nil +} diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go new file mode 100644 index 00000000000..c032fd35acc --- /dev/null +++ b/x-pack/filebeat/input/o365audit/contentblob.go @@ -0,0 +1,112 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "net/http" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +// contentBlob is a poll.Transaction that processes "content blobs": +// aggregations of audit event objects returned by the API. +type contentBlob struct { + env apiEnvironment + id, url string + // cursor is used to ACK the resulting events. + cursor cursor + // skipLines is used when resuming from a saved cursor so that already + // acknowledged objects are not duplicated. + skipLines int +} + +// String return a printable representation of this transaction. +func (c contentBlob) String() string { + return fmt.Sprintf("content blob url:%s id:%s", c.url, c.id) +} + +// RequestDecorators returns the decorators used to perform a request. +func (c contentBlob) RequestDecorators() []autorest.PrepareDecorator { + return []autorest.PrepareDecorator{ + autorest.WithBaseURL(c.url), + } +} + +// Delay returns the delay to perform this request. +func (c contentBlob) Delay() time.Duration { + return 0 +} + +// OnResponse parses the response for a content blob. +func (c contentBlob) OnResponse(response *http.Response) (actions []poll.Action) { + if response.StatusCode != 200 { + // TODO: + return append(actions, poll.Terminate( + fmt.Errorf("operation %s returned HTTP code %d %s", + c, response.StatusCode, response.Status))) + } + var js []common.MapStr + if err := readJSONBody(response, &js); err != nil { + return append(actions, poll.Terminate(errors.Wrap(err, "reading body failed"))) + } + for idx, entry := range js { + id, _ := getString(entry, "Id") + ts, _ := getString(entry, "CreationTime") + c.env.Logger.Debugf(" > event %d: created:%s id:%s for %s", idx+1, ts, id, c.cursor) + } + if len(js) > c.skipLines { + for _, entry := range js[:c.skipLines] { + id, _ := getString(entry, "Id") + c.env.Logger.Debugf("Skipping event %s [%s] for %s", c.cursor, id, c.id) + } + for _, entry := range js[c.skipLines:] { + c.cursor = c.cursor.ForNextLine() + c.env.Logger.Debugf("Reporting event %s for %s", c.cursor, c.id) + actions = append(actions, c.env.Report(entry, c.cursor)) + } + c.skipLines = 0 + } else { + for _, entry := range js { + id, _ := getString(entry, "Id") + c.env.Logger.Debugf("Skipping event all %s [%s] for %s", c.cursor, id, c.id) + } + + c.skipLines -= len(js) + } + // The API only documents the use of NextPageUri header for list requests + // but one can't be too careful. + if url, found := getNextPage(response); found { + return append(actions, poll.Fetch(newPager(url, c))) + } + + return actions +} + +// ContentBlob creates a new contentBlob. +func ContentBlob(url string, cursor cursor, env apiEnvironment) contentBlob { + return contentBlob{ + url: url, + env: env, + cursor: cursor, + } +} + +// WithID configures a content blob with the given origin ID. +func (c contentBlob) WithID(id string) contentBlob { + c.id = id + return c +} + +// WithSkipLines configures a content blob with the number of objects to skip. +func (c contentBlob) WithSkipLines(nlines int) contentBlob { + c.skipLines = nlines + return c +} diff --git a/x-pack/filebeat/input/o365audit/contentblob_test.go b/x-pack/filebeat/input/o365audit/contentblob_test.go new file mode 100644 index 00000000000..4026f6ff443 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/contentblob_test.go @@ -0,0 +1,149 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +type contentStore struct { + events []beat.Event + stopped bool +} + +func (s *contentStore) onEvent(b beat.Event) bool { + s.events = append(s.events, b) + return !s.stopped +} + +func (f *fakePoll) BlobContent(t testing.TB, b poll.Transaction, data []common.MapStr, nextUrl string) poll.Transaction { + urls, next := f.deliverResult(t, b, data, nextUrl) + if !assert.Empty(t, urls) { + t.Fatal("blob returned urls to fetch") + } + return next +} + +func makeEvent(ts time.Time, id string) common.MapStr { + return common.MapStr{ + "CreationTime": ts.Format(apiDateFormat), + "Id": id, + } +} + +func validateBlobs(t testing.TB, store contentStore, expected []string, c cursor) cursor { + assert.Len(t, store.events, len(expected)) + for idx := range expected { + id, err := getString(store.events[idx].Fields, fieldsPrefix+".Id") + if !assert.NoError(t, err) { + t.Fatal(err) + } + assert.Equal(t, expected[idx], id) + } + prev := c + baseLine := c.line + for idx, id := range expected { + ev := store.events[idx] + cursor, ok := ev.Private.(cursor) + if !assert.True(t, ok) { + t.Fatal("no cursor for event id", id) + } + assert.Equal(t, idx+1+baseLine, cursor.line) + assert.True(t, prev.Before(cursor)) + prev = cursor + } + return prev +} + +func TestContentBlob(t *testing.T) { + var f fakePoll + var store contentStore + ctx := apiEnvironment{ + Logger: logp.L(), + Callback: store.onEvent, + } + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + query := ContentBlob("http://test.localhost/", baseCursor, ctx) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + } + expected := []string{"e1", "e2", "e3", "e4", "e5"} + next := f.BlobContent(t, query, data, "") + assert.Nil(t, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, len(expected), c.line) +} + +func TestContentBlobResumeToLine(t *testing.T) { + var f fakePoll + var store contentStore + ctx := testConfig() + ctx.Callback = store.onEvent + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + const skip = 3 + baseCursor.line = skip + query := ContentBlob("http://test.localhost/", baseCursor, ctx).WithSkipLines(skip) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + } + expected := []string{"e4", "e5"} + next := f.BlobContent(t, query, data, "") + assert.Nil(t, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, len(expected), c.line-skip) +} + +func TestContentBlobPaged(t *testing.T) { + var f fakePoll + var store contentStore + ctx := apiEnvironment{ + Logger: logp.L(), + Callback: store.onEvent, + } + baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now()) + query := ContentBlob("http://test.localhost/", baseCursor, ctx) + data := []common.MapStr{ + makeEvent(now.Add(-time.Hour), "e1"), + makeEvent(now.Add(-2*time.Hour), "e2"), + makeEvent(now.Add(-30*time.Minute), "e3"), + makeEvent(now.Add(-10*time.Second), "e4"), + makeEvent(now.Add(-20*time.Minute), "e5"), + makeEvent(now.Add(-20*time.Minute), "e6"), + } + expected := []string{"e1", "e2", "e3"} + next := f.BlobContent(t, query, data[:3], "http://test.localhost/page/2") + assert.NotNil(t, next) + assert.IsType(t, paginator{}, next) + c := validateBlobs(t, store, expected, baseCursor) + assert.Equal(t, 3, c.line) + store.events = nil + next = f.BlobContent(t, next, data[3:5], "http://test.localhost/page/3") + assert.IsType(t, paginator{}, next) + expected = []string{"e4", "e5"} + c = validateBlobs(t, store, expected, c) + assert.Equal(t, 5, c.line) + store.events = nil + next = f.BlobContent(t, next, data[5:], "") + assert.Nil(t, next) + expected = []string{"e6"} + c = validateBlobs(t, store, expected, c) + assert.Equal(t, 6, c.line) +} diff --git a/x-pack/filebeat/input/o365audit/dates.go b/x-pack/filebeat/input/o365audit/dates.go new file mode 100644 index 00000000000..aa9eb1374e4 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/dates.go @@ -0,0 +1,106 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "sort" + "time" + + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" +) + +const ( + // Date format used by audit objects. + apiDateFormat = "2006-01-02T15:04:05" + timeDay = time.Hour * 24 +) + +var ( + errTypeCastFailed = errors.New("key is not expected type") +) + +// Date formats used in the JSON objects returned by the API. +// This is just a safeguard in case the date format used by the API is +// updated to include sub-second resolution or timezone information. +var apiDateFormats = dateFormats{ + time.RFC3339Nano, + time.RFC3339, + apiDateFormat, +} + +// Date formats used by HTTP/1.1 servers. +var httpDateFormats = dateFormats{ + time.RFC1123, + time.RFC850, + time.ANSIC, + time.RFC1123Z, +} + +// A helper to parse dates using different formats. +type dateFormats []string + +// Parse will try to parse the given string-formatted date in the formats +// specified in the dateFormats until one succeeds. +func (d dateFormats) Parse(str string) (t time.Time, err error) { + for _, fmt := range d { + if t, err = time.Parse(fmt, str); err == nil { + return t.UTC(), nil + } + } + return time.Now().UTC(), fmt.Errorf("unable to parse date '%s' with formats %v", str, d) +} + +// Get a key from a map and cast it to string. +func getString(m common.MapStr, key string) (string, error) { + iValue, err := m.GetValue(key) + if err != nil { + return "", err + } + str, ok := iValue.(string) + if !ok { + return "", errTypeCastFailed + } + return str, nil +} + +// Parse a date from the given map key. +func getDateKey(m common.MapStr, key string, formats dateFormats) (t time.Time, err error) { + str, err := getString(m, key) + if err != nil { + return t, err + } + return formats.Parse(str) +} + +// Sort a slice of maps by one of its keys parsed as a date in the given format(s). +func sortMapSliceByDate(s []common.MapStr, dateKey string, formats dateFormats) error { + var errs multierror.Errors + sort.Slice(s, func(i, j int) bool { + di, e1 := getDateKey(s[i], dateKey, formats) + dj, e2 := getDateKey(s[j], dateKey, formats) + if e1 != nil { + errs = append(errs, e1) + } + if e2 != nil { + errs = append(errs, e2) + } + return di.Before(dj) + }) + return errors.Wrapf(errs.Err(), "failed sorting by date key:%s", dateKey) +} + +func inRange(d, maxLimit time.Duration) bool { + if maxLimit < 0 { + maxLimit = -maxLimit + } + if d < 0 { + d = -d + } + return d < maxLimit +} diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go new file mode 100644 index 00000000000..4798b762102 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/input.go @@ -0,0 +1,294 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "context" + "sync" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/common/useragent" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +const ( + inputName = "o365audit" + fieldsPrefix = inputName +) + +func init() { + if err := input.Register(inputName, NewInput); err != nil { + panic(errors.Wrapf(err, "unable to create %s input", inputName)) + } +} + +type o365input struct { + config Config + outlet channel.Outleter + storage *stateStorage + log *logp.Logger + pollers map[stream]*poll.Poller + cancel func() + ctx context.Context + wg sync.WaitGroup + runOnce sync.Once +} + +type apiEnvironment struct { + TenantID string + ContentType string + Config APIConfig + Callback func(beat.Event) bool + Logger *logp.Logger + Clock func() time.Time +} + +// NewInput creates a new o365audit input. +func NewInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (inp input.Input, err error) { + cfgwarn.Beta("The %s input is beta", inputName) + + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrapf(err, "reading %s input config", inputName) + } + + log := logp.NewLogger(inputName) + + // TODO: Update with input v2 state. + storage := newStateStorage(noopPersister{}) + + var out channel.Outleter + out, err = connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + ACKLastEvent: func(private interface{}) { + // Errors don't have a cursor. + if cursor, ok := private.(cursor); ok { + log.Debugf("ACKed cursor %+v", cursor) + if err := storage.Save(cursor); err != nil && err != errNoUpdate { + log.Errorf("Error saving state: %v", err) + } + } + }, + }) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } + }() + + pollers := make(map[stream]*poll.Poller) + for _, tenantID := range config.tenants { + // MaxRequestsPerMinute limitation is per tenant. + delay := time.Duration(len(config.contentTypes)) * time.Minute / time.Duration(config.API.MaxRequestsPerMinute) + auth, err := auth.NewProviderFromCertificate( + config.API.AuthenticationEndpoint, + config.API.Resource, + config.ApplicationID, + tenantID, + config.CertificateConfig, + ) + if err != nil { + return nil, err + } + + for _, contentType := range config.contentTypes { + key := stream{ + tenantID: tenantID, + contentType: contentType, + } + poller, err := poll.New( + poll.WithTokenProvider(auth), + poll.WithMinRequestInterval(delay), + poll.WithLogger(log.With("tenantID", tenantID, "contentType", contentType)), + poll.WithContext(ctx), + poll.WithRequestDecorator( + autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+inputName)), + autorest.WithQueryParameters(common.MapStr{ + "publisherIdentifier": tenantID, + }), + ), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create API poller") + } + pollers[key] = poller + } + } + + return &o365input{ + config: config, + outlet: out, + storage: storage, + log: log, + pollers: pollers, + ctx: ctx, + cancel: cancel, + }, nil +} + +// Run starts the o365input. Only has effect the first time it's called. +func (inp *o365input) Run() { + inp.runOnce.Do(inp.run) +} + +func (inp *o365input) run() { + for stream, poller := range inp.pollers { + start := inp.loadLastLocation(stream) + inp.log.Debugw("Start fetching events", + "cursor", start, + "tenantID", stream.tenantID, + "contentType", stream.contentType) + inp.runPoller(poller, start) + } +} + +func (inp *o365input) runPoller(poller *poll.Poller, start cursor) { + ctx := apiEnvironment{ + TenantID: start.tenantID, + ContentType: start.contentType, + Config: inp.config.API, + Callback: inp.reportEvent, + Logger: poller.Logger(), + Clock: time.Now, + } + inp.wg.Add(1) + go func() { + defer logp.Recover("panic in " + inputName + " runner.") + defer inp.wg.Done() + action := ListBlob(start, ctx) + // When resuming from a saved state, it's necessary to query for the + // same startTime that provided the last ACKed event. Otherwise there's + // the risk of observing partial blobs with different line counts, due to + // how the backend works. + if start.line > 0 { + action = action.WithStartTime(start.startTime) + } + if err := poller.Run(action); err != nil { + inp.log.Errorf("API request failed with error: %v", err.Error()) + msg := common.MapStr{} + msg.Put("error.message", err.Error()) + event := beat.Event{ + Timestamp: time.Now(), + Fields: msg, + } + inp.reportEvent(event) + } + }() +} + +func (inp *o365input) reportEvent(event beat.Event) bool { + return inp.outlet.OnEvent(event) +} + +// Stop terminates the o365 input. +func (inp *o365input) Stop() { + inp.log.Info("Stopping input " + inputName) + defer inp.log.Info(inputName + " stopped.") + defer inp.outlet.Close() + inp.cancel() +} + +// Wait terminates the o365input and waits for all the pollers to finalize. +func (inp *o365input) Wait() { + inp.Stop() + inp.wg.Wait() +} + +func (inp *o365input) loadLastLocation(key stream) cursor { + period := inp.config.API.MaxRetention + retentionLimit := time.Now().UTC().Add(-period) + cursor, err := inp.storage.Load(key) + if err != nil { + if err == errStateNotFound { + inp.log.Infof("No saved state found. Will fetch events for the last %v.", period.String()) + } else { + inp.log.Errorw("Error loading saved state. Will fetch all retained events. "+ + "Depending on max_retention, this can cause event loss or duplication.", + "error", err, + "max_retention", period.String()) + } + cursor.timestamp = retentionLimit + } + if cursor.timestamp.Before(retentionLimit) { + inp.log.Warnw("Last update exceeds the retention limit. "+ + "Probably some events have been lost.", + "resume_since", cursor, + "retention_limit", retentionLimit, + "max_retention", period.String()) + // Due to API limitations, it's necessary to perform a query for each + // day. These avoids performing a lot of queries that will return empty + // when the input hasn't run in a long time. + cursor.timestamp = retentionLimit + } + return cursor +} + +var errTerminated = errors.New("terminated due to output closed") + +// Report returns an action that produces a beat.Event from the given object. +func (env apiEnvironment) Report(doc common.MapStr, private interface{}) poll.Action { + return func(poll.Enqueuer) error { + if !env.Callback(toBeatEvent(doc, private)) { + return errTerminated + } + return nil + } +} + +// ReportAPIError returns an action that produces a beat.Event from an API error. +func (env apiEnvironment) ReportAPIError(err apiError) poll.Action { + return func(poll.Enqueuer) error { + if !env.Callback(err.ToBeatEvent()) { + return errTerminated + } + return nil + } +} + +func toBeatEvent(doc common.MapStr, private interface{}) beat.Event { + var errs multierror.Errors + ts, err := getDateKey(doc, "CreationTime", apiDateFormats) + if err != nil { + ts = time.Now() + errs = append(errs, errors.Wrap(err, "failed parsing CreationTime")) + } + b := beat.Event{ + Timestamp: ts, + Fields: common.MapStr{ + fieldsPrefix: doc, + }, + Private: private, + } + if len(errs) > 0 { + msgs := make([]string, len(errs)) + for idx, e := range errs { + msgs[idx] = e.Error() + } + b.PutValue("error.message", msgs) + } + return b +} diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go new file mode 100644 index 00000000000..715a330c93c --- /dev/null +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -0,0 +1,270 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sort" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +// listBlob is a poll.Transaction that handles the content/"blobs" list. +type listBlob struct { + cursor cursor + startTime, endTime time.Time + delay time.Duration + env apiEnvironment +} + +// ListBlob creates a new poll.Transaction that lists content starting from +// the given cursor position. +func ListBlob(cursor cursor, env apiEnvironment) listBlob { + l := listBlob{ + cursor: cursor, + env: env, + } + return l.adjustTimes(cursor.timestamp) +} + +// WithStartTime allows to alter the startTime of a listBlob. This is necessary +// for requests that are resuming from the cursor position of an existing blob, +// as it has been observed that the server won't return the same blob, but a +// partial one, when queried with the time that this blob was created. +func (l listBlob) WithStartTime(start time.Time) listBlob { + return l.adjustTimes(start) +} + +func (l listBlob) adjustTimes(since time.Time) listBlob { + now := l.env.Clock() + // Can't query more than in the past. + fromLimit := now.Add(-l.env.Config.MaxRetention) + if since.Before(fromLimit) { + since = fromLimit + } + // Max query is 24h worth of events. + to := since.Add(timeDay) + // Can't query into the future. Polling for new events every interval. + var delay time.Duration + if to.After(now) { + since = now.Add(-l.env.Config.LiveWindowSize) + if since.Before(l.cursor.timestamp) { + since = l.cursor.timestamp + } + to = now + delay = l.env.Config.LiveWindowPollInterval + } + l.startTime = since.UTC() + l.endTime = to.UTC() + l.delay = delay + return l +} + +// Delay returns the delay before executing a transaction. +func (l listBlob) Delay() time.Duration { + return l.delay +} + +// String returns the printable representation of a listBlob. +func (l listBlob) String() string { + return fmt.Sprintf("list blobs from:%s to:%s", l.startTime, l.endTime) +} + +// RequestDecorators returns the decorators used to perform a request. +func (l listBlob) RequestDecorators() []autorest.PrepareDecorator { + return []autorest.PrepareDecorator{ + autorest.WithBaseURL(l.env.Config.Resource), + autorest.WithPath("api/v1.0"), + autorest.WithPath(l.cursor.tenantID), + autorest.WithPath("activity/feed/subscriptions/content"), + autorest.WithQueryParameters( + map[string]interface{}{ + "contentType": l.cursor.contentType, + "startTime": l.startTime.Format(apiDateFormat), + "endTime": l.endTime.Format(apiDateFormat), + }), + } +} + +// OnResponse handles the output of a list content request. +func (l listBlob) OnResponse(response *http.Response) (actions []poll.Action) { + if response.StatusCode != 200 { + return l.handleError(response) + } + + if delta := getServerTimeDelta(response); l.env.Config.AdjustClockWarn && !inRange(delta, l.env.Config.AdjustClockMinDifference) { + l.env.Logger.Warnf("Server clock is offset by %v: Check system clock to avoid event loss.", delta) + } + + var list []content + if err := readJSONBody(response, &list); err != nil { + return []poll.Action{ + poll.Terminate(err), + } + } + + // Sort content by creation date and then by ID. + sort.Slice(list, func(i, j int) bool { + return list[i].Created.Before(list[j].Created) || (list[i].Created == list[j].Created && list[i].ID < list[j].ID) + }) + + // Save in the cursor the startTime that was used to obtain this blobs. + // In case of resuming retrieval using that cursor, it will be necessary to + // use the same startTime to observe the same blobs. Otherwise there's the + // risk of observing partial blobs. + l.cursor = l.cursor.WithStartTime(l.startTime) + + for _, entry := range list { + // Only fetch blobs that advance the cursor. + if l.cursor.TryAdvance(entry) { + l.env.Logger.Debugf("+ fetch blob date:%v id:%s", entry.Created.UTC(), entry.ID) + actions = append(actions, poll.Fetch( + ContentBlob(entry.URI, l.cursor, l.env). + WithID(entry.ID). + WithSkipLines(l.cursor.line))) + } else { + l.env.Logger.Debugf("- skip blob date:%v id:%s", entry.Created.UTC(), entry.ID) + } + if entry.Created.Before(l.startTime) { + l.env.Logger.Errorf("! Event created before query") + } + if entry.Created.After(l.endTime) { + l.env.Logger.Errorf("! Event created after query") + } + } + // Fetch the next page if a NextPageUri header is found. + if url, found := getNextPage(response); found { + return append(actions, poll.Fetch(newPager(url, l))) + } + // Otherwise fetch the next time window. + return append(actions, poll.Fetch(l.Next())) +} + +// Next returns a listBlob that will fetch events in future. +func (l listBlob) Next() listBlob { + return l.adjustTimes(l.endTime) +} + +var fatalErrors = map[string]struct{}{ + // The permission set ({0}) sent in the request did not include the expected permission ActivityFeed.Read. + "AF10001": {}, + // Missing parameter: {0}. + "AF20001": {}, + // Invalid parameter type: {0}. Expected type: {1} + "AF20002": {}, + // TODO: + // Expiration {0} provided is set to past date and time. + "AF20003": {}, + // The tenant ID passed in the URL ({0}) does not match the tenant ID passed in the access token ({1}). + "AF20010": {}, + // Specified tenant ID ({0}) does not exist in the system or has been deleted. + "AF20011": {}, + // Specified tenant ID ({0}) is incorrectly configured in the system. + "AF20012": {}, + // The tenant ID passed in the URL ({0}) is not a valid GUID. + "AF20013": {}, + // The specified content type is not valid. + "AF20020": {}, + // The webhook endpoint {{0}) could not be validated. {1} + "AF20021": {}, + // Invalid nextPage Input: {0}. + "AF20031": {}, +} + +func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { + var msg apiError + if err := readJSONBody(response, &msg); err != nil { + return []poll.Action{poll.Terminate(err)} + } + l.env.Logger.Warnf("Got error %s: %+v", response.Status, msg) + + if response.StatusCode == 401 { + // Authentication error. Renew oauth token and repeat this op. + l.delay = l.env.Config.LiveWindowPollInterval + return []poll.Action{ + poll.RenewToken(), + poll.Fetch(l), + } + } + + switch msg.Error.Code { + + // AF20022: No subscription found for the specified content type + // AF20023: The subscription was disabled by [..] + case "AF20022", "AF20023": + // Subscribe and retry + return []poll.Action{ + poll.Fetch(Subscribe(l.env)), + poll.Fetch(l), + } + // AF20030: Start time and end time must both be specified (or both omitted) and must + // be less than or equal to 24 hours apart, with the start time no more than + // 7 days in the past. + // AF20055: (Same). + case "AF20030", "AF20055": + // As of writing this, the server fails a request if it's more than + // retention_time(7d)+1h in the past. + // On the other hand, requests can be days into the future without error. + delta := getServerTimeDelta(response) + l.env.Logger.Errorf("Server is complaining about query interval. "+ + "This is usually a problem with the local clock and the server's clock "+ + "being out of sync. Time difference with server is %v.", delta) + if l.env.Config.AdjustClock && !inRange(delta, l.env.Config.AdjustClockMinDifference) { + l.env.Clock = func() time.Time { + return time.Now().Add(delta) + } + l.delay = l.env.Config.ErrorRetryInterval + l.env.Logger.Info("Compensating for time difference") + return []poll.Action{ + poll.Fetch(l.adjustTimes(l.startTime)), + } + } else { + l.env.Logger.Infow("Not adjusting for time offset.", + "api.adjust_clock", l.env.Config.AdjustClock, + "api.adjust_clock_min_difference", l.env.Config.AdjustClockMinDifference, + "difference", delta) + } + case "AF429": + // ... + case "AF50000": + // ... + } + l.delay = l.env.Config.ErrorRetryInterval + return []poll.Action{ + l.env.ReportAPIError(msg), + poll.Fetch(l), + } +} + +func readJSONBody(response *http.Response, dest interface{}) error { + defer autorest.Respond(response, + autorest.ByDiscardingBody(), + autorest.ByClosing()) + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return errors.Wrap(err, "reading body failed") + } + logp.L().Infof(" --> content: %s", string(body)) + if err = json.Unmarshal(body, dest); err != nil { + return errors.Wrap(err, "decoding json failed") + } + return nil +} + +func getServerTimeDelta(response *http.Response) time.Duration { + serverDate, err := httpDateFormats.Parse(response.Header.Get("Date")) + if err != nil { + return 0 + } + return serverDate.Sub(time.Now()) +} diff --git a/x-pack/filebeat/input/o365audit/listblobs_test.go b/x-pack/filebeat/input/o365audit/listblobs_test.go new file mode 100644 index 00000000000..acbcc9b8342 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/listblobs_test.go @@ -0,0 +1,413 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +const contentType = "Audit.AzureActiveDirectory" + +var now = time.Now().UTC() + +type blob struct { + Created time.Time `json:"contentCreated"` + Expiration time.Time `json:"contentExpiration"` + Id string `json:"contentId"` + Type string `json:"contentType"` + Uri string `json:"contentUri"` +} + +func idDate(d time.Time) string { + return strings.ReplaceAll(d.Format("20060102150405.999999999"), ".", "") +} + +func makeBlob(c time.Time, path string) blob { + created := c.UTC() + id := fmt.Sprintf("%s$%s$%s$%s$emea0026", + idDate(created), + idDate(created.Add(time.Hour)), + strings.ReplaceAll(strings.ToLower(contentType), ".", "_"), + strings.ReplaceAll(contentType, ".", "_")) + return blob{ + Created: created, + Expiration: created.Add(time.Hour * 24 * 7), + Id: id, + Type: contentType, + Uri: "https://test.localhost/" + path, + } +} + +type fakePoll struct { + queue []poll.Transaction +} + +func (f *fakePoll) RenewToken() error { + return nil +} + +func (f *fakePoll) Enqueue(item poll.Transaction) error { + f.queue = append(f.queue, item) + return nil +} + +func (f *fakePoll) PagedSearchQuery(t testing.TB, lb poll.Transaction, db []blob) (urls []string, next poll.Transaction) { + const pageSize = 3 + n := len(db) + var from, to int + switch v := lb.(type) { + case listBlob: + from = 0 + case paginator: + req, err := autorest.Prepare(&http.Request{}, v.RequestDecorators()...) + if !assert.NoError(t, err) { + t.Fatal(err) + } + nextArray, ok := req.URL.Query()["nextPage"] + if !assert.True(t, ok) || len(nextArray) != 1 { + t.Fatal("nextPage param is missing in pager query") + } + from, err = strconv.Atoi(nextArray[0]) + if !assert.NoError(t, err) { + t.Fatal(err) + } + } + if to = from + pageSize; to > n { + to = n + } + result := db[from:to] + nextUrl := "" + if to < n { + nextUrl = fmt.Sprintf("http://localhost.test/something?nextPage=%d", to) + } + return f.deliverResult(t, lb, result, nextUrl) +} + +func (f *fakePoll) deliverResult(t testing.TB, pl poll.Transaction, msg interface{}, nextUrl string) (urls []string, next poll.Transaction) { + js, err := json.Marshal(msg) + if !assert.NoError(t, err) { + t.Fatal(err) + } + response := &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewReader(js)), + ContentLength: int64(len(js)), + } + if nextUrl != "" { + response.Header = http.Header{ + "NextPageUri": []string{nextUrl}, + } + } + return f.finishQuery(t, pl, response) +} + +func (f *fakePoll) SearchQuery(t testing.TB, lb listBlob, db []blob) (urls []string, next poll.Transaction) { + t.Log("Query start:", now.Sub(lb.startTime), "end:", now.Sub(lb.endTime)) + lowerBound := sort.Search(len(db), func(i int) bool { + return !db[i].Created.Before(lb.startTime) + }) + upperBound := sort.Search(len(db), func(i int) bool { + return !db[i].Created.Before(lb.endTime) + }) + result := db[lowerBound:upperBound] + return f.deliverResult(t, lb, result, "") +} + +func (f *fakePoll) finishQuery(t testing.TB, pl poll.Transaction, resp *http.Response) (urls []string, next poll.Transaction) { + for _, a := range pl.OnResponse(resp) { + if err := a(f); !assert.NoError(t, err) { + t.Fatal(err) + } + } + if n := len(f.queue); n > 0 { + urls = make([]string, n-1) + for i := 0; i < n-1; i++ { + req, err := autorest.Prepare(&http.Request{}, f.queue[i].RequestDecorators()...) + if !assert.NoError(t, err) { + t.Fatal(err) + } + urls[i] = req.URL.Path[1:] + } + next = f.queue[n-1] + } + f.queue = nil + return urls, next +} + +func (f *fakePoll) subscriptionError(t testing.TB, lb listBlob) (subscribe, listBlob) { + t.Log("Query start:", now.Sub(lb.startTime), "end:", now.Sub(lb.endTime)) + var apiErr apiError + apiErr.Error.Code = "AF20022" + apiErr.Error.Message = "No subscription found for the specified content type" + js, err := json.Marshal(apiErr) + if !assert.NoError(t, err) { + t.Fatal(err) + } + t.Log(string(js)) + resp := &http.Response{ + StatusCode: 400, + Body: ioutil.NopCloser(bytes.NewReader(js)), + } + for _, a := range lb.OnResponse(resp) { + if err := a(f); !assert.NoError(t, err) { + t.Fatal(err) + } + } + if !assert.Len(t, f.queue, 2) { + t.Fatal("need 2 actions") + } + if !assert.IsType(t, subscribe{}, f.queue[0]) { + t.Fatal("expected type not found") + } + if !assert.IsType(t, lb, f.queue[1]) { + t.Fatal("expected type not found") + } + return f.queue[0].(subscribe), f.queue[1].(listBlob) +} + +func testConfig() apiEnvironment { + logp.TestingSetup() + config := defaultConfig() + return apiEnvironment{ + Config: config.API, + Logger: logp.NewLogger(inputName + " test"), + Clock: func() time.Time { + return now + }, + } +} + +func TestListBlob(t *testing.T) { + ctx := testConfig() + + db := []blob{ + // 7d+ ago + makeBlob(now.Add(-time.Hour*(1+24*7)), "expired"), + // [7,6d) ago + makeBlob(now.Add(-time.Hour*(8+24*6)), "day1_1"), + makeBlob(now.Add(-time.Hour*(3+24*6)), "day1_2"), + // [6d,5d) ago + makeBlob(now.Add(-time.Hour*(3+24*5)), "day2_1"), + + // [5d-4d) ago + makeBlob(now.Add(-time.Hour*(24*5)), "day3_1_limit"), + makeBlob(now.Add(-time.Hour*(23+24*4)), "day3_2"), + // Yesterday + makeBlob(now.Add(-time.Hour*(12+24*1)), "day6"), + // Today + makeBlob(now.Add(-time.Hour*12), "today_1"), + makeBlob(now.Add(-time.Hour*7), "today_2"), + } + lb := ListBlob(newCursor(stream{"1234", contentType}, time.Time{}), ctx) + var f fakePoll + // 6 days ago + blobs, next := f.SearchQuery(t, lb, db) + assert.Equal(t, []string{"day1_1", "day1_2"}, blobs) + assert.IsType(t, listBlob{}, next) + // 5 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day2_1"}, blobs) + + // 4 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day3_1_limit", "day3_2"}, blobs) + + // 3 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // 2 days ago + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // Yesterday + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"day6"}, blobs) + + // Today + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"today_1", "today_2"}, blobs) + + // Query for new data + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // New blob + db = append(db, makeBlob(now.Add(-time.Hour*5), "live_1")) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"live_1"}, blobs) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // Two new blobs + db = append(db, makeBlob(now.Add(-time.Hour*5+time.Second), "live_2")) + db = append(db, makeBlob(now.Add(-time.Hour*5+2*time.Second), "live_3")) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"live_2", "live_3"}, blobs) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) + + // Two more blobs with the same timestamp. + // I don't even know if this is possible, but assuming that in this case + // they will have a different ID because the ID uses the timestamp up to a + // nanosecond precision while the date only has millisecond-precision. + db = append(db, makeBlob(now.Add(-time.Hour*3+time.Nanosecond), "live_4a")) + db = append(db, makeBlob(now.Add(-time.Hour*3+2*time.Nanosecond), "live_4b")) + db = append(db, makeBlob(now.Add(-time.Hour*3+3*time.Nanosecond), "live_4c")) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Equal(t, []string{"live_4a", "live_4b", "live_4c"}, blobs) + + blobs, next = f.SearchQuery(t, next.(listBlob), db) + assert.Empty(t, blobs) +} + +func TestSubscriptionStart(t *testing.T) { + logp.TestingSetup() + log := logp.L() + ctx := apiEnvironment{ + ContentType: contentType, + TenantID: "1234", + Logger: log, + Clock: func() time.Time { + return now + }, + } + lb := ListBlob(newCursor(stream{"1234", contentType}, time.Time{}), ctx) + var f fakePoll + s, l := f.subscriptionError(t, lb) + assert.Equal(t, lb.cursor, l.cursor) + assert.Equal(t, lb.endTime, l.endTime) + assert.Equal(t, lb.startTime, l.startTime) + assert.Equal(t, lb.delay, l.delay) + assert.Equal(t, lb.cursor, l.cursor) + assert.Equal(t, lb.env.TenantID, l.env.TenantID) + assert.Equal(t, lb.env.ContentType, l.env.ContentType) + assert.Equal(t, lb.env.Logger, l.env.Logger) + assert.Equal(t, contentType, s.ContentType) + assert.Equal(t, lb.cursor.tenantID, s.TenantID) +} + +func TestPagination(t *testing.T) { + ctx := testConfig() + db := []blob{ + makeBlob(now.Add(-time.Hour*47+1*time.Nanosecond), "e1"), + makeBlob(now.Add(-time.Hour*47+2*time.Nanosecond), "e2"), + makeBlob(now.Add(-time.Hour*47+3*time.Nanosecond), "e3"), + makeBlob(now.Add(-time.Hour*47+4*time.Nanosecond), "e4"), + makeBlob(now.Add(-time.Hour*47+5*time.Nanosecond), "e5"), + makeBlob(now.Add(-time.Hour*47+6*time.Nanosecond), "e6"), + makeBlob(now.Add(-time.Hour*47+7*time.Nanosecond), "e7"), + makeBlob(now.Add(-time.Hour*47+8*time.Nanosecond), "e8"), + } + lb := ListBlob(newCursor(stream{"1234", contentType}, now.Add(-time.Hour*48)), ctx) + var f fakePoll + // 6 days ago + blobs, next := f.PagedSearchQuery(t, lb, db) + assert.Equal(t, []string{"e1", "e2", "e3"}, blobs) + assert.IsType(t, paginator{}, next) + + blobs, next = f.PagedSearchQuery(t, next, db) + assert.Equal(t, []string{"e4", "e5", "e6"}, blobs) + assert.IsType(t, paginator{}, next) + + blobs, next = f.PagedSearchQuery(t, next, db) + assert.Equal(t, []string{"e7", "e8"}, blobs) + nextlb, ok := next.(listBlob) + if !assert.True(t, ok) { + t.Fatal("bad type after pagination") + } + assert.Equal(t, lb.endTime, nextlb.startTime) + assert.True(t, lb.endTime.Before(nextlb.endTime)) +} + +func mkTime(t testing.TB, str string) time.Time { + tm, err := time.Parse(apiDateFormat, str) + if !assert.NoError(t, err) { + t.Fatal(err) + } + return tm +} + +func TestAdvance(t *testing.T) { + start := mkTime(t, "2020-02-01T15:00:00") + ev1 := mkTime(t, "2020-02-02T12:00:00") + now1 := mkTime(t, "2020-02-03T00:00:00") + ev2 := mkTime(t, "2020-02-03T12:00:00") + now2 := mkTime(t, "2020-02-04T00:00:00") + now3 := mkTime(t, "2020-02-06T00:00:00") + db := []blob{ + makeBlob(ev1, "e1"), + makeBlob(ev2, "e2"), + } + now := &now1 + ctx := testConfig() + ctx.Clock = func() time.Time { + return *now + } + lb := ListBlob(newCursor(stream{"tenant", contentType}, start), ctx) + assert.Equal(t, start, lb.startTime) + assert.Equal(t, start.Add(time.Hour*24), lb.endTime) + assert.True(t, lb.endTime.Before(now1)) + var f fakePoll + blobs, next := f.SearchQuery(t, lb, db) + assert.Equal(t, []string{"e1"}, blobs) + assert.IsType(t, listBlob{}, next) + lb = next.(listBlob) + assert.Equal(t, ev1, lb.startTime) + assert.Equal(t, now1, lb.endTime) + + now = &now2 + blobs, next = f.SearchQuery(t, lb, db) + assert.Empty(t, blobs) + assert.IsType(t, listBlob{}, next) + lb = next.(listBlob) + assert.Equal(t, now1, lb.startTime) + assert.Equal(t, now2, lb.endTime) + + blobs, next = f.SearchQuery(t, lb, db) + assert.Equal(t, []string{"e2"}, blobs) + assert.IsType(t, listBlob{}, next) + lb = next.(listBlob) + assert.Equal(t, ev1.Add(time.Hour*24), lb.startTime) + assert.Equal(t, now2, lb.endTime) + + now = &now3 + blobs, next = f.SearchQuery(t, lb, db) + assert.Empty(t, blobs) + assert.IsType(t, listBlob{}, next) + lb = next.(listBlob) + assert.Equal(t, now2, lb.startTime) + assert.Equal(t, now2.Add(time.Hour*24), lb.endTime) + + blobs, next = f.SearchQuery(t, lb, db) + assert.Empty(t, blobs) + assert.IsType(t, listBlob{}, next) + lb = next.(listBlob) + assert.Equal(t, now2.Add(time.Hour*24), lb.startTime) + assert.Equal(t, now3, lb.endTime) +} diff --git a/x-pack/filebeat/input/o365audit/pagination.go b/x-pack/filebeat/input/o365audit/pagination.go new file mode 100644 index 00000000000..30ff8c341a9 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/pagination.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "net/http" + "time" + + "github.com/Azure/go-autorest/autorest" + + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +// paginator is a decorator around a poll.Transaction to parse paginated requests. +type paginator struct { + url string + inner poll.Transaction +} + +// String returns the printable representation of this transaction. +func (p paginator) String() string { + return fmt.Sprintf("pager for url:`%s` inner:%s", p.url, p.inner) +} + +// RequestDecorators returns the decorators used to perform a request. +func (p paginator) RequestDecorators() []autorest.PrepareDecorator { + return []autorest.PrepareDecorator{ + autorest.WithBaseURL(p.url), + } +} + +// OnResponse parses the response using the wrapped transaction. +func (p paginator) OnResponse(r *http.Response) []poll.Action { + return p.inner.OnResponse(r) +} + +// Delay returns the delay for the wrapped transaction. +func (p paginator) Delay() time.Duration { + return p.inner.Delay() +} + +func newPager(pageUrl string, inner poll.Transaction) poll.Transaction { + return paginator{ + url: pageUrl, + inner: inner, + } +} + +// The documentation mentions NextPageUri, but shows NetPageUrl in the examples. +var nextPageHeaders = []string{ + "NextPageUri", + "NextPageUrl", +} + +func getNextPage(response *http.Response) (url string, found bool) { + for _, h := range nextPageHeaders { + if urls, found := response.Header[h]; found && len(urls) > 0 { + return urls[0], true + } + } + return "", false +} diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go new file mode 100644 index 00000000000..8269ae27efd --- /dev/null +++ b/x-pack/filebeat/input/o365audit/poll/poll.go @@ -0,0 +1,259 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package poll + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/Azure/go-autorest/autorest" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" +) + +// Transaction is the interface that wraps a request-response transaction to be +// performed by the poller. +type Transaction interface { + fmt.Stringer + + // RequestDecorators must return the list of decorators used to customize + // an http.Request. + RequestDecorators() []autorest.PrepareDecorator + + // OnResponse receives the resulting http.Response and returns the actions + // to be performed. + OnResponse(*http.Response) []Action + + // Delay returns the required delay before performing the request. + Delay() time.Duration +} + +// Poller encapsulates a single-threaded polling loop that performs requests +// and executes actions in response. +type Poller struct { + decorators []autorest.PrepareDecorator // Fixed decorators to apply to each request. + log *logp.Logger + tp auth.TokenProvider + list transactionList // List of pending transactions. + interval time.Duration // Minimum interval between transactions. + ctx context.Context +} + +// New creates a new Poller. +func New(options ...PollerOption) (p *Poller, err error) { + p = &Poller{ + ctx: context.Background(), + } + for _, opt := range options { + if err = opt(p); err != nil { + return nil, err + } + } + return p, nil +} + +// Run starts the poll loop with the given first transaction and continuing with +// any transactions spawned by it. It will execute until an error, a Terminate +// action is returned by a transaction, it runs out of transactions to perform, +// or a context set using WithContext() is done. +func (r *Poller) Run(item Transaction) error { + r.list.push(item) + for r.ctx.Err() == nil { + transaction := r.list.pop() + if transaction == nil { + return nil + } + if err := r.fetch(transaction); err != nil { + return err + } + } + return nil +} + +func (r *Poller) fetch(item Transaction) error { + r.log.Debugf("* Fetch %s", item) + // The order here is important. item's decorators must come first as those + // set the URL, which is required by other decorators (WithQueryParameters). + decorators := append( + append([]autorest.PrepareDecorator{}, item.RequestDecorators()...), + r.decorators...) + if r.tp != nil { + token, err := r.tp.Token() + if err != nil { + return errors.Wrap(err, "failed getting a token") + } + decorators = append(decorators, autorest.WithBearerAuthorization(token)) + } + request, err := autorest.Prepare(&http.Request{}, decorators...) + if err != nil { + return errors.Wrap(err, "failed preparing request") + } + delay := item.Delay() + if delay < r.interval { + delay = r.interval + } + r.log.Debugf(" -- wait %s for %s", delay, request.URL.String()) + + response, err := autorest.Send(request, + autorest.DoCloseIfError(), + autorest.AfterDelay(delay)) + if err != nil { + return errors.Wrap(err, "failed sending request") + } + + acts := item.OnResponse(response) + r.log.Debugf(" <- Result (%s) #acts=%d", response.Status, len(acts)) + + for _, act := range acts { + if err = act(r); err != nil { + return errors.Wrapf(err, "error acting on %+v", act) + } + } + + return nil +} + +// Logger returns the logger used. +func (p *Poller) Logger() *logp.Logger { + return p.log +} + +// PollerOption is the type for additional configuration options for a Poller. +type PollerOption func(r *Poller) error + +// WithRequestDecorator sets additional request decorators that will be applied +// to all requests. +func WithRequestDecorator(decorators ...autorest.PrepareDecorator) PollerOption { + return func(r *Poller) error { + r.decorators = append(r.decorators, decorators...) + return nil + } +} + +// WithTokenProvider sets the token provider that will be used to set a bearer +// token to all requests. +func WithTokenProvider(tp auth.TokenProvider) PollerOption { + return func(r *Poller) error { + if r.tp != nil { + return errors.New("tried to set more than one token provider") + } + r.tp = tp + return nil + } +} + +// WithLogger sets the logger to use. +func WithLogger(logger *logp.Logger) PollerOption { + return func(r *Poller) error { + r.log = logger + return nil + } +} + +// WithContext sets the context used to terminate the poll loop. +func WithContext(ctx context.Context) PollerOption { + return func(r *Poller) error { + r.ctx = ctx + return nil + } +} + +// WithMinRequestInterval sets the minimum delay between requests. +func WithMinRequestInterval(d time.Duration) PollerOption { + return func(r *Poller) error { + r.interval = d + return nil + } +} + +type listItem struct { + item Transaction + next *listItem +} + +type transactionList struct { + head *listItem + tail *listItem + size uint +} + +func (p *transactionList) push(item Transaction) { + li := &listItem{ + item: item, + } + if p.head != nil { + p.tail.next = li + } else { + p.head = li + } + p.tail = li + p.size++ +} + +func (p *transactionList) pop() Transaction { + item := p.head + if item == nil { + return nil + } + p.head = item.next + if p.head == nil { + p.tail = nil + } + p.size-- + return item.item +} + +// Enqueuer is the interface provided to actions so they can act on a Poller. +type Enqueuer interface { + Enqueue(item Transaction) error + RenewToken() error +} + +// Action is an operation returned by a transaction. +type Action func(q Enqueuer) error + +// Enqueue adds a new transaction to the queue. +func (r *Poller) Enqueue(item Transaction) error { + r.list.push(item) + return nil +} + +// RenewToken renews the token provider's master token in the case of an +// authorization error. +func (r *Poller) RenewToken() error { + if r.tp == nil { + return errors.New("can't renew token: no token provider set") + } + return r.tp.Renew() +} + +// Terminate action causes the poll loop to finish with the given error. +func Terminate(err error) Action { + return func(Enqueuer) error { + if err == nil { + return errors.New("polling terminated without a specific error") + } + return errors.Wrap(err, "polling terminated due to error") + } +} + +// Fetch action will add an element to the transaction queue. +func Fetch(item Transaction) Action { + return func(q Enqueuer) error { + return q.Enqueue(item) + } +} + +// RenewToken will renew the token provider's master token in the case of an +// authorization error. +func RenewToken() Action { + return func(q Enqueuer) error { + return q.RenewToken() + } +} diff --git a/x-pack/filebeat/input/o365audit/schema.go b/x-pack/filebeat/input/o365audit/schema.go new file mode 100644 index 00000000000..fd66b4f29f4 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/schema.go @@ -0,0 +1,63 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +type apiError struct { + Error struct { + Code string `json:"code"` + Message string `json:"message"` + } `json:"error"` +} + +func (e apiError) getErrorStrings() (code, msg string) { + const none = "(none)" + code, msg = e.Error.Code, e.Error.Message + if len(code) == 0 { + code = none + } + if len(msg) == 0 { + msg = none + } + return +} + +func (e apiError) String() string { + code, msg := e.getErrorStrings() + return fmt.Sprintf("api error:%s %s", code, msg) +} + +// ToBeatEvent returns a beat.Event representing the API error. +func (e apiError) ToBeatEvent() beat.Event { + code, msg := e.getErrorStrings() + return beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "error": common.MapStr{ + "code": code, + "message": msg, + }, + }, + } +} + +type content struct { + Type string `json:"contentType"` + ID string `json:"contentId"` + URI string `json:"contentUri"` + Created time.Time `json:"contentCreated"` + Expiration time.Time `json:"contentExpiration"` +} + +type subscribeResponse struct { + Status string `json:"status"` +} diff --git a/x-pack/filebeat/input/o365audit/state.go b/x-pack/filebeat/input/o365audit/state.go new file mode 100644 index 00000000000..ecdb8fc89ff --- /dev/null +++ b/x-pack/filebeat/input/o365audit/state.go @@ -0,0 +1,158 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "errors" + "fmt" + "sync" + "time" +) + +var errNoUpdate = errors.New("new cursor doesn't preceed the existing cursor") + +// Stream represents an event stream. +type stream struct { + tenantID, contentType string +} + +// A cursor represents a point in time within an event stream +// that can be persisted and used to resume processing from that point. +type cursor struct { + // Identifier for the event stream. + stream + + // createdTime for the last seen blob. + timestamp time.Time + // index of object count (1...n) within a blob. + line int + // startTime used in the last list content query. + // This is necessary to ensure that the same blobs are observed. + startTime time.Time +} + +// Create a new cursor. +func newCursor(s stream, time time.Time) cursor { + return cursor{ + stream: s, + timestamp: time, + } +} + +// TryAdvance advances the cursor to the given content blob +// if it's not in the past. +// Returns whether the given content needs to be processed. +func (c *cursor) TryAdvance(ct content) bool { + if ct.Created.Before(c.timestamp) { + return false + } + if ct.Created.Equal(c.timestamp) { + // Only need to re-process the current content blob if we're + // seeking to a line inside it. + return c.line > 0 + } + c.timestamp = ct.Created + c.line = 0 + return true +} + +// Before allows to compare cursors to see if the new cursor needs to be persisted. +func (c cursor) Before(b cursor) bool { + if c.contentType != b.contentType || c.tenantID != b.tenantID { + panic(fmt.Sprintf("assertion failed: %+v vs %+v", c, b)) + } + + if c.timestamp.Before(b.timestamp) { + return true + } + if c.timestamp.Equal(b.timestamp) { + return c.line < b.line + } + return false +} + +// WithStartTime allows to create a cursor with an updated startTime. +func (c cursor) WithStartTime(s time.Time) cursor { + c.startTime = s + return c +} + +// ForNextLine returns a new cursor for the next line within a blob. +func (c cursor) ForNextLine() cursor { + c.line++ + return c +} + +// String returns the printable representation of a cursor. +func (c cursor) String() string { + return fmt.Sprintf("cursor{tenantID:%s contentType:%s timestamp:%s line:%d start:%s}", + c.tenantID, c.contentType, c.timestamp, c.line, c.startTime) +} + +// ErrStateNotFound is the error returned by a statePersister when a cursor +// is not found for a stream. +var errStateNotFound = errors.New("no saved state found") + +type statePersister interface { + Load(key stream) (cursor, error) + Save(cursor cursor) error +} + +type stateStorage struct { + sync.Mutex + saved map[stream]cursor + persister statePersister +} + +func (s *stateStorage) Load(key stream) (cursor, error) { + s.Lock() + defer s.Unlock() + if st, found := s.saved[key]; found { + return st, nil + } + cur, err := s.persister.Load(key) + if err != nil { + if err != errStateNotFound { + return cur, err + } + cur = newCursor(key, time.Time{}) + } + return cur, s.saveUnsafe(cur) +} + +func (s *stateStorage) Save(c cursor) error { + s.Lock() + defer s.Unlock() + return s.saveUnsafe(c) +} + +func (s *stateStorage) saveUnsafe(c cursor) error { + if prev, found := s.saved[c.stream]; found { + if !prev.Before(c) { + return errNoUpdate + } + } + if s.saved == nil { + s.saved = make(map[stream]cursor) + } + s.saved[c.stream] = c + return s.persister.Save(c) +} + +func newStateStorage(underlying statePersister) *stateStorage { + return &stateStorage{ + persister: underlying, + } +} + +type noopPersister struct{} + +func (p noopPersister) Load(key stream) (cursor, error) { + return cursor{}, errStateNotFound +} + +func (p noopPersister) Save(cursor cursor) error { + return nil +} diff --git a/x-pack/filebeat/input/o365audit/state_test.go b/x-pack/filebeat/input/o365audit/state_test.go new file mode 100644 index 00000000000..71b778d16ec --- /dev/null +++ b/x-pack/filebeat/input/o365audit/state_test.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNoopState(t *testing.T) { + const ( + ct = "content-type" + tn = "my_tenant" + ) + myStream := stream{tn, ct} + t.Run("new state", func(t *testing.T) { + st := newStateStorage(noopPersister{}) + cur, err := st.Load(myStream) + if !assert.NoError(t, err) { + t.Fatal(err) + } + empty := newCursor(myStream, time.Time{}) + assert.Equal(t, empty, cur) + }) + t.Run("update state", func(t *testing.T) { + st := newStateStorage(noopPersister{}) + cur, err := st.Load(myStream) + if !assert.NoError(t, err) { + t.Fatal(err) + } + advanced := cur.TryAdvance(content{ + Type: tn, + ID: "1234", + URI: "http://localhost.test/my_uri", + Created: time.Now(), + Expiration: time.Now().Add(time.Hour), + }) + assert.True(t, advanced) + err = st.Save(cur) + if !assert.NoError(t, err) { + t.Fatal(err) + } + saved, err := st.Load(myStream) + if !assert.NoError(t, err) { + t.Fatal(err) + } + assert.Equal(t, cur, saved) + }) + t.Run("forbid reversal", func(t *testing.T) { + st := newStateStorage(noopPersister{}) + cur := newCursor(myStream, time.Now()) + next := cur.ForNextLine() + err := st.Save(next) + if !assert.NoError(t, err) { + t.Fatal(err) + } + err = st.Save(cur) + assert.Equal(t, errNoUpdate, err) + }) + t.Run("multiple contexts", func(t *testing.T) { + st := newStateStorage(noopPersister{}) + cursors := []cursor{ + newCursor(myStream, time.Time{}), + newCursor(stream{"tenant2", ct}, time.Time{}), + newCursor(stream{ct, "bananas"}, time.Time{}), + } + for idx, cur := range cursors { + msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur) + err := st.Save(cur) + if !assert.NoError(t, err, msg) { + t.Fatal(err) + } + } + for idx, cur := range cursors { + msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur) + saved, err := st.Load(cur.stream) + if !assert.NoError(t, err, msg) { + t.Fatal(err) + } + assert.Equal(t, cur, saved) + } + for idx, cur := range cursors { + cur = cur.ForNextLine() + cursors[idx] = cur + msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur) + err := st.Save(cur) + if !assert.NoError(t, err, msg) { + t.Fatal(err) + } + } + for idx, cur := range cursors { + msg := fmt.Sprintf("idx:%d cur:%+v", idx, cur) + saved, err := st.Load(cur.stream) + if !assert.NoError(t, err, msg) { + t.Fatal(err) + } + assert.Equal(t, cur, saved) + } + }) +} diff --git a/x-pack/filebeat/input/o365audit/subscribe.go b/x-pack/filebeat/input/o365audit/subscribe.go new file mode 100644 index 00000000000..fd8584d5e09 --- /dev/null +++ b/x-pack/filebeat/input/o365audit/subscribe.go @@ -0,0 +1,81 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package o365audit + +import ( + "fmt" + "net/http" + "time" + + "github.com/Azure/go-autorest/autorest" + + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" +) + +// Subscribe is a poll.Transaction that subscribes to an event stream. +type subscribe struct { + apiEnvironment +} + +// String returns the printable representation of a subscribe transaction. +func (s subscribe) String() string { + return fmt.Sprintf("subscribe tenant:%s contentType:%s", s.TenantID, s.ContentType) +} + +// RequestDecorators returns the decorators used to perform a request. +func (s subscribe) RequestDecorators() []autorest.PrepareDecorator { + return []autorest.PrepareDecorator{ + autorest.AsPost(), + autorest.WithBaseURL(s.Config.Resource), + autorest.WithPath("api/v1.0"), + autorest.WithPath(s.TenantID), + autorest.WithPath("activity/feed/subscriptions/start"), + autorest.WithQueryParameters( + map[string]interface{}{ + "contentType": s.ContentType, + }), + } +} + +// OnResponse handles the output of a list content request. +func (s subscribe) OnResponse(response *http.Response) []poll.Action { + if response.StatusCode != 200 { + return s.handleError(response) + } + var js subscribeResponse + if err := readJSONBody(response, &js); err != nil { + return []poll.Action{ + poll.Terminate(err), + } + } + if js.Status != "enabled" { + return []poll.Action{ + poll.Terminate(fmt.Errorf("unable to subscribe. Got status: %s", js.Status)), + } + } + return nil +} + +func (s subscribe) handleError(response *http.Response) []poll.Action { + var msg apiError + if err := readJSONBody(response, &msg); err != nil { + return []poll.Action{poll.Terminate(err)} + } + return []poll.Action{ + poll.Terminate(fmt.Errorf("got an error when subscribing: %s body: %+v", response.Status, msg)), + } +} + +// Delay returns the delay before executing a transaction. +func (s subscribe) Delay() time.Duration { + return time.Second * 5 +} + +// Subscribe returns an action to subscribe to a stream. +func Subscribe(env apiEnvironment) subscribe { + return subscribe{ + apiEnvironment: env, + } +} From bdb0ddb5cac9460e62dfe977f1822fe7435d967c Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 10:00:01 +0100 Subject: [PATCH 02/19] Improve error handling --- .../filebeat/input/o365audit/contentblob.go | 42 +++++++++++++++++-- x-pack/filebeat/input/o365audit/listblobs.go | 31 +++++++------- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go index c032fd35acc..de32f9569f4 100644 --- a/x-pack/filebeat/input/o365audit/contentblob.go +++ b/x-pack/filebeat/input/o365audit/contentblob.go @@ -48,10 +48,7 @@ func (c contentBlob) Delay() time.Duration { // OnResponse parses the response for a content blob. func (c contentBlob) OnResponse(response *http.Response) (actions []poll.Action) { if response.StatusCode != 200 { - // TODO: - return append(actions, poll.Terminate( - fmt.Errorf("operation %s returned HTTP code %d %s", - c, response.StatusCode, response.Status))) + return c.handleError(response) } var js []common.MapStr if err := readJSONBody(response, &js); err != nil { @@ -90,6 +87,33 @@ func (c contentBlob) OnResponse(response *http.Response) (actions []poll.Action) return actions } +func (c contentBlob) handleError(response *http.Response) (actions []poll.Action) { + var msg apiError + readJSONBody(response, &msg) + c.env.Logger.Warnf("Got error %s: %+v", response.Status, msg) + + if _, found := fatalErrors[msg.Error.Code]; found { + return []poll.Action{ + c.env.ReportAPIError(msg), + poll.Terminate(errors.New(msg.Error.Message)), + } + } + + switch response.StatusCode { + case 401: // Authentication error. Renew oauth token and repeat this op. + return []poll.Action{ + poll.RenewToken(), + poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.LiveWindowPollInterval}), + } + case 404: + return nil + } + if msg.Error.Code != "" { + actions = append(actions, c.env.ReportAPIError(msg)) + } + return append(actions, poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.ErrorRetryInterval})) +} + // ContentBlob creates a new contentBlob. func ContentBlob(url string, cursor cursor, env apiEnvironment) contentBlob { return contentBlob{ @@ -110,3 +134,13 @@ func (c contentBlob) WithSkipLines(nlines int) contentBlob { c.skipLines = nlines return c } + +type withDelay struct { + contentBlob + delay time.Duration +} + +// Delay overrides the contentBlob's delay. +func (w withDelay) Delay() time.Duration { + return w.delay +} diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index 715a330c93c..fcbf19fc517 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -156,13 +156,10 @@ func (l listBlob) Next() listBlob { } var fatalErrors = map[string]struct{}{ - // The permission set ({0}) sent in the request did not include the expected permission ActivityFeed.Read. - "AF10001": {}, // Missing parameter: {0}. "AF20001": {}, // Invalid parameter type: {0}. Expected type: {1} "AF20002": {}, - // TODO: // Expiration {0} provided is set to past date and time. "AF20003": {}, // The tenant ID passed in the URL ({0}) does not match the tenant ID passed in the access token ({1}). @@ -177,15 +174,11 @@ var fatalErrors = map[string]struct{}{ "AF20020": {}, // The webhook endpoint {{0}) could not be validated. {1} "AF20021": {}, - // Invalid nextPage Input: {0}. - "AF20031": {}, } func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { var msg apiError - if err := readJSONBody(response, &msg); err != nil { - return []poll.Action{poll.Terminate(err)} - } + readJSONBody(response, &msg) l.env.Logger.Warnf("Got error %s: %+v", response.Status, msg) if response.StatusCode == 401 { @@ -197,8 +190,14 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { } } - switch msg.Error.Code { + if _, found := fatalErrors[msg.Error.Code]; found { + return []poll.Action{ + l.env.ReportAPIError(msg), + poll.Terminate(errors.New(msg.Error.Message)), + } + } + switch msg.Error.Code { // AF20022: No subscription found for the specified content type // AF20023: The subscription was disabled by [..] case "AF20022", "AF20023": @@ -235,15 +234,19 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { "difference", delta) } case "AF429": - // ... + // Too many requests. case "AF50000": // ... + // Invalid nextPage Input: {0}. + case "AF20031": + // Can be ignored. } - l.delay = l.env.Config.ErrorRetryInterval - return []poll.Action{ - l.env.ReportAPIError(msg), - poll.Fetch(l), + + if msg.Error.Code != "" { + actions = append(actions, l.env.ReportAPIError(msg)) } + l.delay = l.env.Config.ErrorRetryInterval + return append(actions, poll.Fetch(l)) } func readJSONBody(response *http.Response, dest interface{}) error { From 1a7df995e69fe457de826c52e9e468e794b46195 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 14:26:07 +0100 Subject: [PATCH 03/19] Fix initial query failure when queued for a long time. If the first query list_blobs(now-7d,now-6d) is queued for more than 1h (because of service unavailable errors for example), when it finally runs it falls outside the acceptable time-range for the server, resulting in a AF20030 error. --- x-pack/filebeat/input/o365audit/listblobs.go | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index fcbf19fc517..a376c9a32ae 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -15,7 +15,6 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" ) @@ -214,7 +213,20 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { // As of writing this, the server fails a request if it's more than // retention_time(7d)+1h in the past. // On the other hand, requests can be days into the future without error. - delta := getServerTimeDelta(response) + + // First check if this is caused by a request close to that's been + // queued for hours because of server being down. Repeat the request + // with updated times. + now := l.env.Clock() + delta := now.Sub(l.startTime) + l.delay = l.env.Config.LiveWindowPollInterval + if delta > (l.env.Config.MaxRetention + 30*time.Minute) { + return []poll.Action{ + poll.Fetch(l.adjustTimes(l.startTime)), + } + } + + delta = getServerTimeDelta(response) l.env.Logger.Errorf("Server is complaining about query interval. "+ "This is usually a problem with the local clock and the server's clock "+ "being out of sync. Time difference with server is %v.", delta) @@ -224,15 +236,15 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { } l.delay = l.env.Config.ErrorRetryInterval l.env.Logger.Info("Compensating for time difference") - return []poll.Action{ - poll.Fetch(l.adjustTimes(l.startTime)), - } } else { l.env.Logger.Infow("Not adjusting for time offset.", "api.adjust_clock", l.env.Config.AdjustClock, "api.adjust_clock_min_difference", l.env.Config.AdjustClockMinDifference, "difference", delta) } + return []poll.Action{ + poll.Fetch(l.adjustTimes(l.startTime)), + } case "AF429": // Too many requests. case "AF50000": From 90f62bce4e42182d5f464bedda8021ef55b7a75f Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 14:28:21 +0100 Subject: [PATCH 04/19] Removed extra debug --- x-pack/filebeat/input/o365audit/listblobs.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index a376c9a32ae..4f739f16a67 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -269,7 +269,6 @@ func readJSONBody(response *http.Response, dest interface{}) error { if err != nil { return errors.Wrap(err, "reading body failed") } - logp.L().Infof(" --> content: %s", string(body)) if err = json.Unmarshal(body, dest); err != nil { return errors.Wrap(err, "decoding json failed") } From 30ebd24471a5addd383ee6d389b66eb245332649 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 18:20:14 +0100 Subject: [PATCH 05/19] Better handling of network errors --- x-pack/filebeat/input/o365audit/input.go | 2 +- x-pack/filebeat/input/o365audit/poll/poll.go | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 4798b762102..abbc7076ddc 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -188,7 +188,7 @@ func (inp *o365input) runPoller(poller *poll.Poller, start cursor) { action = action.WithStartTime(start.startTime) } if err := poller.Run(action); err != nil { - inp.log.Errorf("API request failed with error: %v", err.Error()) + ctx.Logger.Errorf("API polling terminated with error: %v", err.Error()) msg := common.MapStr{} msg.Put("error.message", err.Error()) event := beat.Event{ diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go index 8269ae27efd..d130b01f002 100644 --- a/x-pack/filebeat/input/o365audit/poll/poll.go +++ b/x-pack/filebeat/input/o365audit/poll/poll.go @@ -75,8 +75,11 @@ func (r *Poller) Run(item Transaction) error { } return nil } - func (r *Poller) fetch(item Transaction) error { + return r.fetchWithDelay(item, r.interval) +} + +func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error { r.log.Debugf("* Fetch %s", item) // The order here is important. item's decorators must come first as those // set the URL, which is required by other decorators (WithQueryParameters). @@ -94,17 +97,15 @@ func (r *Poller) fetch(item Transaction) error { if err != nil { return errors.Wrap(err, "failed preparing request") } - delay := item.Delay() - if delay < r.interval { - delay = r.interval - } + delay := max(item.Delay(), minDelay) r.log.Debugf(" -- wait %s for %s", delay, request.URL.String()) response, err := autorest.Send(request, autorest.DoCloseIfError(), autorest.AfterDelay(delay)) if err != nil { - return errors.Wrap(err, "failed sending request") + r.log.Warnf("-- error sending request: %v", err) + return r.fetchWithDelay(item, max(time.Minute, r.interval)) } acts := item.OnResponse(response) @@ -257,3 +258,10 @@ func RenewToken() Action { return q.RenewToken() } } + +func max(a, b time.Duration) time.Duration { + if a < b { + return b + } + return a +} From 3b6e221a2d2a244fa2da894bdd027b251b65bc4c Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 23:10:38 +0100 Subject: [PATCH 06/19] Add support for client secret authentication --- x-pack/filebeat/input/o365audit/auth/auth.go | 25 ++++++++ x-pack/filebeat/input/o365audit/auth/cert.go | 59 ++++--------------- .../filebeat/input/o365audit/auth/secret.go | 23 ++++++++ x-pack/filebeat/input/o365audit/config.go | 23 ++++++++ x-pack/filebeat/input/o365audit/input.go | 15 ++--- 5 files changed, 86 insertions(+), 59 deletions(-) create mode 100644 x-pack/filebeat/input/o365audit/auth/secret.go diff --git a/x-pack/filebeat/input/o365audit/auth/auth.go b/x-pack/filebeat/input/o365audit/auth/auth.go index 99d8a5c66d9..bac0c755a89 100644 --- a/x-pack/filebeat/input/o365audit/auth/auth.go +++ b/x-pack/filebeat/input/o365audit/auth/auth.go @@ -4,6 +4,11 @@ package auth +import ( + "github.com/Azure/go-autorest/autorest/adal" + "github.com/pkg/errors" +) + // TokenProvider is the interface that wraps an authentication mechanism and // allows to obtain tokens. type TokenProvider interface { @@ -14,3 +19,23 @@ type TokenProvider interface { // when the API returns an Authentication error. Renew() error } + +// servicePrincipalToken extends *adal.ServicePrincipalToken with the +// the TokenProvider interface. +type servicePrincipalToken adal.ServicePrincipalToken + +// Token returns an oauth token that can be used for bearer authorization. +func (provider *servicePrincipalToken) Token() (string, error) { + inner := (*adal.ServicePrincipalToken)(provider) + if err := inner.EnsureFresh(); err != nil { + return "", errors.Wrap(err, "refreshing spt token") + } + token := inner.Token() + return token.OAuthToken(), nil +} + +// Renew re-authenticates with the oauth2 endpoint to get a new Service Principal Token. +func (provider *servicePrincipalToken) Renew() error { + inner := (*adal.ServicePrincipalToken)(provider) + return inner.Refresh() +} diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go index 053b5f85784..93240befd98 100644 --- a/x-pack/filebeat/input/o365audit/auth/cert.go +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -8,7 +8,6 @@ import ( "crypto/rsa" "crypto/x509" "fmt" - "sync" "github.com/Azure/go-autorest/autorest/adal" "github.com/pkg/errors" @@ -16,70 +15,32 @@ import ( "github.com/elastic/beats/libbeat/common/transport/tlscommon" ) -type sptProviderFromCert struct { - sync.Mutex - certs tlscommon.CertificateConfig - applicationID string - endpoint string - resource string - tenantID string - spt *adal.ServicePrincipalToken -} - // NewProviderFromCertificate returns a TokenProvider that uses certificate-based // authentication. func NewProviderFromCertificate( endpoint, resource, applicationID, tenantID string, conf tlscommon.CertificateConfig) (sptp TokenProvider, err error) { - provider := &sptProviderFromCert{ - certs: conf, - applicationID: applicationID, - resource: resource, - endpoint: endpoint, - tenantID: tenantID, - } - if provider.spt, err = provider.getServicePrincipalToken(tenantID); err != nil { - return nil, err - } - provider.spt.SetAutoRefresh(true) - return provider, nil -} - -// Token returns an oauth token that can be used for bearer authorization. -func (provider *sptProviderFromCert) Token() (string, error) { - provider.Mutex.Lock() - defer provider.Mutex.Unlock() - if err := provider.spt.EnsureFresh(); err != nil { - return "", errors.Wrap(err, "refreshing spt token") - } - token := provider.spt.Token() - return token.OAuthToken(), nil -} - -// Renew re-authenticates with the oauth2 endpoint to get a new Service Principal Token. -func (provider *sptProviderFromCert) Renew() error { - provider.Mutex.Lock() - defer provider.Mutex.Unlock() - return provider.spt.Refresh() -} - -func (provider *sptProviderFromCert) getServicePrincipalToken(tenantID string) (*adal.ServicePrincipalToken, error) { - cert, privKey, err := loadConfigCerts(provider.certs) + cert, privKey, err := loadConfigCerts(conf) if err != nil { return nil, errors.Wrap(err, "failed loading certificates") } - oauth, err := adal.NewOAuthConfig(provider.endpoint, tenantID) + oauth, err := adal.NewOAuthConfig(endpoint, tenantID) if err != nil { return nil, errors.Wrap(err, "error generating OAuthConfig") } - return adal.NewServicePrincipalTokenFromCertificate( + spt, err := adal.NewServicePrincipalTokenFromCertificate( *oauth, - provider.applicationID, + applicationID, cert, privKey, - provider.resource, + resource, ) + if err != nil { + return nil, err + } + spt.SetAutoRefresh(true) + return (*servicePrincipalToken)(spt), nil } func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, key *rsa.PrivateKey, err error) { diff --git a/x-pack/filebeat/input/o365audit/auth/secret.go b/x-pack/filebeat/input/o365audit/auth/secret.go new file mode 100644 index 00000000000..38e170cacea --- /dev/null +++ b/x-pack/filebeat/input/o365audit/auth/secret.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package auth + +import ( + "github.com/Azure/go-autorest/autorest/adal" + "github.com/pkg/errors" +) + +func NewProviderFromClientSecret(endpoint, resource, applicationID, tenantID, secret string) (p TokenProvider, err error) { + oauth, err := adal.NewOAuthConfig(endpoint, tenantID) + if err != nil { + return nil, errors.Wrap(err, "error generating OAuthConfig") + } + spt, err := adal.NewServicePrincipalToken(*oauth, applicationID, secret, resource) + if err != nil { + return nil, err + } + spt.SetAutoRefresh(true) + return (*servicePrincipalToken)(spt), nil +} diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 52463c57042..8256bed08e5 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/transport/tlscommon" @@ -21,6 +22,8 @@ type Config struct { // ApplicationID (aka. client ID) of the Azure application. ApplicationID string `config:"application_id" validate:"required"` + ClientSecret string `config:"client_secret"` + // TenantID (aka. Directory ID) is a list of tenants for which to fetch // the audit logs. This can be a string or a list of strings. TenantID interface{} `config:"tenant_id,replace" validate:"required"` @@ -159,3 +162,23 @@ func asStringList(value interface{}) (list []string, err error) { } return list, nil } + +// NewTokenProvider returns an auth.TokenProvider for the given tenantID. +func (c *Config) NewTokenProvider(tenantID string) (auth.TokenProvider, error) { + if c.ClientSecret != "" { + return auth.NewProviderFromClientSecret( + c.API.AuthenticationEndpoint, + c.API.Resource, + c.ApplicationID, + tenantID, + c.ClientSecret, + ) + } + return auth.NewProviderFromCertificate( + c.API.AuthenticationEndpoint, + c.API.Resource, + c.ApplicationID, + tenantID, + c.CertificateConfig, + ) +} diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index abbc7076ddc..cea887a4791 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -20,7 +20,6 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/useragent" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" ) @@ -104,17 +103,13 @@ func NewInput( for _, tenantID := range config.tenants { // MaxRequestsPerMinute limitation is per tenant. delay := time.Duration(len(config.contentTypes)) * time.Minute / time.Duration(config.API.MaxRequestsPerMinute) - auth, err := auth.NewProviderFromCertificate( - config.API.AuthenticationEndpoint, - config.API.Resource, - config.ApplicationID, - tenantID, - config.CertificateConfig, - ) + auth, err := config.NewTokenProvider(tenantID) if err != nil { return nil, err } - + if _, err = auth.Token(); err != nil { + return nil, errors.Wrapf(err, "unable to acquire authentication token for tenant:%s", tenantID) + } for _, contentType := range config.contentTypes { key := stream{ tenantID: tenantID, @@ -158,7 +153,7 @@ func (inp *o365input) Run() { func (inp *o365input) run() { for stream, poller := range inp.pollers { start := inp.loadLastLocation(stream) - inp.log.Debugw("Start fetching events", + inp.log.Infow("Start fetching events", "cursor", start, "tenantID", stream.tenantID, "contentType", stream.contentType) From d3ef9e9ba754f60863881611311eba5beee733a8 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 12 Feb 2020 23:46:57 +0100 Subject: [PATCH 07/19] Improved config error handling --- x-pack/filebeat/input/o365audit/auth/cert.go | 2 +- x-pack/filebeat/input/o365audit/config.go | 15 +++++++++++++-- x-pack/filebeat/input/o365audit/input.go | 10 +++++++++- x-pack/filebeat/input/o365audit/poll/poll.go | 1 + 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go index 93240befd98..38bb12c85d6 100644 --- a/x-pack/filebeat/input/o365audit/auth/cert.go +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -48,7 +48,7 @@ func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, k if err != nil { return nil, nil, errors.Wrapf(err, "error loading X509 certificate from '%s'", cfg.Certificate) } - if len(tlsCert.Certificate) < 1 { + if tlsCert == nil || len(tlsCert.Certificate) < 1 { return nil, nil, fmt.Errorf("no certificates loaded from '%s'", cfg.Certificate) } cert, err = x509.ParseCertificate(tlsCert.Certificate[0]) diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 8256bed08e5..2069b91401e 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -128,8 +128,19 @@ func defaultConfig() Config { // Validate checks that the configuration is correct. func (c *Config) Validate() (err error) { - if err = c.CertificateConfig.Validate(); err != nil { - return err + hasSecret := c.ClientSecret != "" + hasCert := c.CertificateConfig.Certificate != "" + + if !hasSecret && !hasCert { + return errors.New("no authentication configured. Configure a client_secret or a certificate and key.") + } + if hasSecret && hasCert { + return errors.New("both client_secret and certificate are configured. Only one authentication method can be used.") + } + if hasCert { + if err = c.CertificateConfig.Validate(); err != nil { + return errors.Wrap(err, "invalid certificate config") + } } if c.tenants, err = asStringList(c.TenantID); err != nil { return errors.Wrap(err, "error validating tenant_id") diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index cea887a4791..8936535286e 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -62,10 +62,18 @@ func NewInput( inputContext input.Context, ) (inp input.Input, err error) { cfgwarn.Beta("The %s input is beta", inputName) + inp, err = newInput(cfg, connector, inputContext) + return inp, errors.Wrap(err, inputName) +} +func newInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (inp input.Input, err error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrapf(err, "reading %s input config", inputName) + return nil, errors.Wrap(err, "reading config") } log := logp.NewLogger(inputName) diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go index d130b01f002..8f430e521dd 100644 --- a/x-pack/filebeat/input/o365audit/poll/poll.go +++ b/x-pack/filebeat/input/o365audit/poll/poll.go @@ -93,6 +93,7 @@ func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error } decorators = append(decorators, autorest.WithBearerAuthorization(token)) } + request, err := autorest.Prepare(&http.Request{}, decorators...) if err != nil { return errors.Wrap(err, "failed preparing request") From bb5251b0507b3df37e800e86a70a2ed6615bef06 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 13 Feb 2020 00:18:53 +0100 Subject: [PATCH 08/19] mage fmt --- x-pack/filebeat/input/o365audit/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 2069b91401e..576ad07d3a6 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -8,10 +8,10 @@ import ( "fmt" "time" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" ) // Config for the O365 audit API input. From 487d5406374fa1ecb1946a3ad1e03efd4ebda30d Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 13 Feb 2020 13:16:55 +0100 Subject: [PATCH 09/19] Minor date change --- x-pack/filebeat/input/o365audit/dates.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/o365audit/dates.go b/x-pack/filebeat/input/o365audit/dates.go index aa9eb1374e4..79ce8fceea0 100644 --- a/x-pack/filebeat/input/o365audit/dates.go +++ b/x-pack/filebeat/input/o365audit/dates.go @@ -29,9 +29,10 @@ var ( // This is just a safeguard in case the date format used by the API is // updated to include sub-second resolution or timezone information. var apiDateFormats = dateFormats{ + apiDateFormat, + apiDateFormat + "Z", time.RFC3339Nano, time.RFC3339, - apiDateFormat, } // Date formats used by HTTP/1.1 servers. From 837012c8fd08a3349f188d2b5ca96ed162822e1c Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 14 Feb 2020 13:21:56 +0100 Subject: [PATCH 10/19] Set document ID from audit record ID. --- x-pack/filebeat/input/o365audit/config.go | 7 +++++++ x-pack/filebeat/input/o365audit/input.go | 9 +++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 576ad07d3a6..95bfe3ddb26 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -83,6 +83,11 @@ type APIConfig struct { // MaxRequestsPerMinute sets the limit on the number of API requests that // can be sent, per tenant. MaxRequestsPerMinute int `config:"max_requests_per_minute" validate:"positive"` + + // SetIDFromAuditRecord controls whether the unique "Id" field in audit + // record is used as the document id for ingestion. This helps avoiding + // duplicates. + SetIDFromAuditRecord bool `config:"set_id_from_audit_record"` } func defaultConfig() Config { @@ -122,6 +127,8 @@ func defaultConfig() Config { // According to the docs this is the max requests that are allowed // per tenant per minute. MaxRequestsPerMinute: 2000, + + SetIDFromAuditRecord: true, }, } } diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 8936535286e..73758a355b1 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -255,7 +255,7 @@ var errTerminated = errors.New("terminated due to output closed") // Report returns an action that produces a beat.Event from the given object. func (env apiEnvironment) Report(doc common.MapStr, private interface{}) poll.Action { return func(poll.Enqueuer) error { - if !env.Callback(toBeatEvent(doc, private)) { + if !env.Callback(env.toBeatEvent(doc, private)) { return errTerminated } return nil @@ -272,7 +272,7 @@ func (env apiEnvironment) ReportAPIError(err apiError) poll.Action { } } -func toBeatEvent(doc common.MapStr, private interface{}) beat.Event { +func (env apiEnvironment) toBeatEvent(doc common.MapStr, private interface{}) beat.Event { var errs multierror.Errors ts, err := getDateKey(doc, "CreationTime", apiDateFormats) if err != nil { @@ -286,6 +286,11 @@ func toBeatEvent(doc common.MapStr, private interface{}) beat.Event { }, Private: private, } + if env.Config.SetIDFromAuditRecord { + if id, err := getString(doc, "Id"); err == nil && len(id) > 0 { + b.SetID(id) + } + } if len(errs) > 0 { msgs := make([]string, len(errs)) for idx, e := range errs { From 44a5400bc6095173b96481d21d63086c3e397352 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 18 Feb 2020 11:19:26 +0100 Subject: [PATCH 11/19] Make input less noisy when backend is down --- x-pack/filebeat/input/o365audit/listblobs.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index 4f739f16a67..d2ef05ea541 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -179,14 +179,22 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { var msg apiError readJSONBody(response, &msg) l.env.Logger.Warnf("Got error %s: %+v", response.Status, msg) + l.delay = l.env.Config.ErrorRetryInterval - if response.StatusCode == 401 { + switch response.StatusCode { + case 401: // Authentication error. Renew oauth token and repeat this op. l.delay = l.env.Config.LiveWindowPollInterval return []poll.Action{ poll.RenewToken(), poll.Fetch(l), } + case 408, 503: + // Known errors when the backend is down. + //Repeat the request without reporting an error. + return []poll.Action{ + poll.Fetch(l), + } } if _, found := fatalErrors[msg.Error.Code]; found { @@ -200,6 +208,7 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { // AF20022: No subscription found for the specified content type // AF20023: The subscription was disabled by [..] case "AF20022", "AF20023": + l.delay = 0 // Subscribe and retry return []poll.Action{ poll.Fetch(Subscribe(l.env)), @@ -219,8 +228,8 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { // with updated times. now := l.env.Clock() delta := now.Sub(l.startTime) - l.delay = l.env.Config.LiveWindowPollInterval if delta > (l.env.Config.MaxRetention + 30*time.Minute) { + l.delay = l.env.Config.LiveWindowPollInterval return []poll.Action{ poll.Fetch(l.adjustTimes(l.startTime)), } @@ -234,7 +243,6 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { l.env.Clock = func() time.Time { return time.Now().Add(delta) } - l.delay = l.env.Config.ErrorRetryInterval l.env.Logger.Info("Compensating for time difference") } else { l.env.Logger.Infow("Not adjusting for time offset.", @@ -252,12 +260,16 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { // Invalid nextPage Input: {0}. case "AF20031": // Can be ignored. + + // + // AF50005-AF50006: An internal error occurred. Retry the request. + case "AF50005", "AF50006": + return append(actions, poll.Fetch(l)) } if msg.Error.Code != "" { actions = append(actions, l.env.ReportAPIError(msg)) } - l.delay = l.env.Config.ErrorRetryInterval return append(actions, poll.Fetch(l)) } From 99e2857ec5da8de932702a0f1f733f728b23b613 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 18 Feb 2020 11:40:55 +0100 Subject: [PATCH 12/19] Set event.kind:pipeline_error on errors --- x-pack/filebeat/input/o365audit/input.go | 1 + x-pack/filebeat/input/o365audit/schema.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 73758a355b1..4b176aeb482 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -194,6 +194,7 @@ func (inp *o365input) runPoller(poller *poll.Poller, start cursor) { ctx.Logger.Errorf("API polling terminated with error: %v", err.Error()) msg := common.MapStr{} msg.Put("error.message", err.Error()) + msg.Put("event.kind", "pipeline_error") event := beat.Event{ Timestamp: time.Now(), Fields: msg, diff --git a/x-pack/filebeat/input/o365audit/schema.go b/x-pack/filebeat/input/o365audit/schema.go index fd66b4f29f4..f396a9b7485 100644 --- a/x-pack/filebeat/input/o365audit/schema.go +++ b/x-pack/filebeat/input/o365audit/schema.go @@ -46,6 +46,9 @@ func (e apiError) ToBeatEvent() beat.Event { "code": code, "message": msg, }, + "event": common.MapStr{ + "kind": "pipeline_error", + }, }, } } From 61b4e18c04f9568bfb6f72abb46a735428774280 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 21 Feb 2020 15:29:31 +0100 Subject: [PATCH 13/19] Simplify some settings --- x-pack/filebeat/input/o365audit/config.go | 11 +++++------ x-pack/filebeat/input/o365audit/listblobs.go | 6 +++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 95bfe3ddb26..0f406889c20 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -70,11 +70,6 @@ type APIConfig struct { // errors performing a request. ErrorRetryInterval time.Duration `config:"error_retry_interval" validate:"positive"` - // LiveWindowSize defines the window of time [now-window, now) that will be - // used to poll for new events. If events are created outside of this window, - // they will be lost. - LiveWindowSize time.Duration `config:"live_window_size" validate:"positive"` - // LiveWindowPollInterval determines how often the input should poll for new // data once it has finished scanning for past events and reached the live // window. @@ -88,6 +83,10 @@ type APIConfig struct { // record is used as the document id for ingestion. This helps avoiding // duplicates. SetIDFromAuditRecord bool `config:"set_id_from_audit_record"` + + // MaxQuerySize is the maximum time window that can be queried. The default + // is 24h. + MaxQuerySize time.Duration `config:"max_query_size" validate:"positive"` } func defaultConfig() Config { @@ -122,7 +121,7 @@ func defaultConfig() Config { LiveWindowPollInterval: time.Minute, - LiveWindowSize: timeDay, + MaxQuerySize: timeDay, // According to the docs this is the max requests that are allowed // per tenant per minute. diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index d2ef05ea541..80d0e4b365e 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -51,12 +51,12 @@ func (l listBlob) adjustTimes(since time.Time) listBlob { if since.Before(fromLimit) { since = fromLimit } - // Max query is 24h worth of events. - to := since.Add(timeDay) + + to := since.Add(l.env.Config.MaxQuerySize) // Can't query into the future. Polling for new events every interval. var delay time.Duration if to.After(now) { - since = now.Add(-l.env.Config.LiveWindowSize) + since = now.Add(-l.env.Config.MaxQuerySize) if since.Before(l.cursor.timestamp) { since = l.cursor.timestamp } From 4c2f5eeccc1bcf55cd26e11166026fe2e1583cb1 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 21 Feb 2020 17:11:42 +0100 Subject: [PATCH 14/19] Rename live_window_poll_interval to poll_interval. --- x-pack/filebeat/docs/inputs/input-o365audit.asciidoc | 0 x-pack/filebeat/input/o365audit/config.go | 7 ++++--- x-pack/filebeat/input/o365audit/contentblob.go | 2 +- x-pack/filebeat/input/o365audit/listblobs.go | 6 +++--- 4 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 x-pack/filebeat/docs/inputs/input-o365audit.asciidoc diff --git a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 0f406889c20..49844c765dc 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -22,6 +22,7 @@ type Config struct { // ApplicationID (aka. client ID) of the Azure application. ApplicationID string `config:"application_id" validate:"required"` + // ClientSecret (aka. API key) to use for authentication. ClientSecret string `config:"client_secret"` // TenantID (aka. Directory ID) is a list of tenants for which to fetch @@ -70,10 +71,10 @@ type APIConfig struct { // errors performing a request. ErrorRetryInterval time.Duration `config:"error_retry_interval" validate:"positive"` - // LiveWindowPollInterval determines how often the input should poll for new + // PollInterval determines how often the input should poll for new // data once it has finished scanning for past events and reached the live // window. - LiveWindowPollInterval time.Duration `config:"live_window_poll_interval" validate:"positive"` + PollInterval time.Duration `config:"poll_interval" validate:"positive"` // MaxRequestsPerMinute sets the limit on the number of API requests that // can be sent, per tenant. @@ -119,7 +120,7 @@ func defaultConfig() Config { ErrorRetryInterval: 5 * time.Minute, - LiveWindowPollInterval: time.Minute, + PollInterval: 3 * time.Minute, MaxQuerySize: timeDay, diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go index de32f9569f4..85c4c50fb33 100644 --- a/x-pack/filebeat/input/o365audit/contentblob.go +++ b/x-pack/filebeat/input/o365audit/contentblob.go @@ -103,7 +103,7 @@ func (c contentBlob) handleError(response *http.Response) (actions []poll.Action case 401: // Authentication error. Renew oauth token and repeat this op. return []poll.Action{ poll.RenewToken(), - poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.LiveWindowPollInterval}), + poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.PollInterval}), } case 404: return nil diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index 80d0e4b365e..c08e6c95cbf 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -61,7 +61,7 @@ func (l listBlob) adjustTimes(since time.Time) listBlob { since = l.cursor.timestamp } to = now - delay = l.env.Config.LiveWindowPollInterval + delay = l.env.Config.PollInterval } l.startTime = since.UTC() l.endTime = to.UTC() @@ -184,7 +184,7 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { switch response.StatusCode { case 401: // Authentication error. Renew oauth token and repeat this op. - l.delay = l.env.Config.LiveWindowPollInterval + l.delay = l.env.Config.PollInterval return []poll.Action{ poll.RenewToken(), poll.Fetch(l), @@ -229,7 +229,7 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { now := l.env.Clock() delta := now.Sub(l.startTime) if delta > (l.env.Config.MaxRetention + 30*time.Minute) { - l.delay = l.env.Config.LiveWindowPollInterval + l.delay = l.env.Config.PollInterval return []poll.Action{ poll.Fetch(l.adjustTimes(l.startTime)), } From 336450b759a439eb17aacf20193a4e0a761ca69a Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 21 Feb 2020 17:12:16 +0100 Subject: [PATCH 15/19] o365audit input docs. --- .../docs/inputs/input-o365audit.asciidoc | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc index e69de29bb2d..5f6d09fc94c 100644 --- a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc @@ -0,0 +1,134 @@ +[role="xpack"] + +:type: o365audit + +[id="{beatname_lc}-input-{type}"] +=== Office 365 Management Activity API input + +++++ +Office 365 Management Activity API +++++ + +beta[] + +Use the `o365audit` input to retrieve audit messages from Office 365 +and Azure AD activity logs. These are the same logs that are available under +_Audit_ _log_ _search_ in the _Security_ _and_ _Compliance_ center. + + +A single input instance can be used to fetch events for multiple tenants as long +as a single application is configured to access all tenants. Certificate-based +authentication is recommended in this scenario. + +This input doesn't perform any transformation on the incoming messages, notably +no {ecs-ref}/ecs-reference.html[Elastic Common Schema fields] are populated, and +some data is encoded as arrays of objects, which are difficult to query in +Elasticsearch. You probably want to use the +{filebeat-ref}/filebeat-module-o365.html[o365 module] instead. +// TODO: link to O365 module docs. + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: o365audit + application_id: my-application-id + tenant_id: my-tenant-id + client_secret: my-client-secret +---- + +Multi-tenancy and certificate-based authentication is also supported: + +---- +{beatname_lc}.inputs: +- type: o365audit + application_id: my-application-id + tenant_id: + - tenant-id-A + - tenant-id-B + - tenant-id-C + certificate: /path/to/cert.pem + key: /path/to/private.pem + # key_passphrase: "my key's password" +---- + +==== Configuration options + +The `o365audit` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +===== `application_id` + +The Application ID (also known as Client ID) of the Azure application to +authenticate as. + +[float] +===== `tenant_id` + +The tenant ID (also known as Directory ID) whose data is to be fetched. It's +also possible to specify a list of tenants IDs to fetch data from more than +one tenant. + +[float] +===== `content_type` + +List of content types to fetch. The default is to fetch all known content types: + +- Audit.AzureActiveDirectory +- Audit.Exchange +- Audit.SharePoint +- Audit.General +- DLP.All + +[float] +===== `client_secret` + +The client secret used for authentication. + +[float] +===== `certificate` + +Path to the public certificate file used for certificate-based authentication. + +[float] +===== `key` + +Path to the certificate's private key file for certificate-based authentication. + +[float] +===== `key_passphrase` + +Passphrase used to unlock the private key. + +[float] +===== `api.authentication_endpoint` + +The authentication endpoint used to authorize the Azure app. This is +`https://login.microsoftonline.com/` by default, and can be changed to access +alternative endpoints. + +===== `api.resource` + +The API resource to retrieve information from. This is +`https://manage.office.com` by default, and can be changed to access alternative +endpoints. + +===== `api.max_retention` + +The maximum data retention period to support. `178h` by default. {beatname_uc} +will fetch all retained data for a tenant when run for the first time. + +===== `api.poll_interval` + +The interval to wait before polling the API server for new events. Default `3m`. + +===== `api.max_requests_per_minute` + +The maximum number of requests to perform every minute, for each tenant. The +current limit is `2000` requests per minute per tenant. + +===== `api.max_query_size` + +The maximum time window that API allows in a single query. This is `24h`. From 629fbece60e8284e3aae179685cba8d6737e66b6 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 26 Feb 2020 10:28:55 +0100 Subject: [PATCH 16/19] Review comments --- .../docs/inputs/input-o365audit.asciidoc | 10 +++++----- x-pack/filebeat/input/o365audit/auth/auth.go | 2 +- .../filebeat/input/o365audit/contentblob.go | 2 +- x-pack/filebeat/input/o365audit/listblobs.go | 19 ++++++++++--------- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc index 5f6d09fc94c..aa1e5370b28 100644 --- a/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-o365audit.asciidoc @@ -15,7 +15,6 @@ Use the `o365audit` input to retrieve audit messages from Office 365 and Azure AD activity logs. These are the same logs that are available under _Audit_ _log_ _search_ in the _Security_ _and_ _Compliance_ center. - A single input instance can be used to fetch events for multiple tenants as long as a single application is configured to access all tenants. Certificate-based authentication is recommended in this scenario. @@ -100,7 +99,7 @@ Path to the certificate's private key file for certificate-based authentication. [float] ===== `key_passphrase` -Passphrase used to unlock the private key. +Passphrase used to decrypt the private key. [float] ===== `api.authentication_endpoint` @@ -126,9 +125,10 @@ The interval to wait before polling the API server for new events. Default `3m`. ===== `api.max_requests_per_minute` -The maximum number of requests to perform every minute, for each tenant. The -current limit is `2000` requests per minute per tenant. +The maximum number of requests to perform per minute, for each tenant. The +default is `2000`, as this is the server-side limit per tenant. ===== `api.max_query_size` -The maximum time window that API allows in a single query. This is `24h`. +The maximum time window that API allows in a single query. Defaults to `24h` +to match Microsoft's documented limit. diff --git a/x-pack/filebeat/input/o365audit/auth/auth.go b/x-pack/filebeat/input/o365audit/auth/auth.go index bac0c755a89..69899e34031 100644 --- a/x-pack/filebeat/input/o365audit/auth/auth.go +++ b/x-pack/filebeat/input/o365audit/auth/auth.go @@ -20,7 +20,7 @@ type TokenProvider interface { Renew() error } -// servicePrincipalToken extends *adal.ServicePrincipalToken with the +// servicePrincipalToken extends adal.ServicePrincipalToken with the // the TokenProvider interface. type servicePrincipalToken adal.ServicePrincipalToken diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go index 85c4c50fb33..8e88ee0f0fb 100644 --- a/x-pack/filebeat/input/o365audit/contentblob.go +++ b/x-pack/filebeat/input/o365audit/contentblob.go @@ -28,7 +28,7 @@ type contentBlob struct { skipLines int } -// String return a printable representation of this transaction. +// String returns a printable representation of this transaction. func (c contentBlob) String() string { return fmt.Sprintf("content blob url:%s id:%s", c.url, c.id) } diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index c08e6c95cbf..9993351080c 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -191,7 +191,7 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { } case 408, 503: // Known errors when the backend is down. - //Repeat the request without reporting an error. + // Repeat the request without reporting an error. return []poll.Action{ poll.Fetch(l), } @@ -223,9 +223,9 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { // retention_time(7d)+1h in the past. // On the other hand, requests can be days into the future without error. - // First check if this is caused by a request close to that's been - // queued for hours because of server being down. Repeat the request - // with updated times. + // First check if this is caused by a request close to the max retention + // period that's been queued for hours because of server being down. + // Repeat the request with updated times. now := l.env.Clock() delta := now.Sub(l.startTime) if delta > (l.env.Config.MaxRetention + 30*time.Minute) { @@ -253,15 +253,16 @@ func (l listBlob) handleError(response *http.Response) (actions []poll.Action) { return []poll.Action{ poll.Fetch(l.adjustTimes(l.startTime)), } + + // Too many requests. case "AF429": - // Too many requests. + + // Internal server error. Retry the request. case "AF50000": - // ... - // Invalid nextPage Input: {0}. + + // Invalid nextPage Input: {0}. Can be ignored. case "AF20031": - // Can be ignored. - // // AF50005-AF50006: An internal error occurred. Retry the request. case "AF50005", "AF50006": return append(actions, poll.Fetch(l)) From 6079341b50401f4b749c9cff6de2e65ed1ce19b8 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 3 Mar 2020 10:11:56 +0100 Subject: [PATCH 17/19] Address review comments --- x-pack/filebeat/input/o365audit/auth/cert.go | 2 +- .../filebeat/input/o365audit/auth/secret.go | 2 ++ x-pack/filebeat/input/o365audit/config.go | 33 ++++++++----------- x-pack/filebeat/input/o365audit/input.go | 6 ++-- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go index 38bb12c85d6..f0b07dc5b9f 100644 --- a/x-pack/filebeat/input/o365audit/auth/cert.go +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -48,7 +48,7 @@ func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, k if err != nil { return nil, nil, errors.Wrapf(err, "error loading X509 certificate from '%s'", cfg.Certificate) } - if tlsCert == nil || len(tlsCert.Certificate) < 1 { + if tlsCert == nil || len(tlsCert.Certificate) == 0 { return nil, nil, fmt.Errorf("no certificates loaded from '%s'", cfg.Certificate) } cert, err = x509.ParseCertificate(tlsCert.Certificate[0]) diff --git a/x-pack/filebeat/input/o365audit/auth/secret.go b/x-pack/filebeat/input/o365audit/auth/secret.go index 38e170cacea..c34d6d48cc6 100644 --- a/x-pack/filebeat/input/o365audit/auth/secret.go +++ b/x-pack/filebeat/input/o365audit/auth/secret.go @@ -9,6 +9,8 @@ import ( "github.com/pkg/errors" ) +// NewProviderFromClientSecret returns a token provider that uses a secret +// for authentication. func NewProviderFromClientSecret(endpoint, resource, applicationID, tenantID, secret string) (p TokenProvider, err error) { oauth, err := adal.NewOAuthConfig(endpoint, tenantID) if err != nil { diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 49844c765dc..1701da87190 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -27,17 +27,14 @@ type Config struct { // TenantID (aka. Directory ID) is a list of tenants for which to fetch // the audit logs. This can be a string or a list of strings. - TenantID interface{} `config:"tenant_id,replace" validate:"required"` + TenantID stringList `config:"tenant_id,replace" validate:"required"` // Content-Type is a list of content-types to fetch. // This can be a string or a list of strings. - ContentType interface{} `config:"content_type,replace"` + ContentType stringList `config:"content_type,replace"` // API contains settings to adapt to changes on the API. API APIConfig `config:"api"` - - tenants []string - contentTypes []string } // APIConfig contains advanced settings that are only supposed to be changed @@ -149,36 +146,32 @@ func (c *Config) Validate() (err error) { return errors.Wrap(err, "invalid certificate config") } } - if c.tenants, err = asStringList(c.TenantID); err != nil { - return errors.Wrap(err, "error validating tenant_id") - } - if c.contentTypes, err = asStringList(c.ContentType); err != nil { - return errors.Wrap(err, "error validating content_type") - } return nil } -// A helper to allow defining a field either as a string or a list of strings. -func asStringList(value interface{}) (list []string, err error) { +type stringList []string + +// Unpack populates the stringList with either a single string value or an array. +func (s *stringList) Unpack(value interface{}) error { switch v := value.(type) { case string: - list = []string{v} + *s = []string{v} case []string: - list = v + *s = v case []interface{}: - list = make([]string, len(v)) + *s = make([]string, len(v)) for idx, ival := range v { str, ok := ival.(string) if !ok { - return nil, fmt.Errorf("string value required. Found %v (type %T) at position %d", + return fmt.Errorf("string value required. Found %v (type %T) at position %d", ival, ival, idx+1) } - list[idx] = str + (*s)[idx] = str } default: - return nil, fmt.Errorf("array of strings required. Found %v (type %T)", value, value) + return fmt.Errorf("array of strings required. Found %v (type %T)", value, value) } - return list, nil + return nil } // NewTokenProvider returns an auth.TokenProvider for the given tenantID. diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 4b176aeb482..6985bac012c 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -108,9 +108,9 @@ func newInput( }() pollers := make(map[stream]*poll.Poller) - for _, tenantID := range config.tenants { + for _, tenantID := range config.TenantID { // MaxRequestsPerMinute limitation is per tenant. - delay := time.Duration(len(config.contentTypes)) * time.Minute / time.Duration(config.API.MaxRequestsPerMinute) + delay := time.Duration(len(config.ContentType)) * time.Minute / time.Duration(config.API.MaxRequestsPerMinute) auth, err := config.NewTokenProvider(tenantID) if err != nil { return nil, err @@ -118,7 +118,7 @@ func newInput( if _, err = auth.Token(); err != nil { return nil, errors.Wrapf(err, "unable to acquire authentication token for tenant:%s", tenantID) } - for _, contentType := range config.contentTypes { + for _, contentType := range config.ContentType { key := stream{ tenantID: tenantID, contentType: contentType, From 49c0f0b111d2bd3b3bde99610f43fab9d2d8879c Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 3 Mar 2020 11:11:54 +0100 Subject: [PATCH 18/19] Adapt to go modules --- go.mod | 1 + x-pack/filebeat/input/o365audit/auth/cert.go | 2 +- x-pack/filebeat/input/o365audit/config.go | 4 ++-- x-pack/filebeat/input/o365audit/contentblob.go | 4 ++-- .../filebeat/input/o365audit/contentblob_test.go | 8 ++++---- x-pack/filebeat/input/o365audit/dates.go | 2 +- x-pack/filebeat/input/o365audit/input.go | 16 ++++++++-------- x-pack/filebeat/input/o365audit/listblobs.go | 2 +- .../filebeat/input/o365audit/listblobs_test.go | 4 ++-- x-pack/filebeat/input/o365audit/pagination.go | 2 +- x-pack/filebeat/input/o365audit/poll/poll.go | 4 ++-- x-pack/filebeat/input/o365audit/schema.go | 4 ++-- x-pack/filebeat/input/o365audit/subscribe.go | 2 +- 13 files changed, 28 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index de1bb8b07f9..d49fb8a0ebe 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.8.0 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect github.com/Azure/go-autorest/autorest v0.9.4 + github.com/Azure/go-autorest/autorest/adal v0.8.1 github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 github.com/Azure/go-autorest/autorest/date v0.2.0 github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5 diff --git a/x-pack/filebeat/input/o365audit/auth/cert.go b/x-pack/filebeat/input/o365audit/auth/cert.go index f0b07dc5b9f..dc8e1584a3a 100644 --- a/x-pack/filebeat/input/o365audit/auth/cert.go +++ b/x-pack/filebeat/input/o365audit/auth/cert.go @@ -12,7 +12,7 @@ import ( "github.com/Azure/go-autorest/autorest/adal" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) // NewProviderFromCertificate returns a TokenProvider that uses certificate-based diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 1701da87190..f30e368a9e2 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -10,8 +10,8 @@ import ( "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/auth" ) // Config for the O365 audit API input. diff --git a/x-pack/filebeat/input/o365audit/contentblob.go b/x-pack/filebeat/input/o365audit/contentblob.go index 8e88ee0f0fb..44ddb911f46 100644 --- a/x-pack/filebeat/input/o365audit/contentblob.go +++ b/x-pack/filebeat/input/o365audit/contentblob.go @@ -12,8 +12,8 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) // contentBlob is a poll.Transaction that processes "content blobs": diff --git a/x-pack/filebeat/input/o365audit/contentblob_test.go b/x-pack/filebeat/input/o365audit/contentblob_test.go index 4026f6ff443..1a08c69fb36 100644 --- a/x-pack/filebeat/input/o365audit/contentblob_test.go +++ b/x-pack/filebeat/input/o365audit/contentblob_test.go @@ -10,10 +10,10 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) type contentStore struct { diff --git a/x-pack/filebeat/input/o365audit/dates.go b/x-pack/filebeat/input/o365audit/dates.go index 79ce8fceea0..5eb53d4d6de 100644 --- a/x-pack/filebeat/input/o365audit/dates.go +++ b/x-pack/filebeat/input/o365audit/dates.go @@ -12,7 +12,7 @@ import ( "github.com/joeshaw/multierror" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common" ) const ( diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 6985bac012c..cafba2184f3 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -13,14 +13,14 @@ import ( "github.com/joeshaw/multierror" "github.com/pkg/errors" - "github.com/elastic/beats/filebeat/channel" - "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/common/useragent" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/common/useragent" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) const ( diff --git a/x-pack/filebeat/input/o365audit/listblobs.go b/x-pack/filebeat/input/o365audit/listblobs.go index 9993351080c..5be65a8d67d 100644 --- a/x-pack/filebeat/input/o365audit/listblobs.go +++ b/x-pack/filebeat/input/o365audit/listblobs.go @@ -15,7 +15,7 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/pkg/errors" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) // listBlob is a poll.Transaction that handles the content/"blobs" list. diff --git a/x-pack/filebeat/input/o365audit/listblobs_test.go b/x-pack/filebeat/input/o365audit/listblobs_test.go index acbcc9b8342..148ee2273e8 100644 --- a/x-pack/filebeat/input/o365audit/listblobs_test.go +++ b/x-pack/filebeat/input/o365audit/listblobs_test.go @@ -19,8 +19,8 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) const contentType = "Audit.AzureActiveDirectory" diff --git a/x-pack/filebeat/input/o365audit/pagination.go b/x-pack/filebeat/input/o365audit/pagination.go index 30ff8c341a9..10703a0479a 100644 --- a/x-pack/filebeat/input/o365audit/pagination.go +++ b/x-pack/filebeat/input/o365audit/pagination.go @@ -11,7 +11,7 @@ import ( "github.com/Azure/go-autorest/autorest" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) // paginator is a decorator around a poll.Transaction to parse paginated requests. diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go index 8f430e521dd..e68f0f54c8f 100644 --- a/x-pack/filebeat/input/o365audit/poll/poll.go +++ b/x-pack/filebeat/input/o365audit/poll/poll.go @@ -13,8 +13,8 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/auth" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/auth" ) // Transaction is the interface that wraps a request-response transaction to be diff --git a/x-pack/filebeat/input/o365audit/schema.go b/x-pack/filebeat/input/o365audit/schema.go index f396a9b7485..77519a8e953 100644 --- a/x-pack/filebeat/input/o365audit/schema.go +++ b/x-pack/filebeat/input/o365audit/schema.go @@ -8,8 +8,8 @@ import ( "fmt" "time" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" ) type apiError struct { diff --git a/x-pack/filebeat/input/o365audit/subscribe.go b/x-pack/filebeat/input/o365audit/subscribe.go index fd8584d5e09..8077ea24622 100644 --- a/x-pack/filebeat/input/o365audit/subscribe.go +++ b/x-pack/filebeat/input/o365audit/subscribe.go @@ -11,7 +11,7 @@ import ( "github.com/Azure/go-autorest/autorest" - "github.com/elastic/beats/x-pack/filebeat/input/o365audit/poll" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" ) // Subscribe is a poll.Transaction that subscribes to an event stream. From 56f04b5484d9663e5c06afb2b8aaca9013d81f39 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 4 Mar 2020 16:05:34 +0100 Subject: [PATCH 19/19] CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e5fff6f9f94..7ca2f65e3ed 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `cloudfoundry` input to send events from Cloud Foundry. {pull}16586[16586] - Improve ECS categorization field mappings in iis module. {issue}16165[16165] {pull}16618[16618] - Improve ECS categorization field mapping in kafka module. {issue}16167[16167] {pull}16645[16645] +- Add `o365audit` input type for consuming events from Office 365 Management Activity API. {issue}16196[16196] {pull}16244[16244] *Heartbeat*