Skip to content

Commit

Permalink
fix: reconnect to the logs stream in dashboard after reboot
Browse files Browse the repository at this point in the history
The log stream displayed in the dashboard was stopping to work when a node was rebooted.
Rework the log data source to establish a per-node connection and use a retry loop to always reconnect until the dashboard is terminated.

Print the connection errors in the log stream in red color.

Closes #8388.

Signed-off-by: Utku Ozdemir <[email protected]>
(cherry picked from commit 3735add)
  • Loading branch information
utkuozdemir authored and smira committed Apr 12, 2024
1 parent 5019c9f commit 028a5b4
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 71 deletions.
10 changes: 8 additions & 2 deletions internal/pkg/dashboard/components/logviewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func NewLogViewer() *LogViewer {
}

widget.logs.ScrollToEnd().
SetDynamicColors(true).
SetMaxLines(maxLogLines).
SetText(noData).
SetBorderPadding(0, 0, 1, 1).
Expand Down Expand Up @@ -54,7 +55,12 @@ func NewLogViewer() *LogViewer {
}

// WriteLog writes the log line to the widget.
func (widget *LogViewer) WriteLog(logLine string) {
func (widget *LogViewer) WriteLog(logLine, logError string) {
if logError != "" {
logLine = "[red]" + tview.Escape(logError) + "[-]\n"
} else {
logLine = tview.Escape(logLine) + "\n"
}

widget.logs.Write([]byte(logLine)) //nolint:errcheck
widget.logs.Write([]byte("\n")) //nolint:errcheck
}
15 changes: 6 additions & 9 deletions internal/pkg/dashboard/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type ResourceDataListener interface {

// LogDataListener is a listener which is notified when a log line is received.
type LogDataListener interface {
OnLogDataChange(node string, logLine string)
OnLogDataChange(node, logLine, logError string)
}

// NodeSelectListener is a listener which is notified when a node is selected.
Expand Down Expand Up @@ -376,10 +376,7 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error {
defer d.resourceDataSource.Stop() //nolint:errcheck

// start logs data source
if err := d.logDataSource.Start(ctx); err != nil {
return err
}

d.logDataSource.Start(ctx)
defer d.logDataSource.Stop() //nolint:errcheck

lastLogTime := time.Now()
Expand All @@ -393,11 +390,11 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error {

if time.Since(lastLogTime) < 50*time.Millisecond {
d.app.QueueUpdate(func() {
d.processLog(nodeAlias, nodeLog.Log)
d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error)
})
} else {
d.app.QueueUpdateDraw(func() {
d.processLog(nodeAlias, nodeLog.Log)
d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error)
})
}

Expand Down Expand Up @@ -461,9 +458,9 @@ func (d *Dashboard) processNodeResource(nodeResource resourcedata.Data) {
}

// processLog re-renders the log components with new log data.
func (d *Dashboard) processLog(node, line string) {
func (d *Dashboard) processLog(node, logLine, logError string) {
for _, component := range d.logDataListeners {
component.OnLogDataChange(node, line)
component.OnLogDataChange(node, logLine, logError)
}
}

Expand Down
94 changes: 64 additions & 30 deletions internal/pkg/dashboard/logdata/logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@ package logdata
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/siderolabs/talos/internal/pkg/dashboard/util"
"github.com/siderolabs/talos/pkg/machinery/api/common"
"github.com/siderolabs/talos/pkg/machinery/client"
)

// Data is a log line from a node.
type Data struct {
Node string
Log string
Node string
Log string
Error string
}

// Source is a data source for Kernel (dmesg) logs.
Expand All @@ -45,14 +51,10 @@ func NewSource(client *client.Client) *Source {
}

// Start starts the data source.
func (source *Source) Start(ctx context.Context) error {
var err error

func (source *Source) Start(ctx context.Context) {
source.once.Do(func() {
err = source.start(ctx)
source.start(ctx)
})

return err
}

// Stop stops the data source.
Expand All @@ -62,38 +64,70 @@ func (source *Source) Stop() error {
return source.eg.Wait()
}

func (source *Source) start(ctx context.Context) error {
func (source *Source) start(ctx context.Context) {
ctx, source.logCtxCancel = context.WithCancel(ctx)

dmesgStream, err := source.client.Dmesg(ctx, true, false)
if err != nil {
return err
for _, nodeContext := range util.NodeContexts(ctx) {
source.eg.Go(func() error {
return source.tailNodeWithRetries(nodeContext.Ctx, nodeContext.Node)
})
}
}

source.eg.Go(func() error {
return helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, node string, multipleNodes bool) error {
if len(data.Bytes) == 0 {
return nil
}
func (source *Source) tailNodeWithRetries(ctx context.Context, node string) error {
for {
readErr := source.readDmesg(ctx, node)
if errors.Is(readErr, context.Canceled) || status.Code(readErr) == codes.Canceled {
return nil
}

line := strings.TrimSpace(string(data.Bytes))
if line == "" {
return nil
}
if readErr != nil {
source.LogCh <- Data{Node: node, Error: readErr.Error()}
}

select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
// back off a bit before retrying
sleepWithContext(ctx, 30*time.Second)
}
}

return ctx.Err()
case source.LogCh <- Data{Node: node, Log: line}:
}
func (source *Source) readDmesg(ctx context.Context, node string) error {
dmesgStream, err := source.client.Dmesg(ctx, true, false)
if err != nil {
return fmt.Errorf("dashboard: error opening dmesg stream: %w", err)
}

readErr := helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, _ string, _ bool) error {
if len(data.Bytes) == 0 {
return nil
})
}

line := strings.TrimSpace(string(data.Bytes))
if line == "" {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case source.LogCh <- Data{Node: node, Log: line}:
}

return nil
})
if readErr != nil {
return fmt.Errorf("error reading dmesg stream: %w", readErr)
}

return nil
}

func sleepWithContext(ctx context.Context, d time.Duration) {
timer := time.NewTimer(d)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
}
}
31 changes: 3 additions & 28 deletions internal/pkg/dashboard/resourcedata/resourcedata.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ import (
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"

"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/internal/pkg/dashboard/util"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
Expand Down Expand Up @@ -70,10 +69,9 @@ func (source *Source) run(ctx context.Context) {

source.NodeResourceCh = source.ch

nodes := source.nodes(ctx)
for _, node := range nodes {
for _, nodeContext := range util.NodeContexts(ctx) {
source.eg.Go(func() error {
source.runResourceWatchWithRetries(ctx, node)
source.runResourceWatchWithRetries(nodeContext.Ctx, nodeContext.Node)

return nil
})
Expand Down Expand Up @@ -101,10 +99,6 @@ func (source *Source) runResourceWatchWithRetries(ctx context.Context, node stri

//nolint:gocyclo,cyclop
func (source *Source) runResourceWatch(ctx context.Context, node string) error {
if node != "" {
ctx = client.WithNode(ctx, node)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -211,22 +205,3 @@ func (source *Source) runResourceWatch(ctx context.Context, node string) error {
}
}
}

func (source *Source) nodes(ctx context.Context) []string {
md, mdOk := metadata.FromOutgoingContext(ctx)
if !mdOk {
return []string{""} // local node
}

nodeVal := md.Get("node")
if len(nodeVal) > 0 {
return []string{nodeVal[0]}
}

nodesVal := md.Get("nodes")
if len(nodesVal) == 0 {
return []string{""} // local node
}

return nodesVal
}
4 changes: 2 additions & 2 deletions internal/pkg/dashboard/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (widget *SummaryGrid) OnResourceDataChange(nodeResource resourcedata.Data)
}

// OnLogDataChange implements the LogDataListener interface.
func (widget *SummaryGrid) OnLogDataChange(node string, logLine string) {
widget.logViewer(node).WriteLog(logLine)
func (widget *SummaryGrid) OnLogDataChange(node, logLine, logError string) {
widget.logViewer(node).WriteLog(logLine, logError)
}

func (widget *SummaryGrid) updateLogViewer() {
Expand Down
49 changes: 49 additions & 0 deletions internal/pkg/dashboard/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package util provides utility functions for the dashboard.
package util

import (
"context"

"google.golang.org/grpc/metadata"

"github.com/siderolabs/talos/pkg/machinery/client"
)

// NodeContext contains the context.Context for a single node and the node name.
type NodeContext struct {
Ctx context.Context //nolint:containedctx
Node string
}

// NodeContexts returns a list of NodeContexts from the given context.
//
// It extracts the node names from the outgoing GRPC context metadata.
// If the node name is not present in the metadata, context will be returned as-is with an empty node name.
func NodeContexts(ctx context.Context) []NodeContext {
md, mdOk := metadata.FromOutgoingContext(ctx)
if !mdOk {
return []NodeContext{{Ctx: ctx}}
}

nodeVal := md.Get("node")
if len(nodeVal) > 0 {
return []NodeContext{{Ctx: ctx, Node: nodeVal[0]}}
}

nodesVal := md.Get("nodes")
if len(nodesVal) == 0 {
return []NodeContext{{Ctx: ctx}}
}

nodeContexts := make([]NodeContext, 0, len(nodesVal))

for _, node := range nodesVal {
nodeContexts = append(nodeContexts, NodeContext{Ctx: client.WithNode(ctx, node), Node: node})
}

return nodeContexts
}

0 comments on commit 028a5b4

Please sign in to comment.