Skip to content

Commit

Permalink
Merge pull request #364 from Zibbp/fixes-2024-02-08
Browse files Browse the repository at this point in the history
fix(exec): check if livechatworkflowid is populated
  • Loading branch information
Zibbp authored Feb 9, 2024
2 parents 215e0be + d878420 commit fe579f5
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
6 changes: 4 additions & 2 deletions internal/activities/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func DownloadTwitchLiveThumbnails(ctx context.Context, input dto.ArchiveVideoInp
if dbErr != nil {
return dbErr
}
// stream isn't live so archive shouldn't continue and should be cleaned up
// TODO: clean up entire archive thus far
return temporal.NewApplicationError(fmt.Sprintf("no stream found for channel %s", input.Channel.Name), "", nil)
}

Expand Down Expand Up @@ -344,7 +346,7 @@ func DownloadTwitchLiveVideo(ctx context.Context, input dto.ArchiveVideoInput, c
_, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Failed).Save(ctx)
if dbErr != nil {
stopHeartbeat <- true
return dbErr
return temporal.NewApplicationError(err.Error(), "", nil)
}
stopHeartbeat <- true
return temporal.NewApplicationError(err.Error(), "", nil)
Expand All @@ -353,7 +355,7 @@ func DownloadTwitchLiveVideo(ctx context.Context, input dto.ArchiveVideoInput, c
_, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Success).Save(ctx)
if dbErr != nil {
stopHeartbeat <- true
return dbErr
return temporal.NewApplicationError(err.Error(), "", nil)
}

// Update video duration with duration from downloaded video
Expand Down
24 changes: 13 additions & 11 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,17 +302,19 @@ func DownloadTwitchLiveVideo(ctx context.Context, v *ent.Vod, ch *ent.Channel, l
log.Debug().Msgf("streamlink live args: %v", cmdArgs)
log.Debug().Msgf("running: streamlink %s", strings.Join(cmdArgs, " "))

// Notify chat download that video download is about to start
log.Debug().Msg("notifying chat download that video download is about to start")

// !send signal to workflow to start chat download
temporal.InitializeTemporalClient()
signal := utils.ArchiveTwitchLiveChatStartSignal{
Start: true,
}
err := temporal.GetTemporalClient().Client.SignalWorkflow(ctx, liveChatWorkflowId, "", "start-chat-download", signal)
if err != nil {
return fmt.Errorf("error sending signal to workflow to start chat download: %w", err)
if liveChatWorkflowId != "" {
// Notify chat download that video download is about to start
log.Debug().Msg("notifying chat download that video download is about to start")

// !send signal to workflow to start chat download
temporal.InitializeTemporalClient()
signal := utils.ArchiveTwitchLiveChatStartSignal{
Start: true,
}
err := temporal.GetTemporalClient().Client.SignalWorkflow(ctx, liveChatWorkflowId, "", "start-chat-download", signal)
if err != nil {
return fmt.Errorf("error sending signal to workflow to start chat download: %w", err)
}
}

// Execute streamlink
Expand Down
8 changes: 5 additions & 3 deletions internal/workflows/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,11 @@ func DownloadTwitchLiveVideoWorkflow(ctx workflow.Context, input dto.ArchiveVide
}

// kill live chat download
err = workflow.ExecuteActivity(ctx, activities.KillTwitchLiveChatDownload, input).Get(ctx, nil)
if err != nil {
return workflowErrorHandler(err, input, "kill-chat-download")
if input.Queue.ChatProcessing {
err = workflow.ExecuteActivity(ctx, activities.KillTwitchLiveChatDownload, input).Get(ctx, nil)
if err != nil {
return workflowErrorHandler(err, input, "kill-chat-download")
}
}

// mark live channel as not live
Expand Down

0 comments on commit fe579f5

Please sign in to comment.