Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate access/msteams to slog + add debug logs #47045

Merged
merged 7 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 64 additions & 54 deletions integrations/access/msteams/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msteams

import (
"context"
"log/slog"
"time"

"github.com/gravitational/trace"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/access/common/teleport"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/logger"
pd "github.com/gravitational/teleport/integrations/lib/plugindata"
"github.com/gravitational/teleport/integrations/lib/stringset"
"github.com/gravitational/teleport/integrations/lib/watcherjob"
Expand Down Expand Up @@ -53,12 +53,22 @@ type App struct {
watcherJob lib.ServiceJob
pd *pd.CompareAndSwap[PluginData]

log *slog.Logger

*lib.Process
}

// NewApp initializes a new teleport-msteams app and returns it.
func NewApp(conf Config) (*App, error) {
app := &App{conf: conf}
log, err := conf.Log.NewSLogLogger()
if err != nil {
return nil, trace.Wrap(err)
}

app := &App{
conf: conf,
log: log.With("plugin", pluginName),
}

app.mainJob = lib.NewServiceJob(app.run)

Expand All @@ -67,8 +77,7 @@ func NewApp(conf Config) (*App, error) {

// Run starts the main job process
func (a *App) Run(ctx context.Context) error {
log := logger.Get(ctx)
log.Info("Starting Teleport MS Teams Plugin")
a.log.InfoContext(ctx, "Starting Teleport MS Teams Plugin")

err := a.init(ctx)
if err != nil {
Expand Down Expand Up @@ -131,7 +140,7 @@ func (a *App) init(ctx context.Context) error {
webProxyAddr = pong.ProxyPublicAddr
}

a.bot, err = NewBot(a.conf.MSAPI, pong.ClusterName, webProxyAddr)
a.bot, err = NewBot(a.conf.MSAPI, pong.ClusterName, webProxyAddr, a.log)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -141,8 +150,6 @@ func (a *App) init(ctx context.Context) error {

// initBot initializes bot
func (a *App) initBot(ctx context.Context) error {
log := logger.Get(ctx)

teamsApp, err := a.bot.GetTeamsApp(ctx)
if trace.IsNotFound(err) {
return trace.Wrap(err, "MS Teams app not found in org app store.")
Expand All @@ -151,28 +158,31 @@ func (a *App) initBot(ctx context.Context) error {
return trace.Wrap(err)
}

log.WithField("name", teamsApp.DisplayName).
WithField("id", teamsApp.ID).
Info("MS Teams app found in org app store")
a.log.InfoContext(ctx, "MS Teams app found in org app store",
"name", teamsApp.DisplayName,
"id", teamsApp.ID)

if !a.conf.Preload {
return nil
}

log.Info("Preloading recipient data...")
a.log.InfoContext(ctx, "Preloading recipient data...")

for _, recipient := range a.conf.Recipients.GetAllRawRecipients() {
recipientData, err := a.bot.FetchRecipient(ctx, recipient)
if err != nil {
return trace.Wrap(err)
}
log.WithField("recipient", recipient).
WithField("chat_id", recipientData.Chat.ID).
WithField("kind", recipientData.Kind).
Info("Recipient found, chat found")
a.log.InfoContext(ctx, "Recipient and chat found",
slog.Group("recipient",
"raw", recipient,
"recipient_chat_id", recipientData.Chat.ID,
"recipient_kind", recipientData.Kind,
),
)
}

log.Info("Recipient data preloaded and cached.")
a.log.InfoContext(ctx, "Recipient data preloaded and cached")

return nil
}
Expand All @@ -193,17 +203,15 @@ func (a *App) newWatcherJob() (lib.ServiceJob, error) {

// run starts the main process
func (a *App) run(ctx context.Context) error {
log := logger.Get(ctx)

ok, err := a.watcherJob.WaitReady(ctx)
if err != nil {
return trace.Wrap(err)
}

if ok {
log.Info("Plugin is ready")
a.log.InfoContext(ctx, "Plugin is ready")
} else {
log.Error("Plugin is not ready")
a.log.ErrorContext(ctx, "Plugin is not ready")
}

a.mainJob.SetReady(ok)
Expand All @@ -215,16 +223,15 @@ func (a *App) run(ctx context.Context) error {

// checkTeleportVersion loads the Teleport server version and checks that it meets the minimum required version.
func (a *App) checkTeleportVersion(ctx context.Context) (proto.PingResponse, error) {
log := logger.Get(ctx)
log.Debug("Checking Teleport server version")
a.log.DebugContext(ctx, "Checking Teleport server version")

pong, err := a.apiClient.Ping(ctx)
if err != nil {
if trace.IsNotImplemented(err) {
return pong, trace.Wrap(err, "server version must be at least %s", minServerVersion)
}

log.Error("Unable to get Teleport server version")
a.log.ErrorContext(ctx, "Unable to get Teleport server version")
return pong, trace.Wrap(err)
}

Expand All @@ -242,48 +249,44 @@ func (a *App) onWatcherEvent(ctx context.Context, event types.Event) error {

op := event.Type
reqID := event.Resource.GetName()
ctx, _ = logger.WithField(ctx, "request_id", reqID)
log := a.log.With("request_id", reqID, "request_op", op.String())

switch op {
case types.OpPut:
ctx, _ = logger.WithField(ctx, "request_op", "put")
req, ok := event.Resource.(types.AccessRequest)
if !ok {
return trace.Errorf("unexpected resource type %T", event.Resource)
}
ctx, log := logger.WithField(ctx, "request_state", req.GetState().String())

log = log.With("request_state", req.GetState().String())
var err error

switch {
case req.GetState().IsPending():
log.Debug("Pending request received")
log.DebugContext(ctx, "Pending request received")
err = a.onPendingRequest(ctx, req)
case req.GetState().IsApproved():
log.Debug("Approval request received")
log.DebugContext(ctx, "Approval request received")
err = a.onResolvedRequest(ctx, req)
case req.GetState().IsDenied():
log.Debug("Denial request received")
log.DebugContext(ctx, "Denial request received")
err = a.onResolvedRequest(ctx, req)
default:
log.WithField("event", event).Warn("Unknown request state")
log.WarnContext(ctx, "Unknown request state", "event", event)
return nil
}

if err != nil {
log.WithError(err).Errorf("Failed to process request")
log.ErrorContext(ctx, "Failed to process request", "error", err)
return trace.Wrap(err)
}

return nil

case types.OpDelete:
ctx, log := logger.WithField(ctx, "request_op", "delete")

log.Debug("Expiration request received")
log.DebugContext(ctx, "Expiration request received")

if err := a.onDeletedRequest(ctx, reqID); err != nil {
log.WithError(err).Errorf("Failed to process deleted request")
log.ErrorContext(ctx, "Failed to process delete request", "error", err)
return trace.Wrap(err)
}
return nil
Expand All @@ -294,15 +297,16 @@ func (a *App) onWatcherEvent(ctx context.Context, event types.Event) error {

// onPendingRequest is called when there's a new request or a review
func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) error {
log := logger.Get(ctx)

id := req.GetName()
data := pd.AccessRequestData{
User: req.GetUser(),
Roles: req.GetRoles(),
RequestReason: req.GetRequestReason(),
}

log := a.log.With("request_id", id)
log.DebugContext(ctx, "Claiming access request", "user", req.GetUser(), "roles", req.GetRoles(), "reason", req.GetRequestReason())

// Let's try to create PluginData. This is equivalent to locking the AccessRequest
// to this instance of the plugin.
_, err := a.pd.Create(ctx, id, PluginData{AccessRequestData: data})
Expand All @@ -312,25 +316,30 @@ func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) err
if err != nil {
return trace.Wrap(err)
}
log.DebugContext(ctx, "Access request claimed")

recipients := a.getMessageRecipients(ctx, req)

if len(recipients) == 0 {
log.Warning("No recipients to notify")
log.WarnContext(ctx, "No recipients to notify")
} else {
err = a.postMessages(ctx, recipients, id, data)
if err != nil {
return trace.Wrap(err)
}
}
} else {
log.DebugContext(ctx, "Access request already claimed, skipping initial message posting")
}

// Update the received reviews
reviews := req.GetReviews()
if len(reviews) == 0 {
log.DebugContext(ctx, "No access request reviews to process")
return nil
}

log.DebugContext(ctx, "Processing access request reviews", "review_count", len(reviews))
err = a.postReviews(ctx, id, reviews)
if err != nil {
return trace.Wrap(err)
Expand All @@ -343,15 +352,18 @@ func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) err
func (a *App) onResolvedRequest(ctx context.Context, req types.AccessRequest) error {
var tag pd.ResolutionTag
state := req.GetState()

log := a.log.With("request_id", req.GetName(), "request_state", state.String())
switch state {
case types.RequestState_APPROVED:
tag = pd.ResolvedApproved
case types.RequestState_DENIED:
tag = pd.ResolvedDenied
default:
logger.Get(ctx).Warningf("Unknown state %v (%s)", state, state.String())
log.WarnContext(ctx, "Unknown request state")
return trace.Errorf("Unknown state")
}
log.DebugContext(ctx, "Updating messages to mark the request resolved")
err := a.updateMessages(ctx, req.GetName(), tag, req.GetResolveReason(), req.GetReviews())
if err != nil {
return trace.Wrap(err)
Expand All @@ -361,6 +373,7 @@ func (a *App) onResolvedRequest(ctx context.Context, req types.AccessRequest) er

// onDeletedRequest gets called when a request is deleted. It emits a debug log
// entry carrying the request ID and then delegates to updateMessages with the
// pd.ResolvedExpired tag, an empty reason and no reviews, returning whatever
// error updateMessages returns.
func (a *App) onDeletedRequest(ctx context.Context, reqID string) error {
// NOTE(review): the "request_state" attribute carries a resolution tag here rather than an access request state — confirm this key is intended.
a.log.DebugContext(ctx, "Updating messages to mark the request deleted/expired", "request_id", reqID, "request_state", pd.ResolvedExpired)
return a.updateMessages(ctx, reqID, pd.ResolvedExpired, "", nil)
}

Expand All @@ -369,19 +382,17 @@ func (a *App) postMessages(ctx context.Context, recipients []string, id string,
teamsData, err := a.bot.PostMessages(ctx, recipients, id, data)
if err != nil {
if len(teamsData) == 0 {
// TODO: add better logging here
a.log.ErrorContext(ctx, "Failed to post all messages to MS Teams")
return trace.Wrap(err)
}

logger.Get(ctx).WithError(err).Error("Failed to post one or more messages to MS Teams")
a.log.ErrorContext(ctx, "Failed to post one or more messages to MS Teams, continuing")
}

for _, data := range teamsData {
logger.Get(ctx).WithFields(logger.Fields{
"id": data.ID,
"timestamp": data.Timestamp,
"recipient": data.RecipientID,
}).Info("Successfully posted to MS Teams")
a.log.InfoContext(ctx, "Successfully posted to MS Teams",
"id", data.ID,
"timestamp", data.Timestamp,
"recipient", data.RecipientID)
}

// Let's update sent messages data
Expand All @@ -395,6 +406,7 @@ func (a *App) postMessages(ctx context.Context, recipients []string, id string,

// postReviews updates a message with reviews
func (a *App) postReviews(ctx context.Context, id string, reviews []types.AccessReview) error {
a.log.DebugContext(ctx, "Looking for reviews that need to be posted", "review_count", len(reviews))
pluginData, err := a.pd.Update(ctx, id, func(existing PluginData) (PluginData, error) {
teamsData := existing.TeamsData
if len(teamsData) == 0 {
Expand Down Expand Up @@ -428,8 +440,6 @@ func (a *App) postReviews(ctx context.Context, id string, reviews []types.Access

// updateMessages updates the messages status and adds the resolve reason.
func (a *App) updateMessages(ctx context.Context, reqID string, tag pd.ResolutionTag, reason string, reviews []types.AccessReview) error {
log := logger.Get(ctx)

pluginData, err := a.pd.Update(ctx, reqID, func(existing PluginData) (PluginData, error) {
// No teamsData found in the plugin data. This might be because of a race condition
// (messages not sent yet) or because sending failed (msapi error or no recipient)
Expand Down Expand Up @@ -458,29 +468,29 @@ func (a *App) updateMessages(ctx context.Context, reqID string, tag pd.Resolutio
return trace.Wrap(err)
}

log.Infof("Successfully marked request as %s in all messages", tag)
a.log.InfoContext(ctx, "Successfully updated all messages with the resolution", "resolution_tag", tag)

return nil
}

// getMessageRecipients returns a recipients list for the access request
func (a *App) getMessageRecipients(ctx context.Context, req types.AccessRequest) []string {
log := logger.Get(ctx)

// We receive a set from GetRawRecipientsFor but we still might end up with duplicate channel names.
// This can happen if this set contains the channel `C` and the email for channel `C`.
recipientSet := stringset.New()

a.log.DebugContext(ctx, "Getting suggested reviewer recipients")
var validEmailsSuggReviewers []string
for _, reviewer := range req.GetSuggestedReviewers() {
if !lib.IsEmail(reviewer) {
log.Warningf("Failed to notify a suggested reviewer: %q does not look like a valid email", reviewer)
a.log.WarnContext(ctx, "Failed to notify a suggested reviewer, does not look like a valid email", "reviewer", reviewer)
continue
}

validEmailsSuggReviewers = append(validEmailsSuggReviewers, reviewer)
}

a.log.DebugContext(ctx, "Getting recipients for role", "role", req.GetRoles())
recipients := a.conf.Recipients.GetRawRecipientsFor(req.GetRoles(), validEmailsSuggReviewers)
for _, recipient := range recipients {
if recipient != "" {
Expand Down
Loading
Loading