Skip to content

Commit

Permalink
BCDA-7786: Create Job Keys for ndjson files (#912)
Browse files Browse the repository at this point in the history
## 🎫 Ticket

https://jira.cms.gov/browse/BCDA-7786

## 🛠 Changes

- `processJob`: moved code to create job keys and update job status into
its own function for testing
- `processJob`: moved file size check into `writeBBDataToFile`
- `processJob`: creating keys for each Job Key that is returned 
- `writeBBDataToFile`: changed return values to be a list of JobKeys and
an error
- `writeBBDataToFile`: returning "*-error.ndjson" job key if it is
created (was not being returned and did not have job keys previously)
- update to pre-commit / golangci-lint to check the pkg, as linting was
failing on single files.

## ℹ️ Context for reviewers

error.ndjson files not having job keys written to the database and
middleware checks for job keys failing to [serve
HTTP](#909). These changes will
return the ndjson file and pass the middleware check, if the file
exists.

## ✅ Acceptance Validation

Current tests pass, some tests modified, new test added. Lots of
intertwined dependencies with this one; was difficult to test.

## 🔒 Security Implications

- [ ] This PR adds a new software dependency or dependencies.
- [ ] This PR modifies or invalidates one or more of our security
controls.
- [ ] This PR stores or transmits data that was not stored or
transmitted before.
- [ ] This PR requires additional review of its security implications
for other reasons.

If any security implications apply, add Jason Ashbaugh (GitHub username:
StewGoin) as a reviewer and do not merge this PR without his approval.
  • Loading branch information
laurenkrugen-navapbc authored Feb 23, 2024
1 parent be7bea0 commit 5e6a2cb
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 138 deletions.
10 changes: 10 additions & 0 deletions .golanglint-ci.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
run:
go: '1.19'
# Show statistics per linter.
# Default: false
show-stats: true
linters:
# Disable all linters.
# Default: false
Expand All @@ -13,3 +18,8 @@ linters:
- godox
- gosec
- gosimple
issues:
exclude-rules:
- path: /
linters:
- typecheck
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ repos:
- repo: https://github.com/tekwizely/pre-commit-golang
rev: v1.0.0-rc.1
hooks:
- id: golangci-lint
- id: golangci-lint-pkg
args: ['--new']
25 changes: 25 additions & 0 deletions bcda/models/postgres/postgrestest/postgrestest.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,28 @@ func getCCLFFiles(db *sql.DB, field, value string) ([]models.CCLFFile, error) {

return cclfFiles, nil
}

func GetJobKey(db *sql.DB, jobID int) ([]models.JobKey, error) {
sb := sqlbuilder.PostgreSQL.NewSelectBuilder().Select("id", "job_id", "file_name", "resource_type").From("job_keys")
sb.Where(sb.Equal("job_id", jobID))
query, args := sb.Build()
fmt.Println(query)
fmt.Println(args)
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}

defer rows.Close()

var jobKeys []models.JobKey
for rows.Next() {
var jobKey models.JobKey
if err := rows.Scan(&jobKey.ID, &jobKey.JobID, &jobKey.FileName,
&jobKey.ResourceType); err != nil {
return nil, err
}
jobKeys = append(jobKeys, jobKey)
}
return jobKeys, err
}
67 changes: 40 additions & 27 deletions bcdaworker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
return err
}

fileUUID, fileSize, err := writeBBDataToFile(ctx, w.r, bb, *aco.CMSID, jobArgs)
fileName := fileUUID + ".ndjson"
ctx, _ = log.SetCtxLogger(ctx, "file_uuid", fileUUID)
ctx, logger = log.SetCtxLogger(ctx, "file_size", fileSize)
jobKeys, err := writeBBDataToFile(ctx, w.r, bb, *aco.CMSID, jobArgs)

// This is only run AFTER completion of all the collection
if err != nil {
Expand All @@ -130,23 +127,10 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
return err
}
}
if fileSize == 0 {
logger.Warnf("ProcessJob: File %s is empty (fileSize 0), will rename file to %s", fileName, models.BlankFileName)
fileName = models.BlankFileName
}

jk := models.JobKey{JobID: job.ID, FileName: fileName, ResourceType: jobArgs.ResourceType}
if err := w.r.CreateJobKey(ctx, jk); err != nil {
err = errors.Wrap(err, fmt.Sprintf("ProcessJob: Error creating job key record for filename %s", fileName))
logger.Error(err)
return err
}

_, err = checkJobCompleteAndCleanup(ctx, w.r, job.ID)
err = createJobKeys(ctx, w.r, jobKeys, job.ID)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("ProcessJob: Error checking job completion & cleanup for filename %s", fileName))
logger.Error(err)
return err
}

// Not critical since we use the job_keys count as the authoritative list of completed jobs.
Expand All @@ -159,8 +143,14 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
return nil
}

// writeBBDataToFile sends requests to BlueButton and writes the results to ndjson files.
// A list of JobKeys are returned, containing the names of files that were created.
// Filesnames can be "blank.ndjson", "<uuid>.ndjson", or "<uuid>-error.ndjson".
func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.APIClient,
cmsID string, jobArgs models.JobEnqueueArgs) (fileUUID string, size int64, err error) {
cmsID string, jobArgs models.JobEnqueueArgs) (jobKeys []models.JobKey, err error) {

jobKeys = append(jobKeys, models.JobKey{JobID: uint(jobArgs.ID), FileName: models.BlankFileName, ResourceType: jobArgs.ResourceType})

logger := log.GetCtxLogger(ctx)
close := metrics.NewChild(ctx, "writeBBDataToFile")
defer close()
Expand Down Expand Up @@ -203,15 +193,15 @@ func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.A
return bb.GetClaimResponse(jobArgs, bene.MBI, cw)
}
default:
return "", 0, fmt.Errorf("unsupported resource type requested: %s", jobArgs.ResourceType)
return jobKeys, fmt.Errorf("unsupported resource type requested: %s", jobArgs.ResourceType)
}

dataDir := conf.GetEnv("FHIR_STAGING_DIR")
fileUUID = uuid.New()
fileUUID := uuid.New()
f, err := os.Create(fmt.Sprintf("%s/%d/%s.ndjson", dataDir, jobArgs.ID, fileUUID))
if err != nil {
err = errors.Wrap(err, "Error creating ndjson file")
return "", 0, err
return jobKeys, err
}

defer utils.CloseFileAndLogError(f)
Expand Down Expand Up @@ -269,23 +259,31 @@ func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.A
}

if err = w.Flush(); err != nil {
return "", 0, errors.Wrap(err, "Error in writing the buffered data to the writer")
return jobKeys, errors.Wrap(err, "Error in writing the buffered data to the writer")
}

if failed {
if ctx.Err() == context.Canceled {
return "", 0, errors.New("Parent job was cancelled")
return jobKeys, errors.New("Parent job was cancelled")
}
return "", 0, errors.New(fmt.Sprintf("Number of failed requests has exceeded threshold of %f ", failThreshold))
return jobKeys, errors.New(fmt.Sprintf("Number of failed requests has exceeded threshold of %f ", failThreshold))
}

fstat, err := f.Stat()
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Error in obtaining FileInfo structure describing the file for ndjson fileUUID %s jobId %d for cmsID %s", fileUUID, jobArgs.ID, cmsID))
return "", 0, err
return jobKeys, err
}

if fstat.Size() != 0 {
pr := &jobKeys[0]
(*pr).FileName = fileUUID + ".ndjson"
}

return fileUUID, fstat.Size(), nil
if errorCount > 0 {
jobKeys = append(jobKeys, models.JobKey{JobID: uint(jobArgs.ID), FileName: fileUUID + "-error.ndjson", ResourceType: jobArgs.ResourceType})
}
return jobKeys, nil
}

// getBeneficiary returns the beneficiary. The bb ID value is retrieved and set in the model.
Expand Down Expand Up @@ -446,6 +444,21 @@ func checkJobCompleteAndCleanup(ctx context.Context, r repository.Repository, jo
return false, nil
}

func createJobKeys(ctx context.Context, r repository.Repository, jobKeys []models.JobKey, id uint) error {
for i := 0; i < len(jobKeys); i++ {
if err := r.CreateJobKey(ctx, jobKeys[i]); err != nil {
err = errors.Wrap(err, fmt.Sprintf("Error creating job key record for filename %s", jobKeys[i].FileName))
return err
}
_, err := checkJobCompleteAndCleanup(ctx, r, id)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Error checking job completion & cleanup for filename %s", jobKeys[i].FileName))
return err
}
}
return nil
}

func createDir(path string) error {
if _, err := os.Stat(path); os.IsNotExist(err) {
if err = os.MkdirAll(path, os.ModePerm); err != nil {
Expand Down
Loading

0 comments on commit 5e6a2cb

Please sign in to comment.