Skip to content

Commit

Permalink
feat: auto cleanup tasks over 30 days ago
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 2, 2023
1 parent f42ee05 commit 0fc4a66
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion task/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Service struct {

func (svc *Service) Start() {
go svc.initTaskStatus()
//go svc.DequeueAndSchedule()
go svc.cleanupTasks()
svc.Wait()
svc.Stop()
}
Expand Down Expand Up @@ -189,6 +189,45 @@ func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) {
return n.IsMaster, nil
}

func (svc *Service) cleanupTasks() {
for {
// task stats over 30 days ago
taskStats, err := svc.modelSvc.GetTaskStatList(bson.M{
"create_ts": bson.M{
"$lt": time.Now().Add(-30 * 24 * time.Hour),
},
}, nil)
if err != nil {
time.Sleep(30 * time.Minute)
continue
}

// task ids
var ids []primitive.ObjectID
for _, ts := range taskStats {
ids = append(ids, ts.Id)
}

if len(ids) > 0 {
// remove tasks
if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTask).DeleteList(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
}

// remove task stats
if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTaskStat).DeleteList(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
}
}

time.Sleep(30 * time.Minute)
}
}

func NewTaskSchedulerService(opts ...Option) (svc2 interfaces.TaskSchedulerService, err error) {
// base service
baseSvc, err := task.NewBaseService()
Expand Down

0 comments on commit 0fc4a66

Please sign in to comment.