diff --git a/.golanglint-ci.yml b/.golanglint-ci.yml index 0051ef8a8..ed697431f 100644 --- a/.golanglint-ci.yml +++ b/.golanglint-ci.yml @@ -1,3 +1,8 @@ +run: + go: '1.19' + # Show statistics per linter. + # Default: false + show-stats: true linters: # Disable all linters. # Default: false @@ -13,3 +18,8 @@ linters: - godox - gosec - gosimple +issues: + exclude-rules: + - path: / + linters: + - typecheck diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1d792f1ed..0c00e74ca 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,4 +6,5 @@ repos: - repo: https://github.com/tekwizely/pre-commit-golang rev: v1.0.0-rc.1 hooks: - - id: golangci-lint + - id: golangci-lint-pkg + args: ['--new'] diff --git a/bcda/models/postgres/postgrestest/postgrestest.go b/bcda/models/postgres/postgrestest/postgrestest.go index 7a1728859..b4b64f3de 100644 --- a/bcda/models/postgres/postgrestest/postgrestest.go +++ b/bcda/models/postgres/postgrestest/postgrestest.go @@ -381,3 +381,28 @@ func getCCLFFiles(db *sql.DB, field, value string) ([]models.CCLFFile, error) { return cclfFiles, nil } + +func GetJobKey(db *sql.DB, jobID int) ([]models.JobKey, error) { + sb := sqlbuilder.PostgreSQL.NewSelectBuilder().Select("id", "job_id", "file_name", "resource_type").From("job_keys") + sb.Where(sb.Equal("job_id", jobID)) + query, args := sb.Build() + fmt.Println(query) + fmt.Println(args) + rows, err := db.Query(query, args...) 
+ if err != nil { + return nil, err + } + + defer rows.Close() + + var jobKeys []models.JobKey + for rows.Next() { + var jobKey models.JobKey + if err := rows.Scan(&jobKey.ID, &jobKey.JobID, &jobKey.FileName, + &jobKey.ResourceType); err != nil { + return nil, err + } + jobKeys = append(jobKeys, jobKey) + } + return jobKeys, err +} diff --git a/bcdaworker/worker/worker.go b/bcdaworker/worker/worker.go index c3cc65ca8..41e5511c8 100644 --- a/bcdaworker/worker/worker.go +++ b/bcdaworker/worker/worker.go @@ -109,10 +109,7 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models. return err } - fileUUID, fileSize, err := writeBBDataToFile(ctx, w.r, bb, *aco.CMSID, jobArgs) - fileName := fileUUID + ".ndjson" - ctx, _ = log.SetCtxLogger(ctx, "file_uuid", fileUUID) - ctx, logger = log.SetCtxLogger(ctx, "file_size", fileSize) + jobKeys, err := writeBBDataToFile(ctx, w.r, bb, *aco.CMSID, jobArgs) // This is only run AFTER completion of all the collection if err != nil { @@ -130,23 +127,10 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models. return err } } - if fileSize == 0 { - logger.Warnf("ProcessJob: File %s is empty (fileSize 0), will rename file to %s", fileName, models.BlankFileName) - fileName = models.BlankFileName - } - - jk := models.JobKey{JobID: job.ID, FileName: fileName, ResourceType: jobArgs.ResourceType} - if err := w.r.CreateJobKey(ctx, jk); err != nil { - err = errors.Wrap(err, fmt.Sprintf("ProcessJob: Error creating job key record for filename %s", fileName)) - logger.Error(err) - return err - } - _, err = checkJobCompleteAndCleanup(ctx, w.r, job.ID) + err = createJobKeys(ctx, w.r, jobKeys, job.ID) if err != nil { - err = errors.Wrap(err, fmt.Sprintf("ProcessJob: Error checking job completion & cleanup for filename %s", fileName)) logger.Error(err) - return err } // Not critical since we use the job_keys count as the authoritative list of completed jobs. 
@@ -159,8 +143,14 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models. return nil } +// writeBBDataToFile sends requests to BlueButton and writes the results to ndjson files. +// A list of JobKeys is returned, containing the names of files that were created. +// Filenames can be "blank.ndjson", ".ndjson", or "-error.ndjson". func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.APIClient, - cmsID string, jobArgs models.JobEnqueueArgs) (fileUUID string, size int64, err error) { + cmsID string, jobArgs models.JobEnqueueArgs) (jobKeys []models.JobKey, err error) { + + jobKeys = append(jobKeys, models.JobKey{JobID: uint(jobArgs.ID), FileName: models.BlankFileName, ResourceType: jobArgs.ResourceType}) + logger := log.GetCtxLogger(ctx) close := metrics.NewChild(ctx, "writeBBDataToFile") defer close() @@ -203,15 +193,15 @@ func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.A return bb.GetClaimResponse(jobArgs, bene.MBI, cw) } default: - return "", 0, fmt.Errorf("unsupported resource type requested: %s", jobArgs.ResourceType) + return jobKeys, fmt.Errorf("unsupported resource type requested: %s", jobArgs.ResourceType) } dataDir := conf.GetEnv("FHIR_STAGING_DIR") - fileUUID = uuid.New() + fileUUID := uuid.New() f, err := os.Create(fmt.Sprintf("%s/%d/%s.ndjson", dataDir, jobArgs.ID, fileUUID)) if err != nil { err = errors.Wrap(err, "Error creating ndjson file") - return "", 0, err + return jobKeys, err } defer utils.CloseFileAndLogError(f) @@ -269,23 +259,31 @@ func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.A } if err = w.Flush(); err != nil { - return "", 0, errors.Wrap(err, "Error in writing the buffered data to the writer") + return jobKeys, errors.Wrap(err, "Error in writing the buffered data to the writer") } if failed { if ctx.Err() == context.Canceled { - return "", 0, errors.New("Parent job was 
cancelled") } - return "", 0, errors.New(fmt.Sprintf("Number of failed requests has exceeded threshold of %f ", failThreshold)) + return jobKeys, errors.New(fmt.Sprintf("Number of failed requests has exceeded threshold of %f ", failThreshold)) } fstat, err := f.Stat() if err != nil { err = errors.Wrap(err, fmt.Sprintf("Error in obtaining FileInfo structure describing the file for ndjson fileUUID %s jobId %d for cmsID %s", fileUUID, jobArgs.ID, cmsID)) - return "", 0, err + return jobKeys, err + } + + if fstat.Size() != 0 { + pr := &jobKeys[0] + (*pr).FileName = fileUUID + ".ndjson" } - return fileUUID, fstat.Size(), nil + if errorCount > 0 { + jobKeys = append(jobKeys, models.JobKey{JobID: uint(jobArgs.ID), FileName: fileUUID + "-error.ndjson", ResourceType: jobArgs.ResourceType}) + } + return jobKeys, nil } // getBeneficiary returns the beneficiary. The bb ID value is retrieved and set in the model. @@ -446,6 +444,21 @@ func checkJobCompleteAndCleanup(ctx context.Context, r repository.Repository, jo return false, nil } +func createJobKeys(ctx context.Context, r repository.Repository, jobKeys []models.JobKey, id uint) error { + for i := 0; i < len(jobKeys); i++ { + if err := r.CreateJobKey(ctx, jobKeys[i]); err != nil { + err = errors.Wrap(err, fmt.Sprintf("Error creating job key record for filename %s", jobKeys[i].FileName)) + return err + } + _, err := checkJobCompleteAndCleanup(ctx, r, id) + if err != nil { + err = errors.Wrap(err, fmt.Sprintf("Error checking job completion & cleanup for filename %s", jobKeys[i].FileName)) + return err + } + } + return nil +} + func createDir(path string) error { if _, err := os.Stat(path); os.IsNotExist(err) { if err = os.MkdirAll(path, os.ModePerm); err != nil { diff --git a/bcdaworker/worker/worker_test.go b/bcdaworker/worker/worker_test.go index 25ea9134d..6e961150c 100644 --- a/bcdaworker/worker/worker_test.go +++ b/bcdaworker/worker/worker_test.go @@ -52,6 +52,8 @@ type WorkerTestSuite struct { db *sql.DB r 
repository.Repository w Worker + + logctx context.Context } func (s *WorkerTestSuite) SetupSuite() { @@ -83,6 +85,9 @@ func (s *WorkerTestSuite) SetupSuite() { // Set up the logger since we're using the real client client.SetLogger(log.BBWorker) oldLogger = log.Worker + + ctx := context.Background() + s.logctx = log.NewStructuredLoggerEntry(log.Worker, ctx) } func (s *WorkerTestSuite) SetupTest() { @@ -124,88 +129,36 @@ func TestWorkerTestSuite(t *testing.T) { suite.Run(t, new(WorkerTestSuite)) } -func (s *WorkerTestSuite) TestWriteEOBToFile() { - - ctx, jobArgs, bbc := SetupWriteResourceToFile(s, "ExplanationOfBenefit") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) - - files, err1 := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err1) - assert.NoError(s.T(), err) - assert.NotEmpty(s.T(), uuid) - assert.Len(s.T(), files, 1) - - VerifyFileContent(s.T(), files, "ExplanationOfBenefit", 33, s.jobID) -} - -func (s *WorkerTestSuite) TestWriteCoverageToFile() { - ctx, jobArgs, bbc := SetupWriteResourceToFile(s, "Coverage") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) - - files, err1 := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err1) - assert.NoError(s.T(), err) - assert.NotEmpty(s.T(), uuid) - assert.Len(s.T(), files, 1) - - VerifyFileContent(s.T(), files, "Coverage", 3, s.jobID) -} - -func (s *WorkerTestSuite) TestWritePatientToFile() { - ctx, jobArgs, bbc := SetupWriteResourceToFile(s, "Patient") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) - - files, err1 := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err1) - assert.NoError(s.T(), err) - assert.NotEmpty(s.T(), uuid) - assert.Len(s.T(), files, 1) - - VerifyFileContent(s.T(), files, "Patient", 1, s.jobID) -} - -func (s *WorkerTestSuite) TestWriteClaimToFile() { - ctx, jobArgs, bbc 
:= SetupWriteResourceToFile(s, "Claim") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) - - files, err1 := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err1) - assert.NoError(s.T(), err) - assert.NotEmpty(s.T(), uuid) - assert.Len(s.T(), files, 1) - - VerifyFileContent(s.T(), files, "Claim", 1, s.jobID) -} - -func (s *WorkerTestSuite) TestWriteClaimResponseToFile() { - ctx, jobArgs, bbc := SetupWriteResourceToFile(s, "ClaimResponse") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) - - files, err1 := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err1) - assert.NoError(s.T(), err) - assert.NotEmpty(s.T(), uuid) - assert.Len(s.T(), files, 1) - - VerifyFileContent(s.T(), files, "ClaimResponse", 1, s.jobID) - -} - -func (s *WorkerTestSuite) TestWriteUnsupportedResourceToFile() { - ctx, jobArgs, bbc := SetupWriteResourceToFile(s, "UnsupportedResourceType") - uuid, size, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) - assert.EqualValues(s.T(), 0, size) - assert.Error(s.T(), err) - assert.Empty(s.T(), uuid) - files, err := os.ReadDir(s.stagingDir) - assert.NoError(s.T(), err) - assert.Len(s.T(), files, 0) +func (s *WorkerTestSuite) TestWriteResourcesToFile() { + tests := []struct { + resource string + jobKeysCount int + fileCount int + expectedCount int + err error + }{ + {"ExplanationOfBenefit", 1, 1, 33, nil}, + {"Coverage", 1, 1, 3, nil}, + {"Patient", 1, 1, 1, nil}, + {"Claim", 1, 1, 1, nil}, + {"ClaimResponse", 1, 1, 1, nil}, + {"UnsupportedResource", 1, 0, 0, errors.Errorf("unsupported resouce")}, + } + for _, tt := range tests { + ctx, jobArgs, bbc := SetupWriteResourceToFile(s, tt.resource) + jobKeys, err := writeBBDataToFile(ctx, s.r, bbc, *s.testACO.CMSID, jobArgs) + if tt.err == nil { + assert.NoError(s.T(), err) + } else { + assert.Error(s.T(), err) + } + files, err := 
os.ReadDir(s.stagingDir) + assert.NoError(s.T(), err) + assert.Len(s.T(), jobKeys, tt.jobKeysCount) + assert.Len(s.T(), files, tt.fileCount) + VerifyFileContent(s.T(), files, tt.resource, tt.expectedCount, s.jobID) + } } func SetupWriteResourceToFile(s *WorkerTestSuite, resource string) (context.Context, models.JobEnqueueArgs, *client.MockBlueButtonClient) { @@ -297,10 +250,8 @@ func (s *WorkerTestSuite) TestWriteEmptyResourceToFile() { jobArgs := models.JobEnqueueArgs{ID: s.jobID, ResourceType: "ExplanationOfBenefit", BeneficiaryIDs: cclfBeneficiaryIDs, TransactionTime: transactionTime, ACOID: s.testACO.UUID.String()} // Set up the mock function to return the expected values bbc.On("GetExplanationOfBenefit", jobArgs, "abcdef12000", client.ClaimsWindow{}).Return(bbc.GetBundleData("ExplanationOfBenefitEmpty", "abcdef12000")) - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - _, size, err := writeBBDataToFile(ctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) - assert.EqualValues(s.T(), 0, size) + jobKeys, err := writeBBDataToFile(s.logctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + assert.EqualValues(s.T(), "blank.ndjson", jobKeys[0].FileName) assert.NoError(s.T(), err) } @@ -330,13 +281,12 @@ func (s *WorkerTestSuite) TestWriteEOBDataToFileWithErrorsBelowFailureThreshold( bbc.On("GetExplanationOfBenefit", jobArgs, "abcdef10000", claimsWindowMatcher()).Return(nil, errors.New("error")) bbc.On("GetExplanationOfBenefit", jobArgs, "abcdef11000", claimsWindowMatcher()).Return(nil, errors.New("error")) bbc.On("GetExplanationOfBenefit", jobArgs, "abcdef12000", claimsWindowMatcher()).Return(bbc.GetBundleData("ExplanationOfBenefit", "abcdef12000")) - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - fileUUID, size, err := writeBBDataToFile(ctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) - assert.NotEqual(s.T(), int64(0), size) + jobKeys, err := writeBBDataToFile(s.logctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + 
assert.NotEqual(s.T(), "blank.ndjson", jobKeys[0].FileName) + assert.Contains(s.T(), jobKeys[1].FileName, "error.ndjson") + assert.Len(s.T(), jobKeys, 2) assert.NoError(s.T(), err) - - errorFilePath := fmt.Sprintf("%s/%d/%s-error.ndjson", conf.GetEnv("FHIR_STAGING_DIR"), s.jobID, fileUUID) + errorFilePath := fmt.Sprintf("%s/%d/%s", conf.GetEnv("FHIR_STAGING_DIR"), s.jobID, jobKeys[1].FileName) fData, err := os.ReadFile(errorFilePath) assert.NoError(s.T(), err) @@ -383,9 +333,8 @@ func (s *WorkerTestSuite) TestWriteEOBDataToFileWithErrorsAboveFailureThreshold( bbc.On("GetPatientByIdentifierHash", id1).Return(bbc.GetData("Patient", beneficiaryIDs[1])) jobArgs.BeneficiaryIDs = cclfBeneficiaryIDs - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - _, _, err = writeBBDataToFile(ctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + jobKeys, err := writeBBDataToFile(s.logctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + assert.Len(s.T(), jobKeys, 1) assert.Contains(s.T(), err.Error(), "Number of failed requests has exceeded threshold") files, err := os.ReadDir(s.stagingDir) @@ -428,9 +377,9 @@ func (s *WorkerTestSuite) TestWriteEOBDataToFile_BlueButtonIDNotFound() { } jobArgs := models.JobEnqueueArgs{ID: s.jobID, ResourceType: "ExplanationOfBenefit", BeneficiaryIDs: cclfBeneficiaryIDs, TransactionTime: time.Now(), ACOID: s.testACO.UUID.String()} - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - _, _, err := writeBBDataToFile(ctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + jobKeys, err := writeBBDataToFile(s.logctx, s.r, &bbc, *s.testACO.CMSID, jobArgs) + assert.Len(s.T(), jobKeys, 1) + assert.Equal(s.T(), jobKeys[0].FileName, "blank.ndjson") assert.Contains(s.T(), err.Error(), "Number of failed requests has exceeded threshold") files, err := os.ReadDir(s.stagingDir) @@ -491,9 +440,7 @@ func (s *WorkerTestSuite) TestGetFailureThreshold() { } func (s *WorkerTestSuite) TestAppendErrorToFile() { - ctx := 
context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - appendErrorToFile(ctx, s.testACO.UUID.String(), + appendErrorToFile(s.logctx, s.testACO.UUID.String(), fhircodes.IssueTypeCode_CODE_INVALID, "", "", s.jobID) @@ -516,7 +463,6 @@ func (s *WorkerTestSuite) TestAppendErrorToFile() { } func (s *WorkerTestSuite) TestProcessJobEOB() { - ctx := log.NewStructuredLoggerEntry(log.Worker, context.Background()) j := models.Job{ ACOID: uuid.Parse(constants.TestACOID), RequestURL: "/api/v1/ExplanationOfBenefit/$export", @@ -525,7 +471,7 @@ func (s *WorkerTestSuite) TestProcessJobEOB() { } postgrestest.CreateJobs(s.T(), s.db, &j) - complete, err := checkJobCompleteAndCleanup(ctx, s.r, j.ID) + complete, err := checkJobCompleteAndCleanup(s.logctx, s.r, j.ID) assert.Nil(s.T(), err) assert.False(s.T(), complete) @@ -538,12 +484,12 @@ func (s *WorkerTestSuite) TestProcessJobEOB() { TransactionID: uuid.New(), } - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - ctx, _ = log.SetCtxLogger(ctx, "job_id", j.ID) + ctx, _ := log.SetCtxLogger(s.logctx, "job_id", j.ID) ctx, logger := log.SetCtxLogger(ctx, "transaction_id", jobArgs.TransactionID) logHook = test.NewLocal(testUtils.GetLogger(logger)) err = s.w.ProcessJob(ctx, j, jobArgs) + entries := logHook.AllEntries() assert.Nil(s.T(), err) assert.Contains(s.T(), entries[0].Data, "cms_id") @@ -630,13 +576,11 @@ func (s *WorkerTestSuite) TestProcessJob_NoBBClient() { BBBasePath: constants.TestFHIRPath, } - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) origBBCert := conf.GetEnv("BB_CLIENT_CERT_FILE") defer conf.SetEnv(s.T(), "BB_CLIENT_CERT_FILE", origBBCert) conf.UnsetEnv(s.T(), "BB_CLIENT_CERT_FILE") - assert.Contains(s.T(), s.w.ProcessJob(ctx, j, jobArgs).Error(), "could not create Blue Button client") + assert.Contains(s.T(), s.w.ProcessJob(s.logctx, j, jobArgs).Error(), "could not create Blue Button client") } func (s *WorkerTestSuite) TestJobCancelledTerminalStatus() { @@ 
-720,9 +664,7 @@ func (s *WorkerTestSuite) TestCheckJobCompleteAndCleanup() { } } - ctx := context.Background() - ctx = log.NewStructuredLoggerEntry(log.Worker, ctx) - completed, err := checkJobCompleteAndCleanup(ctx, repository, jobID) + completed, err := checkJobCompleteAndCleanup(s.logctx, repository, jobID) assert.NoError(t, err) assert.Equal(t, tt.completed, completed) @@ -789,6 +731,31 @@ func (s *WorkerTestSuite) TestValidateJob() { assert.EqualValues(s.T(), validJob.ID, j.ID) } +func (s *WorkerTestSuite) TestCreateJobKeys() { + j := models.Job{ + ACOID: uuid.Parse(constants.TestACOID), + RequestURL: "/api/v1/ExplanationOfBenefit/$export", + Status: models.JobStatusPending, + JobCount: 1, + } + postgrestest.CreateJobs(s.T(), s.db, &j) + + complete, err := checkJobCompleteAndCleanup(s.logctx, s.r, j.ID) + assert.Nil(s.T(), err) + assert.False(s.T(), complete) + + keys := []models.JobKey{ + {JobID: 1, FileName: models.BlankFileName, ResourceType: "Patient"}, + {JobID: 1, FileName: uuid.New() + ".ndjson", ResourceType: "Coverage"}, + } + err = createJobKeys(s.logctx, s.r, keys, j.ID) + assert.NoError(s.T(), err) + for i := 0; i < len(keys); i++ { + job, _ := postgrestest.GetJobKey(s.db, int(keys[i].JobID)) + assert.NotEmpty(s.T(), job) + } +} + func generateUniqueJobID(t *testing.T, db *sql.DB, acoID uuid.UUID) int { j := models.Job{ ACOID: acoID,