From da46890301b8fbcbcbd9ebb8b172732fe0b6fd54 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 12 Oct 2024 09:18:05 -0700 Subject: [PATCH 1/4] fix resource leak --- pkg/handlers/handlers.go | 9 ++---- pkg/sources/git/git.go | 60 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index 8f1f5e106bef..d379a1338f0a 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -285,14 +285,9 @@ func HandleFile( } defer func() { // Ensure all data is read to prevent broken pipe. - _, copyErr := io.Copy(io.Discard, rdr) - if copyErr != nil { - err = fmt.Errorf("error discarding remaining data: %w", copyErr) - } - closeErr := rdr.Close() - if closeErr != nil { + if closeErr := rdr.Close(); closeErr != nil { if err != nil { - err = fmt.Errorf("%v; error closing reader: %w", err, closeErr) + err = fmt.Errorf("%w; error closing reader: %w", err, closeErr) } else { err = fmt.Errorf("error closing reader: %w", closeErr) } diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index ca6f82f967b6..12d7e66e5265 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -1226,7 +1226,14 @@ func getSafeRemoteURL(repo *git.Repository, preferred string) string { return safeURL } -func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources.ChunkReporter, chunkSkel *sources.Chunk, commitHash plumbing.Hash, path string) error { +func (s *Git) handleBinary( + ctx context.Context, + gitDir string, + reporter sources.ChunkReporter, + chunkSkel *sources.Chunk, + commitHash plumbing.Hash, + path string, +) (err error) { fileCtx := context.WithValues(ctx, "commit", commitHash.String()[:7], "path", path) fileCtx.Logger().V(5).Info("handling binary file") @@ -1240,9 +1247,19 @@ func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources. 
return nil } - cmd := exec.Command("git", "-C", gitDir, "cat-file", "blob", commitHash.String()+":"+path) + const ( + cmdTimeout = 60 * time.Second + waitDelay = 5 * time.Second + ) + // Create a timeout context for the 'git cat-file' command to ensure it does not run indefinitely. + // This prevents potential resource exhaustion by terminating the command if it exceeds the specified duration. + catFileCtx, cancel := context.WithTimeoutCause(fileCtx, cmdTimeout, errors.New("git cat-file timeout")) + defer cancel() + + cmd := exec.CommandContext(catFileCtx, "git", "-C", gitDir, "cat-file", "blob", commitHash.String()+":"+path) var stderr bytes.Buffer cmd.Stderr = &stderr + cmd.WaitDelay = waitDelay // once the context is done, bound how long Wait blocks on process exit and pipe closure stdout, err := cmd.StdoutPipe() if err != nil { @@ -1253,9 +1270,44 @@ func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources. return fmt.Errorf("error starting git cat-file: %w\n%s", err, stderr.Bytes()) } - defer func() { _ = cmd.Wait() }() + // Ensure all data from the reader (stdout) is consumed to prevent broken pipe errors. + // This operation discards any remaining data after HandleFile completion. + // If the reader is fully consumed, the copy is essentially a no-op. + // If an error occurs while discarding, it will be logged and combined with any existing error. + // The command's completion is then awaited and any execution errors are handled. 
+ defer func() { + n, copyErr := io.Copy(io.Discard, stdout) + if copyErr != nil { + ctx.Logger().Error( + copyErr, + "Failed to discard remaining stdout data after HandleFile completion", + ) + } - return handlers.HandleFile(ctx, stdout, chunkSkel, reporter, handlers.WithSkipArchives(s.skipArchives)) + ctx.Logger().V(3).Info( + "HandleFile did not consume all stdout data; excess discarded", + "bytes_discarded", n) + + err = combineErrors(err, fmt.Errorf("error discarding excess stdout data: %w", copyErr)) + + // Wait for the command to finish and handle any errors. + waitErr := cmd.Wait() + err = combineErrors(err, fmt.Errorf("command execution error: %w", waitErr)) + }() + + return handlers.HandleFile(catFileCtx, stdout, chunkSkel, reporter, handlers.WithSkipArchives(s.skipArchives)) +} + +// combineErrors combines two errors into one, preserving both error messages if they exist. +func combineErrors(err1, err2 error) error { + switch { + case err1 != nil && err2 != nil: + return fmt.Errorf("%w; %w", err1, err2) + case err1 != nil: + return err1 + default: + return err2 + } } func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) error { From a505f07b9b50ddac5c0e801201a6ad8f842d0cc4 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 12 Oct 2024 10:13:00 -0700 Subject: [PATCH 2/4] add comment --- pkg/sources/git/git.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 12d7e66e5265..348986f30831 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -1251,6 +1251,19 @@ func (s *Git) handleBinary( cmdTimeout = 60 * time.Second waitDelay = 5 * time.Second ) + // NOTE: This kludge ensures the context timeout for the 'git cat-file' command + // matches the timeout for the HandleFile operation. + // By setting both timeouts to the same value, we can be more confident + // that both operations will run for the same duration. 
+ // The command also sets a small WaitDelay, which bounds how long Wait blocks after the context is done + // while the process exits and its I/O pipes close; the intent is to give HandleFile time to respect the context + // and return. NOTE(review): CommandContext's default Cancel kills the process once the context is done - confirm. + // This approach helps prevent premature termination and allows for more complete processing. + + // TODO: Develop a more robust mechanism to ensure consistent timeout behavior between the command execution + // and the HandleFile operation. This should prevent premature termination and allow for complete processing. + handlers.SetArchiveMaxTimeout(cmdTimeout) + // Create a timeout context for the 'git cat-file' command to ensure it does not run indefinitely. // This prevents potential resource exhaustion by terminating the command if it exceeds the specified duration. catFileCtx, cancel := context.WithTimeoutCause(fileCtx, cmdTimeout, errors.New("git cat-file timeout")) From 44d9c43f8358bf5b2b67ecb0d957871436c48419 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Sat, 12 Oct 2024 12:52:07 -0700 Subject: [PATCH 3/4] use errors.Join --- pkg/sources/git/git.go | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 348986f30831..df0cc63b84f9 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -1301,28 +1301,14 @@ func (s *Git) handleBinary( "HandleFile did not consume all stdout data; excess discarded", "bytes_discarded", n) - err = combineErrors(err, fmt.Errorf("error discarding excess stdout data: %w", copyErr)) - // Wait for the command to finish and handle any errors. waitErr := cmd.Wait() - err = combineErrors(err, fmt.Errorf("command execution error: %w", waitErr)) + err = errors.Join(err, copyErr, waitErr) }() return handlers.HandleFile(catFileCtx, stdout, chunkSkel, reporter, handlers.WithSkipArchives(s.skipArchives)) } -// combineErrors combines two errors into one, preserving both error messages if they exist. 
-func combineErrors(err1, err2 error) error { - switch { - case err1 != nil && err2 != nil: - return fmt.Errorf("%w; %w", err1, err2) - case err1 != nil: - return err1 - default: - return err2 - } -} - func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) error { for _, repo := range s.conn.GetDirectories() { if repo == "" { From ca923f7cadc77cc3cdde7fe970fb937077e31949 Mon Sep 17 00:00:00 2001 From: Ahrav Dutta Date: Tue, 15 Oct 2024 10:58:02 -0700 Subject: [PATCH 4/4] address error wrapping --- pkg/handlers/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index d379a1338f0a..1561d826d0d5 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -287,7 +287,7 @@ func HandleFile( // Ensure all data is read to prevent broken pipe. if closeErr := rdr.Close(); closeErr != nil { if err != nil { - err = fmt.Errorf("%w; error closing reader: %w", err, closeErr) + err = errors.Join(err, closeErr) } else { err = fmt.Errorf("error closing reader: %w", closeErr) }