Skip to content

Commit

Permalink
BCDA-7980: Implement gzip compression for files stored on EFS (#955)
Browse files Browse the repository at this point in the history
## 🎫 Ticket

https://jira.cms.gov/browse/BCDA-7980

## 🛠 Changes

Instead of moving files, we now write a gzip-encoded copy of each file to EFS
and delete the temporary files after each job key.

## ℹ️ Context for reviewers

This is the final ticket in the gzip epic, yay! 
Adds a new environment variable, COMPRESSION_LEVEL, that can be set to a
value from 1 to 9; any other value falls back to the default gzip level.
Files are stored as gzip-encoded content; previous PRs ensure that the
/data/ endpoint is content-encoding aware and can return un-encoded
content if requested, while ensuring significant gains for the 90+% of
requests that do request gzip-encoded content.

## ✅ Acceptance Validation

Smoke tests + unit tests pass.
I've also performed manual validation utilizing the dev server.
Throughput is as expected.

## 🔒 Security Implications

- [ ] This PR adds a new software dependency or dependencies.
- [ ] This PR modifies or invalidates one or more of our security
controls.
- [ ] This PR stores or transmits data that was not stored or
transmitted before.
- [ ] This PR requires additional review of its security implications
for other reasons.

If any security implications apply, add Jason Ashbaugh (GitHub username:
StewGoin) as a reviewer and do not merge this PR without his approval.
  • Loading branch information
alex-dzeda authored Jun 12, 2024
1 parent 77ea49d commit 00cfea4
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 9 deletions.
50 changes: 47 additions & 3 deletions bcdaworker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package worker

import (
"bufio"
"compress/gzip"
"context"
"database/sql"
"encoding/json"
goerrors "errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"

"github.com/CMSgov/bcda-app/bcda/cclf/metrics"
Expand All @@ -21,6 +24,7 @@ import (
"github.com/CMSgov/bcda-app/bcdaworker/repository/postgres"
"github.com/CMSgov/bcda-app/conf"
"github.com/CMSgov/bcda-app/log"
"github.com/sirupsen/logrus"

fhircodes "github.com/google/fhir/go/proto/google/fhir/proto/stu3/codes_go_proto"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -144,7 +148,7 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
}
}
//move the files over
err = moveFiles(tempJobPath, stagingPath)
err = compressFiles(ctx, tempJobPath, stagingPath)
if err != nil {
logger.Error(err)
}
Expand All @@ -164,25 +168,65 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
return nil
}

func moveFiles(tempDir string, stagingDir string) error {
func compressFiles(ctx context.Context, tempDir string, stagingDir string) error {
logger := log.GetCtxLogger(ctx)
// Open the input file
files, err := os.ReadDir(tempDir)
if err != nil {
err = errors.Wrap(err, "Error reading from the staging directory for files for Job")
return err
}
gzipLevel, err := strconv.Atoi(os.Getenv("COMPRESSION_LEVEL"))
if err != nil || gzipLevel < 1 || gzipLevel > 9 { //levels 1-9 supported by BCDA.
gzipLevel = gzip.DefaultCompression
logger.Warnf("COMPRESSION_LEVEL not set to appropriate value; using default.")
}
for _, f := range files {
oldPath := fmt.Sprintf("%s/%s", tempDir, f.Name())
newPath := fmt.Sprintf("%s/%s", stagingDir, f.Name())
err := os.Rename(oldPath, newPath) //#nosec G304
//Anonymous function to ensure defer statements run
err := func() error {
inputFile, err := os.Open(filepath.Clean(oldPath))
if err != nil {
return err
}
defer CloseOrLogError(logger, inputFile)

outputFile, err := os.Create(filepath.Clean(newPath))
if err != nil {
return err
}
defer CloseOrLogError(logger, outputFile)
gzipWriter, err := gzip.NewWriterLevel(outputFile, gzipLevel)
if err != nil {
return err
}
defer gzipWriter.Close()

// Copy the data from the input file to the gzip writer
if _, err := io.Copy(gzipWriter, inputFile); err != nil {
return err
}
return nil
}()
if err != nil {
return err
}

}
return nil

}

// CloseOrLogError closes f and, if the close fails, reports the failure as a
// warning on logger instead of returning it. A nil file is ignored. It is
// intended for use in defer statements.
func CloseOrLogError(logger logrus.FieldLogger, f *os.File) {
	if f != nil {
		closeErr := f.Close()
		if closeErr != nil {
			logger.Warnf("Error closing file: %v", closeErr)
		}
	}
}

// writeBBDataToFile sends requests to BlueButton and writes the results to ndjson files.
// A list of JobKeys are returned, containing the names of files that were created.
// Filesnames can be "blank.ndjson", "<uuid>.ndjson", or "<uuid>-error.ndjson".
Expand Down
42 changes: 36 additions & 6 deletions bcdaworker/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,38 @@ func (s *WorkerTestSuite) TestCreateDir() {
assert.NoError(s.T(), err)
}

func (s *WorkerTestSuite) TestMoveFiles() {
//negative case
err := moveFiles("/", "fake_dir")
// TestCompressFilesGzipLevel checks that compressFiles succeeds for a
// non-numeric, a valid and an out-of-range COMPRESSION_LEVEL value.
func (s *WorkerTestSuite) TestCompressFilesGzipLevel() {
	t := s.T()

	// TempDir directories are removed automatically when the test finishes.
	srcDir := t.TempDir()
	dstDir := t.TempDir()

	// Provide one input file so that the configured level actually reaches
	// the gzip writer; with an empty directory the per-file loop never runs.
	input, err := os.CreateTemp(srcDir, "*")
	if err != nil {
		s.FailNow(err.Error())
	}
	if _, err := input.WriteString("{\"resourceType\":\"Patient\"}\n"); err != nil {
		s.FailNow(err.Error())
	}
	if err := input.Close(); err != nil {
		s.FailNow(err.Error())
	}

	//In short, none of these should produce errors when being run.
	for _, level := range []string{"potato", "1", "11"} {
		// Setenv restores the previous value when the test ends, so the
		// setting does not leak into other tests.
		t.Setenv("COMPRESSION_LEVEL", level)
		err := compressFiles(s.logctx, srcDir, dstDir)
		assert.NoError(t, err, "COMPRESSION_LEVEL=%q", level)
	}

	files, err := os.ReadDir(dstDir)
	assert.NoError(t, err)
	assert.Len(t, files, 1)
}

func (s *WorkerTestSuite) TestCompressFiles() {
//negative cases.
err := compressFiles(s.logctx, "/", "fake_dir")
assert.Error(s.T(), err)
err = moveFiles("/proc/fakedir", "fake_dir")
err = compressFiles(s.logctx, "/proc/fakedir", "fake_dir")
assert.Error(s.T(), err)

//positive case, create two temporary directories + a file, and move a file between them.
tempDir1, err := os.MkdirTemp("", "*")
if err != nil {
Expand All @@ -597,12 +623,16 @@ func (s *WorkerTestSuite) TestMoveFiles() {
if err != nil {
s.FailNow(err.Error())
}
err = moveFiles(tempDir1, tempDir2)
err = compressFiles(s.logctx, tempDir1, tempDir2)
assert.NoError(s.T(), err)
files, _ := os.ReadDir(tempDir2)
assert.Len(s.T(), files, 1)
files, _ = os.ReadDir(tempDir1)
assert.Len(s.T(), files, 0)
assert.Len(s.T(), files, 1)

//One more negative case, when the destination is not able to be moved.
err = compressFiles(s.logctx, tempDir2, "/proc/fakedir")
assert.Error(s.T(), err)

}

Expand Down

0 comments on commit 00cfea4

Please sign in to comment.