Skip to content

Commit

Permalink
Add EHR data sync tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
toddkazakov committed Aug 15, 2023
1 parent cf3755b commit dbe08e0
Show file tree
Hide file tree
Showing 13 changed files with 7,961 additions and 1,743 deletions.
47 changes: 47 additions & 0 deletions clinics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"

"github.com/tidepool-org/platform/pointer"

"github.com/kelseyhightower/envconfig"
clinic "github.com/tidepool-org/clinic/client"
"go.uber.org/fx"
Expand All @@ -22,6 +24,8 @@ var ClientModule = fx.Provide(NewClient)
type Client interface {
GetClinician(ctx context.Context, clinicID, clinicianID string) (*clinic.Clinician, error)
SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error)
ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error)
SyncEHRData(ctx context.Context, clinicID string) error
}

type config struct {
Expand Down Expand Up @@ -76,6 +80,38 @@ func (d *defaultClient) GetClinician(ctx context.Context, clinicID, clinicianID
return response.JSON200, nil
}

func (d *defaultClient) ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error) {
offset := 0
batchSize := 1000

clinics := make([]clinic.Clinic, 0)
for {
response, err := d.httpClient.ListClinicsWithResponse(ctx, &clinic.ListClinicsParams{
EhrEnabled: pointer.FromBool(true),
Offset: &offset,
Limit: &batchSize,
})
if err != nil {
return nil, err
}
if response.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
if response.JSON200 == nil {
break
}

clinics = append(clinics, *response.JSON200...)
offset = offset + batchSize

if len(*response.JSON200) < batchSize {
break
}
}

return clinics, nil
}

func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
permission := make(map[string]interface{}, 0)
body := clinic.CreatePatientFromUserJSONRequestBody{
Expand All @@ -98,6 +134,17 @@ func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patie
return response.JSON200, nil
}

func (d *defaultClient) SyncEHRData(ctx context.Context, clinicID string) error {
response, err := d.httpClient.SyncEHRDataWithResponse(ctx, clinicID)
if err != nil {
return err
}
if response.StatusCode() != http.StatusAccepted {
return fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
return nil
}

func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
response, err := d.httpClient.GetPatientWithResponse(ctx, clinic.ClinicId(clinicID), clinic.PatientId(patientID))
if err != nil {
Expand Down
167 changes: 167 additions & 0 deletions ehr/reconcile/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package reconcile

import (
"context"
"math/rand"
"time"

api "github.com/tidepool-org/clinic/client"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/clinics"
"github.com/tidepool-org/platform/ehr/sync"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/page"
"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
AvailableAfterDurationMaximum = AvailableAfterDurationMinimum + 1*time.Hour
AvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute
TaskDurationMaximum = 5 * time.Minute
)

type Runner struct {
authClient auth.Client
clinicsClient clinics.Client
taskClient task.Client
logger log.Logger
}

func NewRunner(authClient auth.Client, clinicsClient clinics.Client, taskClient task.Client, logger log.Logger) (*Runner, error) {
return &Runner{
authClient: authClient,
clinicsClient: clinicsClient,
taskClient: taskClient,
logger: logger,
}, nil
}

func (r *Runner) GetRunnerType() string {
return Type
}

func (r *Runner) GetRunnerDeadline() time.Time {
return time.Now().Add(TaskDurationMaximum * 3)
}

func (r *Runner) GetRunnerMaximumDuration() time.Duration {
return TaskDurationMaximum
}

func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

serverSessionToken, err := r.authClient.ServerSessionToken()
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get server session token"))
}

ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

// Get the list of all existing EHR sync tasks
syncTasks, err := r.getSyncTasks(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get sync tasks"))
}

// Get the list of all EHR enabled clinics
clinicsList, err := r.clinicsClient.ListEHREnabledClinics(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to list clinics"))
}

plan := GetReconciliationPlan(syncTasks, clinicsList)
r.reconcileTasks(ctx, tsk, plan)

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) getSyncTasks(ctx context.Context) (map[string]task.Task, error) {
filter := task.TaskFilter{
Type: pointer.FromString(sync.Type),
}
pagination := page.Pagination{
Page: 0,
Size: 1000,
}

tasksByClinicId := map[string]task.Task{}
for {
tasks, err := r.taskClient.ListTasks(ctx, &filter, &pagination)
if err != nil {
return nil, errors.Wrap(err, "unable to list tasks")
}

for _, tsk := range tasks {
clinicId, err := sync.GetClinicId(tsk.Data)
if err != nil {
r.logger.Errorf("unable to get clinicId from task data (taskId %v): %v", tsk.ID, err)
continue
}
tasksByClinicId[clinicId] = *tsk
}
if len(tasks) < pagination.Size {
break
} else {
pagination.Page++
}
}

return tasksByClinicId, nil
}

func (r *Runner) reconcileTasks(ctx context.Context, task *task.Task, plan ReconciliationPlan) {
for _, t := range plan.ToDelete {
if err := r.taskClient.DeleteTask(ctx, t.ID); err != nil {
task.AppendError(errors.Wrap(err, "unable to delete task"))
}
}
for _, t := range plan.ToCreate {
if _, err := r.taskClient.CreateTask(ctx, &t); err != nil {
task.AppendError(errors.Wrap(err, "unable to create task"))
}
}
}

type ReconciliationPlan struct {
ToCreate []task.TaskCreate
ToDelete []task.Task
}

func GetReconciliationPlan(syncTasks map[string]task.Task, clinics []api.Clinic) ReconciliationPlan {
toDelete := make([]task.Task, 0)
toCreate := make([]task.TaskCreate, 0)

// At the end of the loop syncTasks will contain only the tasks that need to be deleted,
// and toCreate will contain tasks for new clinics that need to be synced.
for _, clinic := range clinics {
clinicId := *clinic.Id

_, exists := syncTasks[clinicId]
if exists {
delete(syncTasks, clinicId)
} else {
create := sync.NewTaskCreate(clinicId)
toCreate = append(toCreate, *create)
}
}
for _, tsk := range syncTasks {
toDelete = append(toDelete, tsk)
}
return ReconciliationPlan{
ToCreate: toCreate,
ToDelete: toDelete,
}
}
20 changes: 20 additions & 0 deletions ehr/reconcile/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package reconcile

import (
"time"

"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
Type = "org.tidepool.ehr.reconcile"
)

func NewTaskCreate() *task.TaskCreate {
return &task.TaskCreate{
Name: pointer.FromString(Type),
Type: Type,
AvailableTime: pointer.FromAny(time.Now().UTC()),
}
}
68 changes: 68 additions & 0 deletions ehr/sync/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package sync

import (
"context"
"math/rand"
"time"

"github.com/tidepool-org/platform/clinics"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/task"
)

const (
AvailableAfterDurationMaximum = 75 * time.Minute
AvailableAfterDurationMinimum = 45 * time.Minute
TaskDurationMaximum = 5 * time.Minute
)

type Runner struct {
clinicsClient clinics.Client
logger log.Logger
}

func NewRunner(clinicsClient clinics.Client, logger log.Logger) (*Runner, error) {
return &Runner{
clinicsClient: clinicsClient,
logger: logger,
}, nil
}

func (r *Runner) GetRunnerType() string {
return Type
}

func (r *Runner) GetRunnerDeadline() time.Time {
return time.Now().Add(TaskDurationMaximum * 3)
}

func (r *Runner) GetRunnerMaximumDuration() time.Duration {
return TaskDurationMaximum
}

func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

clinicId, err := GetClinicId(tsk.Data)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get clinicId from task data"))
return true
}

err = r.clinicsClient.SyncEHRData(ctx, clinicId)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to sync ehr data"))
}

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}
36 changes: 36 additions & 0 deletions ehr/sync/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sync

import (
"fmt"
"time"

"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
Type = "org.tidepool.ehr.sync"
)

func TaskName(clinicId string) string {
return fmt.Sprintf("%s:%s", Type, clinicId)
}

func NewTaskCreate(clinicId string) *task.TaskCreate {
return &task.TaskCreate{
Name: pointer.FromString(TaskName(clinicId)),
Type: Type,
AvailableTime: pointer.FromAny(time.Now().UTC()),
Data: map[string]interface{}{
"clinicId": clinicId,
},
}
}

func GetClinicId(data map[string]interface{}) (string, error) {
clinicId, ok := data["clinicId"].(string)
if !ok {
return "", fmt.Errorf("unable to get clinicId from task data")
}
return clinicId, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/rinchsan/device-check-go v1.2.3
github.com/tidepool-org/clinic/client v0.0.0-20211118205743-020bf46ac989
github.com/tidepool-org/clinic/client v0.0.0-20230815132146-bd6c2982ff6d
github.com/tidepool-org/devices/api v0.0.0-20220914225528-c7373eb1babc
github.com/tidepool-org/go-common v0.9.0
github.com/tidepool-org/hydrophone/client v0.0.0-20221219223301-92bd47a8a11c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tidepool-org/clinic/client v0.0.0-20211118205743-020bf46ac989 h1:qmL+ueYSUU53YiktJjxRk008uxTyDGsMr4z/RAa9ScQ=
github.com/tidepool-org/clinic/client v0.0.0-20211118205743-020bf46ac989/go.mod h1:eduhUZw6oOhrtt2C57RGn4rYq9CoCX8ucwDV0PmxSF4=
github.com/tidepool-org/clinic/client v0.0.0-20230815132146-bd6c2982ff6d h1:tlkqNyucYC1AXszmDJPX6O+ww+NmtLFdQCjUAyQtFWU=
github.com/tidepool-org/clinic/client v0.0.0-20230815132146-bd6c2982ff6d/go.mod h1:eduhUZw6oOhrtt2C57RGn4rYq9CoCX8ucwDV0PmxSF4=
github.com/tidepool-org/devices/api v0.0.0-20220914225528-c7373eb1babc h1:QLsASXo2G8RRm9C6/5Wf7+4iQK6dJQlqcVQFax0AunA=
github.com/tidepool-org/devices/api v0.0.0-20220914225528-c7373eb1babc/go.mod h1:hiVnAb182K2eV2/ZqZGhi3v3qK7qJhBuDE4bR0HvIcE=
github.com/tidepool-org/go-common v0.9.0 h1:iCQNusSj4kRJC66mDxz8KSibBZG7IgejY0+7Jw3LwRc=
Expand Down
Loading

0 comments on commit dbe08e0

Please sign in to comment.