diff --git a/cri/cri.go b/cri/cri.go index 7c8abdd..412ab79 100644 --- a/cri/cri.go +++ b/cri/cri.go @@ -1,23 +1,13 @@ package cri import ( - "bufio" - "bytes" "context" "encoding/json" "fmt" - "io" - "net" - "os" - "path/filepath" - "strconv" - "strings" - "sync" "time" "github.com/ddosify/alaz/log" - "github.com/fsnotify/fsnotify" //nolint:staticcheck - + //nolint:staticcheck //nolint:staticcheck internalapi "k8s.io/cri-api/pkg/apis" pb "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -29,28 +19,14 @@ var defaultRuntimeEndpoints = []string{"unix:///proc/1/root/run/containerd/conta "unix:///proc/1/root/var/run/crio/crio.sock", "unix:///proc/1/root/run/crio/crio.sock", "unix:///proc/1/root/run/cri-dockerd.sock", "unix:///proc/1/root/var/run/cri-dockerd.sock"} -type fileReader struct { - mu sync.Mutex - *bufio.Reader -} - -type containerPodInfo struct { - podUid string - podName string - podNs string +type ContainerPodInfo struct { + PodUid string + PodName string + PodNs string } type CRITool struct { rs internalapi.RuntimeService - - connPool *channelPool - watcher *fsnotify.Watcher - - logPathToFile map[string]*fileReader - logPathToContainerMeta map[string]string - containerIdToLogPath map[string]string - - done chan struct{} } func NewCRITool(ctx context.Context) (*CRITool, error) { @@ -71,56 +47,12 @@ func NewCRITool(ctx context.Context) (*CRITool, error) { return nil, err } - logBackend := os.Getenv("LOG_BACKEND") - if logBackend == "" { - logBackend = "log-backend.ddosify:8282" - } - - dialer := &net.Dialer{ - Timeout: 5 * time.Second, - } - - connPool, err := NewChannelPool(5, 30, func() (net.Conn, error) { - return dialer.Dial("tcp", logBackend) - }) - - if err != nil { - log.Logger.Fatal().Err(err).Msg("failed to create connection pool") - } - - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Logger.Fatal().Err(err).Msg("failed to create fsnotify watcher") - } - - done := make(chan struct{}) - go func() { - <-ctx.Done() - watcher.Close() - 
connPool.Close() - close(done) - }() - - logPathToFile := make(map[string]*fileReader, 0) - logPathToContainerMeta := make(map[string]string, 0) - containerIdToLogPath := make(map[string]string, 0) - return &CRITool{ - rs: res, - connPool: connPool, - watcher: watcher, - logPathToFile: logPathToFile, - logPathToContainerMeta: logPathToContainerMeta, - containerIdToLogPath: containerIdToLogPath, - done: done, + rs: res, }, nil } -func (ct *CRITool) Done() chan struct{} { - return ct.done -} - -func (ct *CRITool) getAllContainers() ([]*pb.Container, error) { +func (ct *CRITool) GetAllContainers() ([]*pb.Container, error) { // get running containers st := &pb.ContainerStateValue{} st.State = pb.ContainerState_CONTAINER_RUNNING @@ -144,7 +76,7 @@ func (ct *CRITool) getAllContainers() ([]*pb.Container, error) { // get log path of container // id string : containerID -func (ct *CRITool) getLogPath(id string) (string, error) { +func (ct *CRITool) GetLogPath(id string) (string, error) { if id == "" { return "", fmt.Errorf("containerID cannot be empty") } @@ -160,7 +92,7 @@ func (ct *CRITool) getLogPath(id string) (string, error) { return fmt.Sprintf("/proc/1/root%s", r.Status.LogPath), nil } -func (ct *CRITool) containerStatus(id string) (*containerPodInfo, error) { +func (ct *CRITool) ContainerStatus(id string) (*ContainerPodInfo, error) { if id == "" { return nil, fmt.Errorf("ID cannot be empty") } @@ -186,10 +118,10 @@ func (ct *CRITool) containerStatus(id string) (*containerPodInfo, error) { podName := podRes.Status.Metadata.Name podNamespace := podRes.Status.Metadata.Namespace - return &containerPodInfo{ - podUid: podUid, - podName: podName, - podNs: podNamespace, + return &ContainerPodInfo{ + PodUid: podUid, + PodName: podName, + PodNs: podNamespace, }, nil } @@ -231,277 +163,3 @@ func (ct *CRITool) getPod(podUid string) ([]*pb.PodSandbox, error) { return ct.rs.ListPodSandbox(context.Background(), filter) } - -// podUid -// containerName -// which version of 
container, 0,1,2... -func getContainerMetadataLine(podNs, podName, podUid, containerName string, num int) string { - return fmt.Sprintf("**AlazLogs_%s_%s_%s_%s_%d**\n", podNs, podName, podUid, containerName, num) -} - -func (ct *CRITool) readerForLogPath(logPath string) (*fileReader, error) { - if reader, ok := ct.logPathToFile[logPath]; ok { - return reader, nil - } - - file, err := os.Open(logPath) - if err != nil { - return nil, err - } - - file.Seek(0, io.SeekEnd) // seek to end of file - reader := bufio.NewReader(file) - ct.logPathToFile[logPath] = &fileReader{ - mu: sync.Mutex{}, - Reader: reader, - } - - return ct.logPathToFile[logPath], nil -} - -func (ct *CRITool) watchContainer(id string, name string) error { - logPath, err := ct.getLogPath(id) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to get log path for container %s", id) - return err - } - - _, err = ct.readerForLogPath(logPath) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to get reader for log path %s", logPath) - return err - } - - err = ct.watcher.Add(logPath) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to add log path %s to watcher", logPath) - return err - } - ct.containerIdToLogPath[id] = logPath - - fileName := filepath.Base(logPath) - fileNameWithoutExt := strings.TrimSuffix(fileName, filepath.Ext(fileName)) - suffixNum, err := strconv.Atoi(fileNameWithoutExt) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to parse numeric part of log file name %s", fileName) - } - - resp, err := ct.containerStatus(id) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to get container status for container %s", id) - return err - } - - ct.logPathToContainerMeta[logPath] = getContainerMetadataLine(resp.podNs, resp.podName, resp.podUid, name, suffixNum) - return nil -} - -func (ct *CRITool) unwatchContainer(id string) { - logPath := ct.containerIdToLogPath[id] - log.Logger.Info().Msgf("removing container: %s, %s", id, logPath) - - // we 
must read until EOF and then remove the reader - // otherwise the last logs may be lost - // trigger manually - ct.sendLogs(logPath) - log.Logger.Info().Msgf("manually read for last time for %s", logPath) - - ct.watcher.Remove(logPath) - - // close reader - if reader, ok := ct.logPathToFile[logPath]; ok { - reader.Reset(nil) - delete(ct.logPathToFile, logPath) - } - - delete(ct.logPathToContainerMeta, logPath) - delete(ct.containerIdToLogPath, id) -} - -func (ct *CRITool) sendLogs(logPath string) error { - var err error - var poolConn *PoolConn = nil - - t := 1 - for { - poolConn, err = ct.connPool.Get() - if err != nil { - log.Logger.Error().Err(err).Msgf("connect failed, retryconn..") - time.Sleep(time.Duration(t) * time.Second) - t *= 2 - continue - } - if poolConn == nil { - log.Logger.Info().Msgf("poolConn is nil, retryconn..") - time.Sleep(time.Duration(t) * time.Second) - t *= 2 - continue - } - break - } - - defer func() { - if poolConn != nil && poolConn.unusable { - log.Logger.Info().Msgf("connection is unusable, closing..") - err := poolConn.Close() - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to close connection") - } - } - }() - - // send metadata first - metaLine := ct.logPathToContainerMeta[logPath] - _, err = io.Copy(poolConn, bytes.NewBufferString(metaLine)) - if err != nil { - log.Logger.Error().Err(err).Msgf("metadata could not be sent to backend: %v", err) - poolConn.MarkUnusable() - return err - } - - // send logs - reader, ok := ct.logPathToFile[logPath] - if !ok || reader == nil { - log.Logger.Error().Msgf("reader for log path %s is not found", logPath) - return err - } - - reader.mu.Lock() - _, err = io.Copy(poolConn, reader) - if err != nil { - log.Logger.Error().Err(err).Msgf("logs could not be sent to backend: %v", err) - poolConn.MarkUnusable() - reader.mu.Unlock() - return err - } - reader.mu.Unlock() - - // put the connection back to the pool, closes if unusable - poolConn.Close() - return nil -} - -func (ct *CRITool) 
StreamLogs() error { - containers, err := ct.getAllContainers() - if err != nil { - return err - } - log.Logger.Info().Msg("watching containers") - - for _, c := range containers { - err := ct.watchContainer(c.Id, c.Metadata.Name) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to watch container %s, %s", c.Id, c.Metadata.Name) - } - } - - // listen for new containers - go func() { - // poll every 10 seconds - for { - time.Sleep(10 * time.Second) - - containers, err := ct.getAllContainers() - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to get all containers") - continue - } - - // current containers that are being watched - currentContainerIds := make(map[string]struct{}, 0) - for id, _ := range ct.containerIdToLogPath { - currentContainerIds[id] = struct{}{} - } - - aliveContainers := make(map[string]struct{}, 0) - - for _, c := range containers { - aliveContainers[c.Id] = struct{}{} - if _, ok := currentContainerIds[c.Id]; ok { - continue - } else { - // new container - log.Logger.Info().Msgf("new container found: %s, %s", c.Id, c.Metadata.Name) - err := ct.watchContainer(c.Id, c.Metadata.Name) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to watch new container %s, %s", c.Id, c.Metadata.Name) - } - } - } - - for id := range currentContainerIds { - if _, ok := aliveContainers[id]; !ok { - // container is gone - ct.unwatchContainer(id) - } - } - } - }() - - // start listening for fsnotify events - go func() { - worker := 10 - // start workers - for i := 0; i < worker; i++ { - go func() { - for { - select { - case event, ok := <-ct.watcher.Events: - if !ok { - return - } - - if event.Has(fsnotify.Rename) { // logrotate case - logPath := event.Name - // containerd compresses logs, and recreates the file, it comes as a rename event - for { - _, err := os.Stat(logPath) - if err == nil { - break - } else { - log.Logger.Info().Msgf("waiting for file to be created on rename: %s", logPath) - } - time.Sleep(1 * time.Second) - } - 
- logFile, err := os.Open(logPath) // reopen file - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to reopen file on rename event: %s", logPath) - continue - } - - err = ct.watcher.Add(logPath) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to add log path %s to watcher", logPath) - continue - } - logFile.Seek(0, io.SeekEnd) // seek to end of file - ct.logPathToFile[logPath] = &fileReader{ - mu: sync.Mutex{}, - Reader: bufio.NewReader(logFile), - } - - log.Logger.Info().Msgf("reopened file for rename: %s", logPath) - continue - } else if event.Has(fsnotify.Write) { - // TODO: apps that writes too much logs might block small applications and causes lag on small apps logs - // we don't have to read logs on every write event ?? - - err := ct.sendLogs(event.Name) - if err != nil { - log.Logger.Error().Err(err).Msgf("Failed to send logs for %s", event.Name) - } - } - case err, ok := <-ct.watcher.Errors: - if !ok { - return - } - log.Logger.Error().Err(err).Msgf("fsnotify error") - } - } - }() - } - }() - - return nil -} diff --git a/cri/pool.go b/logstreamer/pool.go similarity index 99% rename from cri/pool.go rename to logstreamer/pool.go index 70018b8..f2dd590 100644 --- a/cri/pool.go +++ b/logstreamer/pool.go @@ -1,4 +1,4 @@ -package cri +package logstreamer import ( "errors" diff --git a/logstreamer/stream.go b/logstreamer/stream.go new file mode 100644 index 0000000..69b9208 --- /dev/null +++ b/logstreamer/stream.go @@ -0,0 +1,361 @@ +package logstreamer + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/ddosify/alaz/log" + + "github.com/ddosify/alaz/cri" + "github.com/fsnotify/fsnotify" +) + +type LogStreamer struct { + critool *cri.CRITool + + connPool *channelPool + watcher *fsnotify.Watcher + + logPathToFile map[string]*fileReader + logPathToContainerMeta map[string]string + containerIdToLogPath map[string]string + + done chan struct{} +} 
+
+type fileReader struct {
+	mu sync.Mutex
+	*bufio.Reader
+}
+
+func NewLogStreamer(ctx context.Context, critool *cri.CRITool) *LogStreamer {
+	ls := &LogStreamer{
+		critool: critool,
+	}
+
+	logBackend := os.Getenv("LOG_BACKEND")
+	if logBackend == "" {
+		logBackend = "log-backend.ddosify:8282"
+	}
+
+	dialer := &net.Dialer{
+		Timeout: 5 * time.Second,
+	}
+
+	connPool, err := NewChannelPool(5, 30, func() (net.Conn, error) {
+		return dialer.Dial("tcp", logBackend)
+	})
+	ls.connPool = connPool
+
+	if err != nil {
+		log.Logger.Fatal().Err(err).Msg("failed to create connection pool")
+	}
+
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		log.Logger.Fatal().Err(err).Msg("failed to create fsnotify watcher")
+	}
+	ls.watcher = watcher
+
+	ls.done = make(chan struct{})
+	go func() {
+		<-ctx.Done()
+		watcher.Close()
+		connPool.Close()
+		close(ls.done)
+	}()
+
+	ls.logPathToFile = make(map[string]*fileReader, 0)
+	ls.logPathToContainerMeta = make(map[string]string, 0)
+	ls.containerIdToLogPath = make(map[string]string, 0)
+
+	return ls
+}
+
+func (ls *LogStreamer) Done() chan struct{} {
+	return ls.done
+}
+
+func (ls *LogStreamer) watchContainer(id string, name string) error {
+	logPath, err := ls.critool.GetLogPath(id)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("Failed to get log path for container %s", id)
+		return err
+	}
+
+	_, err = ls.readerForLogPath(logPath)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("Failed to get reader for log path %s", logPath)
+		return err
+	}
+
+	err = ls.watcher.Add(logPath)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("Failed to add log path %s to watcher", logPath)
+		return err
+	}
+	ls.containerIdToLogPath[id] = logPath
+
+	fileName := filepath.Base(logPath)
+	fileNameWithoutExt := strings.TrimSuffix(fileName, filepath.Ext(fileName))
+	suffixNum, err := strconv.Atoi(fileNameWithoutExt)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("Failed to parse numeric part of log file name %s", fileName)
+	}
+
+	resp, err := ls.critool.ContainerStatus(id)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("Failed to get container status for container %s", id)
+		return err
+	}
+
+	ls.logPathToContainerMeta[logPath] = getContainerMetadataLine(resp.PodNs, resp.PodName, resp.PodUid, name, suffixNum)
+	return nil
+}
+
+func (ls *LogStreamer) sendLogs(logPath string) error {
+	var err error
+	var poolConn *PoolConn = nil
+
+	t := 1
+	for {
+		poolConn, err = ls.connPool.Get()
+		if err != nil {
+			log.Logger.Error().Err(err).Msgf("connect failed, retryconn..")
+			time.Sleep(time.Duration(t) * time.Second)
+			t *= 2
+			continue
+		}
+		if poolConn == nil {
+			log.Logger.Info().Msgf("poolConn is nil, retryconn..")
+			time.Sleep(time.Duration(t) * time.Second)
+			t *= 2
+			continue
+		}
+		break
+	}
+
+	defer func() {
+		if poolConn != nil && poolConn.unusable {
+			log.Logger.Info().Msgf("connection is unusable, closing..")
+			err := poolConn.Close()
+			if err != nil {
+				log.Logger.Error().Err(err).Msgf("Failed to close connection")
+			}
+		}
+	}()
+
+	// send metadata first
+	metaLine := ls.logPathToContainerMeta[logPath]
+	_, err = io.Copy(poolConn, bytes.NewBufferString(metaLine))
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("metadata could not be sent to backend: %v", err)
+		poolConn.MarkUnusable()
+		return err
+	}
+
+	// send logs
+	reader, ok := ls.logPathToFile[logPath]
+	if !ok || reader == nil {
+		log.Logger.Error().Msgf("reader for log path %s is not found", logPath)
+		return err
+	}
+
+	reader.mu.Lock()
+	_, err = io.Copy(poolConn, reader)
+	if err != nil {
+		log.Logger.Error().Err(err).Msgf("logs could not be sent to backend: %v", err)
+		poolConn.MarkUnusable()
+		reader.mu.Unlock()
+		return err
+	}
+	reader.mu.Unlock()
+
+	// put the connection back to the pool, closes if unusable
+	poolConn.Close()
+	return nil
+}
+
+func (ls *LogStreamer) unwatchContainer(id string) {
+	logPath := ls.containerIdToLogPath[id]
+	log.Logger.Info().Msgf("removing container: %s, %s", id, logPath)
+
+	// we must read until EOF and then remove the reader
+	// otherwise the last logs may be lost
+	// trigger manually
+	ls.sendLogs(logPath)
+	log.Logger.Info().Msgf("manually read for last time for %s", logPath)
+
+	ls.watcher.Remove(logPath)
+
+	// close reader
+	if reader, ok := ls.logPathToFile[logPath]; ok {
+		reader.Reset(nil)
+		delete(ls.logPathToFile, logPath)
+	}
+
+	delete(ls.logPathToContainerMeta, logPath)
+	delete(ls.containerIdToLogPath, id)
+}
+
+func (ls *LogStreamer) readerForLogPath(logPath string) (*fileReader, error) {
+	if reader, ok := ls.logPathToFile[logPath]; ok {
+		return reader, nil
+	}
+
+	file, err := os.Open(logPath)
+	if err != nil {
+		return nil, err
+	}
+
+	file.Seek(0, io.SeekEnd) // seek to end of file
+	reader := bufio.NewReader(file)
+	ls.logPathToFile[logPath] = &fileReader{
+		mu:     sync.Mutex{},
+		Reader: reader,
+	}
+
+	return ls.logPathToFile[logPath], nil
+}
+
+func (ls *LogStreamer) StreamLogs() error {
+	containers, err := ls.critool.GetAllContainers()
+	if err != nil {
+		return err
+	}
+	log.Logger.Info().Msg("watching containers")
+
+	for _, c := range containers {
+		err := ls.watchContainer(c.Id, c.Metadata.Name)
+		if err != nil {
+			log.Logger.Error().Err(err).Msgf("Failed to watch container %s, %s", c.Id, c.Metadata.Name)
+		}
+	}
+
+	// listen for new containers
+	go func() {
+		// poll every 10 seconds
+		for {
+			time.Sleep(10 * time.Second)
+
+			containers, err := ls.critool.GetAllContainers()
+			if err != nil {
+				log.Logger.Error().Err(err).Msgf("Failed to get all containers")
+				continue
+			}
+
+			// current containers that are being watched
+			currentContainerIds := make(map[string]struct{}, 0)
+			for id := range ls.containerIdToLogPath {
+				currentContainerIds[id] = struct{}{}
+			}
+
+			aliveContainers := make(map[string]struct{}, 0)
+
+			for _, c := range containers {
+				aliveContainers[c.Id] = struct{}{}
+				if _, ok := currentContainerIds[c.Id]; ok {
+					continue
+				} else {
+					// new container
+					log.Logger.Info().Msgf("new container found: %s, %s", c.Id, c.Metadata.Name)
+					err := ls.watchContainer(c.Id, c.Metadata.Name)
+					if err != nil {
+						log.Logger.Error().Err(err).Msgf("Failed to watch new container %s, %s", c.Id, c.Metadata.Name)
+					}
+				}
+			}
+
+			for id := range currentContainerIds {
+				if _, ok := aliveContainers[id]; !ok {
+					// container is gone
+					ls.unwatchContainer(id)
+				}
+			}
+		}
+	}()
+
+	// start listening for fsnotify events
+	go func() {
+		worker := 10
+		// start workers
+		for i := 0; i < worker; i++ {
+			go func() {
+				for {
+					select {
+					case event, ok := <-ls.watcher.Events:
+						if !ok {
+							return
+						}
+
+						if event.Has(fsnotify.Rename) { // logrotate case
+							logPath := event.Name
+							// containerd compresses logs, and recreates the file, it comes as a rename event
+							for {
+								_, err := os.Stat(logPath)
+								if err == nil {
+									break
+								} else {
+									log.Logger.Info().Msgf("waiting for file to be created on rename: %s", logPath)
+								}
+								time.Sleep(1 * time.Second)
+							}
+
+							logFile, err := os.Open(logPath) // reopen file
+							if err != nil {
+								log.Logger.Error().Err(err).Msgf("Failed to reopen file on rename event: %s", logPath)
+								continue
+							}
+
+							err = ls.watcher.Add(logPath)
+							if err != nil {
+								log.Logger.Error().Err(err).Msgf("Failed to add log path %s to watcher", logPath)
+								continue
+							}
+							logFile.Seek(0, io.SeekEnd) // seek to end of file
+							ls.logPathToFile[logPath] = &fileReader{
+								mu:     sync.Mutex{},
+								Reader: bufio.NewReader(logFile),
+							}
+
+							log.Logger.Info().Msgf("reopened file for rename: %s", logPath)
+							continue
+						} else if event.Has(fsnotify.Write) {
+							// TODO: apps that writes too much logs might block small applications and causes lag on small apps logs
+							// we don't have to read logs on every write event ??
+ + err := ls.sendLogs(event.Name) + if err != nil { + log.Logger.Error().Err(err).Msgf("Failed to send logs for %s", event.Name) + } + } + case err, ok := <-ls.watcher.Errors: + if !ok { + return + } + log.Logger.Error().Err(err).Msgf("fsnotify error") + } + } + }() + } + }() + + return nil +} + +// podUid +// containerName +// which version of container, 0,1,2... +func getContainerMetadataLine(podNs, podName, podUid, containerName string, num int) string { + return fmt.Sprintf("**AlazLogs_%s_%s_%s_%s_%d**\n", podNs, podName, podUid, containerName, num) +} diff --git a/main.go b/main.go index 134555a..cce0e62 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "github.com/ddosify/alaz/datastore" "github.com/ddosify/alaz/ebpf" "github.com/ddosify/alaz/k8s" + "github.com/ddosify/alaz/logstreamer" "context" @@ -90,20 +91,25 @@ func main() { } var ct *cri.CRITool + if logsEnabled { ct, err = cri.NewCRITool(ctx) if err != nil { log.Logger.Error().Err(err).Msg("failed to create cri tool") - } else { - go func() { - err := ct.StreamLogs() - if err != nil { - log.Logger.Error().Err(err).Msg("failed to stream logs") - } - }() } } + var ls *logstreamer.LogStreamer + if logsEnabled && ct != nil { + ls = logstreamer.NewLogStreamer(ctx, ct) + go func() { + err := ls.StreamLogs() + if err != nil { + log.Logger.Error().Err(err).Msg("failed to stream logs") + } + }() + } + go http.ListenAndServe(":8181", nil) if k8sCollectorEnabled { @@ -116,8 +122,8 @@ func main() { log.Logger.Info().Msg("ebpfCollector done") } - if logsEnabled { - <-ct.Done() + if logsEnabled && ls != nil { + <-ls.Done() log.Logger.Info().Msg("cri done") }