BCDA-8144: Change worker logic #951
… diff from this branch
Looks good to me!
bcdaworker/worker/worker.go
Outdated
err = createJobKeys(ctx, w.r, jobKeys, job.ID)
if err != nil {
	logger.Error(err)
}

//with a job key created, delete the temp job directory.
if err = os.RemoveAll(tempJobPath); err != nil {
Is it possible for this function to return before this is called? Wondering if we want to move this into a defer statement right after creating the temp job path.
I could also imagine wanting to create a new temporary subdir for each call to process job, but maybe overkill if the workers process one job at a time.
Good call; that latter condition is also a concern I'll address: if a worker is processing multiple keys on the same job, it could remove files that are actually still in-progress for a given job.
Will add a defer block and update the directory structure / scope, thank you!
@@ -96,9 +96,19 @@ func (w *worker) ProcessJob(ctx context.Context, job models.Job, jobArgs models.
}

jobID := strconv.Itoa(jobArgs.ID)
//temp Job path is not dependent upon the directory. Using a UUID for a directory string prevents race conditions.
tempJobPath := fmt.Sprintf("%s/%s", conf.GetEnv("FHIR_TEMP_DIR"), uuid.NewRandom())
@kyeah updated the temporary paths to use a UUID to avoid the race condition.
	logger.Error(err)
	return err
}
defer os.RemoveAll(tempJobPath)
@kyeah and added a defer statement to remove any files in that directory, thanks again for calling that out
…t of avoiding race condition
 // writeBBDataToFile sends requests to BlueButton and writes the results to ndjson files.
 // A list of JobKeys are returned, containing the names of files that were created.
 // Filesnames can be "blank.ndjson", "<uuid>.ndjson", or "<uuid>-error.ndjson".
 func writeBBDataToFile(ctx context.Context, r repository.Repository, bb client.APIClient,
-	cmsID string, jobArgs models.JobEnqueueArgs) (jobKeys []models.JobKey, err error) {
+	cmsID string, jobArgs models.JobEnqueueArgs, tmpDir string) (jobKeys []models.JobKey, err error) {
Changed the method signature to allow for passing of the directory rather than expecting it on the basis of the job ID. Of note, the []models.JobKey already had the job ID, so nothing of value is irreversibly lost here.
🎫 Ticket
https://jira.cms.gov/browse/BCDA-8144
🛠 Changes
Writes job files first to a local EBS volume, and then when a job key is completed, moves it to the staging directory and cleans up.
ℹ️ Context for reviewers
Currently, workers write directly to EFS when performing jobs. This consumes EFS I/O operations, which are in scarce supply. With this change, a file is written to EFS only once it is complete. In combination with the upcoming gzip change, this will significantly reduce the necessary throughput / ops performed on EFS, enabling greater overall throughput (when comparing data moved * compression ratio).
Note, in each environment, I've set up the temp directory to be /home/bcda/TEMP_FHIR_DIR, since the files will be owned by the bcda user.
✅ Acceptance Validation
Unit tests pass, sonarqube quality gate passes.
Verified intended operation on the development servers, creating the temporary directory and clearing it out with each job key!
🔒 Security Implications
If any security implications apply, add Jason Ashbaugh (GitHub username: StewGoin) as a reviewer and do not merge this PR without his approval.