Skip to content

Commit

Permalink
Migrate access/msteams to slog + add debug logs (#47045)
Browse files Browse the repository at this point in the history
* Migrate access/msteams to slog + add debug logs

* fixup! Migrate access/msteams to slog + add debug logs

* Apply suggestions from code review

* Address timr's feedback

* fixup! Address timr's feedback

* tidy go mod
  • Loading branch information
hugoShaka authored Oct 3, 2024
1 parent 85c6193 commit fd7030a
Show file tree
Hide file tree
Showing 13 changed files with 508 additions and 90 deletions.
118 changes: 64 additions & 54 deletions integrations/access/msteams/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msteams

import (
"context"
"log/slog"
"time"

"github.com/gravitational/trace"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/access/common/teleport"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/logger"
pd "github.com/gravitational/teleport/integrations/lib/plugindata"
"github.com/gravitational/teleport/integrations/lib/stringset"
"github.com/gravitational/teleport/integrations/lib/watcherjob"
Expand Down Expand Up @@ -53,12 +53,22 @@ type App struct {
watcherJob lib.ServiceJob
pd *pd.CompareAndSwap[PluginData]

log *slog.Logger

*lib.Process
}

// NewApp initializes a new teleport-msteams app and returns it.
func NewApp(conf Config) (*App, error) {
app := &App{conf: conf}
log, err := conf.Log.NewSLogLogger()
if err != nil {
return nil, trace.Wrap(err)
}

app := &App{
conf: conf,
log: log.With("plugin", pluginName),
}

app.mainJob = lib.NewServiceJob(app.run)

Expand All @@ -67,8 +77,7 @@ func NewApp(conf Config) (*App, error) {

// Run starts the main job process
func (a *App) Run(ctx context.Context) error {
log := logger.Get(ctx)
log.Info("Starting Teleport MS Teams Plugin")
a.log.InfoContext(ctx, "Starting Teleport MS Teams Plugin")

err := a.init(ctx)
if err != nil {
Expand Down Expand Up @@ -131,7 +140,7 @@ func (a *App) init(ctx context.Context) error {
webProxyAddr = pong.ProxyPublicAddr
}

a.bot, err = NewBot(a.conf.MSAPI, pong.ClusterName, webProxyAddr)
a.bot, err = NewBot(a.conf.MSAPI, pong.ClusterName, webProxyAddr, a.log)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -141,8 +150,6 @@ func (a *App) init(ctx context.Context) error {

// initBot initializes bot
func (a *App) initBot(ctx context.Context) error {
log := logger.Get(ctx)

teamsApp, err := a.bot.GetTeamsApp(ctx)
if trace.IsNotFound(err) {
return trace.Wrap(err, "MS Teams app not found in org app store.")
Expand All @@ -151,28 +158,31 @@ func (a *App) initBot(ctx context.Context) error {
return trace.Wrap(err)
}

log.WithField("name", teamsApp.DisplayName).
WithField("id", teamsApp.ID).
Info("MS Teams app found in org app store")
a.log.InfoContext(ctx, "MS Teams app found in org app store",
"name", teamsApp.DisplayName,
"id", teamsApp.ID)

if !a.conf.Preload {
return nil
}

log.Info("Preloading recipient data...")
a.log.InfoContext(ctx, "Preloading recipient data...")

for _, recipient := range a.conf.Recipients.GetAllRawRecipients() {
recipientData, err := a.bot.FetchRecipient(ctx, recipient)
if err != nil {
return trace.Wrap(err)
}
log.WithField("recipient", recipient).
WithField("chat_id", recipientData.Chat.ID).
WithField("kind", recipientData.Kind).
Info("Recipient found, chat found")
a.log.InfoContext(ctx, "Recipient and chat found",
slog.Group("recipient",
"raw", recipient,
"recipient_chat_id", recipientData.Chat.ID,
"recipient_kind", recipientData.Kind,
),
)
}

log.Info("Recipient data preloaded and cached.")
a.log.InfoContext(ctx, "Recipient data preloaded and cached")

return nil
}
Expand All @@ -193,17 +203,15 @@ func (a *App) newWatcherJob() (lib.ServiceJob, error) {

// run starts the main process
func (a *App) run(ctx context.Context) error {
log := logger.Get(ctx)

ok, err := a.watcherJob.WaitReady(ctx)
if err != nil {
return trace.Wrap(err)
}

if ok {
log.Info("Plugin is ready")
a.log.InfoContext(ctx, "Plugin is ready")
} else {
log.Error("Plugin is not ready")
a.log.ErrorContext(ctx, "Plugin is not ready")
}

a.mainJob.SetReady(ok)
Expand All @@ -215,16 +223,15 @@ func (a *App) run(ctx context.Context) error {

// checkTeleportVersion loads Teleport version and checks that it meets the minimal required
func (a *App) checkTeleportVersion(ctx context.Context) (proto.PingResponse, error) {
log := logger.Get(ctx)
log.Debug("Checking Teleport server version")
a.log.DebugContext(ctx, "Checking Teleport server version")

pong, err := a.apiClient.Ping(ctx)
if err != nil {
if trace.IsNotImplemented(err) {
return pong, trace.Wrap(err, "server version must be at least %s", minServerVersion)
}

log.Error("Unable to get Teleport server version")
a.log.ErrorContext(ctx, "Unable to get Teleport server version")
return pong, trace.Wrap(err)
}

Expand All @@ -242,48 +249,44 @@ func (a *App) onWatcherEvent(ctx context.Context, event types.Event) error {

op := event.Type
reqID := event.Resource.GetName()
ctx, _ = logger.WithField(ctx, "request_id", reqID)
log := a.log.With("request_id", reqID, "request_op", op.String())

switch op {
case types.OpPut:
ctx, _ = logger.WithField(ctx, "request_op", "put")
req, ok := event.Resource.(types.AccessRequest)
if !ok {
return trace.Errorf("unexpected resource type %T", event.Resource)
}
ctx, log := logger.WithField(ctx, "request_state", req.GetState().String())

log = log.With("request_state", req.GetState().String())
var err error

switch {
case req.GetState().IsPending():
log.Debug("Pending request received")
log.DebugContext(ctx, "Pending request received")
err = a.onPendingRequest(ctx, req)
case req.GetState().IsApproved():
log.Debug("Approval request received")
log.DebugContext(ctx, "Approval request received")
err = a.onResolvedRequest(ctx, req)
case req.GetState().IsDenied():
log.Debug("Denial request received")
log.DebugContext(ctx, "Denial request received")
err = a.onResolvedRequest(ctx, req)
default:
log.WithField("event", event).Warn("Unknown request state")
log.WarnContext(ctx, "Unknown request state", "event", event)
return nil
}

if err != nil {
log.WithError(err).Errorf("Failed to process request")
log.ErrorContext(ctx, "Failed to process request", "error", err)
return trace.Wrap(err)
}

return nil

case types.OpDelete:
ctx, log := logger.WithField(ctx, "request_op", "delete")

log.Debug("Expiration request received")
log.DebugContext(ctx, "Expiration request received")

if err := a.onDeletedRequest(ctx, reqID); err != nil {
log.WithError(err).Errorf("Failed to process deleted request")
log.ErrorContext(ctx, "Failed to process delete request", "error", err)
return trace.Wrap(err)
}
return nil
Expand All @@ -294,15 +297,16 @@ func (a *App) onWatcherEvent(ctx context.Context, event types.Event) error {

// onPendingRequest is called when there's a new request or a review
func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) error {
log := logger.Get(ctx)

id := req.GetName()
data := pd.AccessRequestData{
User: req.GetUser(),
Roles: req.GetRoles(),
RequestReason: req.GetRequestReason(),
}

log := a.log.With("request_id", id)
log.DebugContext(ctx, "Claiming access request", "user", req.GetUser(), "roles", req.GetRoles(), "reason", req.GetRequestReason())

// Let's try to create PluginData. This equals to locking AccessRequest to this
// instance of a plugin.
_, err := a.pd.Create(ctx, id, PluginData{AccessRequestData: data})
Expand All @@ -312,25 +316,30 @@ func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) err
if err != nil {
return trace.Wrap(err)
}
log.DebugContext(ctx, "Access request claimed")

recipients := a.getMessageRecipients(ctx, req)

if len(recipients) == 0 {
log.Warning("No recipients to notify")
log.WarnContext(ctx, "No recipients to notify")
} else {
err = a.postMessages(ctx, recipients, id, data)
if err != nil {
return trace.Wrap(err)
}
}
} else {
log.DebugContext(ctx, "Access request already claimed, skipping initial message posting")
}

// Update the received reviews
reviews := req.GetReviews()
if len(reviews) == 0 {
log.DebugContext(ctx, "No access request reviews to process")
return nil
}

log.DebugContext(ctx, "Processing access request reviews", "review_count", len(reviews))
err = a.postReviews(ctx, id, reviews)
if err != nil {
return trace.Wrap(err)
Expand All @@ -343,15 +352,18 @@ func (a *App) onPendingRequest(ctx context.Context, req types.AccessRequest) err
func (a *App) onResolvedRequest(ctx context.Context, req types.AccessRequest) error {
var tag pd.ResolutionTag
state := req.GetState()

log := a.log.With("request_id", req.GetName(), "request_state", state.String())
switch state {
case types.RequestState_APPROVED:
tag = pd.ResolvedApproved
case types.RequestState_DENIED:
tag = pd.ResolvedDenied
default:
logger.Get(ctx).Warningf("Unknown state %v (%s)", state, state.String())
log.WarnContext(ctx, "Unknown request state")
return trace.Errorf("Unknown state")
}
log.DebugContext(ctx, "Updating messages to mark the request resolved")
err := a.updateMessages(ctx, req.GetName(), tag, req.GetResolveReason(), req.GetReviews())
if err != nil {
return trace.Wrap(err)
Expand All @@ -361,6 +373,7 @@ func (a *App) onResolvedRequest(ctx context.Context, req types.AccessRequest) er

// onDeleteRequest gets called when a request is deleted
func (a *App) onDeletedRequest(ctx context.Context, reqID string) error {
a.log.DebugContext(ctx, "Updating messages to mark the request deleted/expired", "request_id", reqID, "request_state", pd.ResolvedExpired)
return a.updateMessages(ctx, reqID, pd.ResolvedExpired, "", nil)
}

Expand All @@ -369,19 +382,17 @@ func (a *App) postMessages(ctx context.Context, recipients []string, id string,
teamsData, err := a.bot.PostMessages(ctx, recipients, id, data)
if err != nil {
if len(teamsData) == 0 {
// TODO: add better logging here
a.log.ErrorContext(ctx, "Failed to post all messages to MS Teams")
return trace.Wrap(err)
}

logger.Get(ctx).WithError(err).Error("Failed to post one or more messages to MS Teams")
a.log.ErrorContext(ctx, "Failed to post one or more messages to MS Teams, continuing")
}

for _, data := range teamsData {
logger.Get(ctx).WithFields(logger.Fields{
"id": data.ID,
"timestamp": data.Timestamp,
"recipient": data.RecipientID,
}).Info("Successfully posted to MS Teams")
a.log.InfoContext(ctx, "Successfully posted to MS Teams",
"id", data.ID,
"timestamp", data.Timestamp,
"recipient", data.RecipientID)
}

// Let's update sent messages data
Expand All @@ -395,6 +406,7 @@ func (a *App) postMessages(ctx context.Context, recipients []string, id string,

// postReviews updates a message with reviews
func (a *App) postReviews(ctx context.Context, id string, reviews []types.AccessReview) error {
a.log.DebugContext(ctx, "Looking for reviews that need to be posted", "review_count", len(reviews))
pluginData, err := a.pd.Update(ctx, id, func(existing PluginData) (PluginData, error) {
teamsData := existing.TeamsData
if len(teamsData) == 0 {
Expand Down Expand Up @@ -428,8 +440,6 @@ func (a *App) postReviews(ctx context.Context, id string, reviews []types.Access

// updateMessages updates the messages status and adds the resolve reason.
func (a *App) updateMessages(ctx context.Context, reqID string, tag pd.ResolutionTag, reason string, reviews []types.AccessReview) error {
log := logger.Get(ctx)

pluginData, err := a.pd.Update(ctx, reqID, func(existing PluginData) (PluginData, error) {
// No teamsData found in the plugin data. This might be because of a race condition
// (messages not sent yet) or because sending failed (msapi error or no recipient)
Expand Down Expand Up @@ -458,29 +468,29 @@ func (a *App) updateMessages(ctx context.Context, reqID string, tag pd.Resolutio
return trace.Wrap(err)
}

log.Infof("Successfully marked request as %s in all messages", tag)
a.log.InfoContext(ctx, "Successfully updated all messages with the resolution", "resolution_tag", tag)

return nil
}

// getMessageRecipients returns a recipients list for the access request
func (a *App) getMessageRecipients(ctx context.Context, req types.AccessRequest) []string {
log := logger.Get(ctx)

// We receive a set from GetRawRecipientsFor but we still might end up with duplicate channel names.
// This can happen if this set contains the channel `C` and the email for channel `C`.
recipientSet := stringset.New()

a.log.DebugContext(ctx, "Getting suggested reviewer recipients")
var validEmailsSuggReviewers []string
for _, reviewer := range req.GetSuggestedReviewers() {
if !lib.IsEmail(reviewer) {
log.Warningf("Failed to notify a suggested reviewer: %q does not look like a valid email", reviewer)
a.log.WarnContext(ctx, "Failed to notify a suggested reviewer, does not look like a valid email", "reviewer", reviewer)
continue
}

validEmailsSuggReviewers = append(validEmailsSuggReviewers, reviewer)
}

a.log.DebugContext(ctx, "Getting recipients for role", "role", req.GetRoles())
recipients := a.conf.Recipients.GetRawRecipientsFor(req.GetRoles(), validEmailsSuggReviewers)
for _, recipient := range recipients {
if recipient != "" {
Expand Down
Loading

0 comments on commit fd7030a

Please sign in to comment.