Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send overall (rather than per-stage) completion ratio to Studio #33

Merged
merged 4 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ func (c CallbackClient) DoWithRetries(r *http.Request) error {
return nil
}

func (c CallbackClient) SendTranscodeStatus(url string, status TranscodeStatus, completionRatio float32) error {
// Sends a Transcode Status message to the Client (initially just Studio)
// The status strings will be useful for debugging where in the workflow we got to, but everything
// in Studio will be driven off the overall "Completion Ratio".
// This method will accept the completion ratio of the current stage and will translate that into the overall ratio
func (c CallbackClient) SendTranscodeStatus(url string, status TranscodeStatus, currentStageCompletionRatio float64) error {
tsm := TranscodeStatusMessage{
CompletionRatio: completionRatio,
CompletionRatio: overallCompletionRatio(status, currentStageCompletionRatio),
Status: status.String(),
Timestamp: config.Clock.GetTimestampUTC(),
}
Expand Down Expand Up @@ -86,21 +90,54 @@ func (c CallbackClient) SendTranscodeStatusError(callbackURL, errorMsg string) e
return c.DoWithRetries(r)
}

// Calculate the overall completion ratio based on the completion ratio of the current stage.
// The weighting will need to be tweaked as we understand better the relative time spent in the
// segmenting vs. transcoding stages.
func overallCompletionRatio(status TranscodeStatus, currentStageCompletionRatio float64) float64 {
// Sanity check the inputs are within the 0-1 bounds
if currentStageCompletionRatio < 0 {
currentStageCompletionRatio = 0
}
if currentStageCompletionRatio > 1 {
currentStageCompletionRatio = 1
}

// Define the "base" numbers - e.g the overall ratio we start each stage at
var TranscodeStatusPreparingBase float64 = 0.0
var TranscodeStatusPreparingTranscodingBase float64 = 0.4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be only "Transcoding"?

Suggested change
var TranscodeStatusPreparingTranscodingBase float64 = 0.4
var TranscodeStatusTranscodingBase float64 = 0.4

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "Base" was trying to signify that this is the lower limit / floor / whatever, open to suggestions for better names though. Maybe "Floor"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realised what you were actually pointing out 👓

var TranscodeStatusCompletedBase float64 = 1

switch status {
case TranscodeStatusPreparing:
return TranscodeStatusPreparingBase + (currentStageCompletionRatio * (TranscodeStatusPreparingTranscodingBase - TranscodeStatusPreparingBase))
case TranscodeStatusPreparingCompleted:
return TranscodeStatusPreparingTranscodingBase
case TranscodeStatusTranscoding:
return TranscodeStatusPreparingTranscodingBase + (currentStageCompletionRatio * (TranscodeStatusCompletedBase - TranscodeStatusPreparingTranscodingBase))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way of implementing this, to have the more complex math only in 1 place, would be to have 2 vars "start" and "end" and force the currentStageCompletionRatio to be 1 on the "Completed" steps (but use their same start/end as the non-completed). Sth like

	switch status {
	case TranscodeStatusPreparing, TranscodeStatusPreparingCompleted:
		start, end = 0, 0.4
	case TranscodeStatusTranscoding, TranscodeStatusTranscodingCompleted:
		start, end = 0.4, 1
        }
        if (status == TranscodeStatusPreparingCompleted || status == TranscodeStatusTranscodingCompleted) {
                currentStageCompletionRatio = 1
        }

Then there could be a single progress-scaling function like this: https://github.com/livepeer/task-runner/blob/4d1ae1768c01e979969bf7f00b81267db05e299c/task/progress.go#L66

Certainly not a required change, just tripping on the impl. Could make sense when we start adding more steps here. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that a lot!

case TranscodeStatusCompleted:
return TranscodeStatusCompletedBase
default:
// Either unhandled or an error
return -1
}
}

// An enum of potential statuses a Transcode job can have

type TranscodeStatus int

const (
TranscodeStatusPreparing TranscodeStatus = iota
TranscodeStatusPreparingCompleted
TranscodeStatusTranscoding
TranscodeStatusCompleted
TranscodeStatusError
)

type TranscodeStatusMessage struct {
CompletionRatio float32 `json:"completion_ratio,omitempty"`
CompletionRatio float64 `json:"completion_ratio"` // No omitempty or we lose this for 0% completion case
Error string `json:"error,omitempty"`
Retriable bool `json:"retriable,omitempty"`
Retriable *bool `json:"retriable,omitempty"` // Has to be a pointer or we can't differentiate omission from 'false'
Status string `json:"status,omitempty"`
Timestamp int64 `json:"timestamp"`
}
Expand All @@ -109,10 +146,12 @@ func (ts TranscodeStatus) String() string {
switch ts {
case TranscodeStatusPreparing:
return "preparing"
case TranscodeStatusPreparingCompleted:
return "preparing-completed"
case TranscodeStatusTranscoding:
return "transcoding"
case TranscodeStatusCompleted:
return "completed"
return "transcoding-completed"
case TranscodeStatusError:
return "error"
}
Expand Down
25 changes: 22 additions & 3 deletions clients/callback_client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clients

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -22,7 +23,7 @@ func TestItRetriesOnFailedCallbacks(t *testing.T) {
// Check that we got the callback we're expecting
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.JSONEq(t, `{"completion_ratio":1, "status":"completed", "timestamp": 123456789}`, string(body))
require.JSONEq(t, `{"completion_ratio":1, "status":"transcoding-completed", "timestamp": 123456789}`, string(body))

// Return HTTP error codes the first two times
tries += 1
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestItEventuallyStopsRetrying(t *testing.T) {
// Check that we got the callback we're expecting
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.JSONEq(t, `{"completion_ratio":1, "status":"completed", "timestamp": 123456789}`, string(body))
require.JSONEq(t, `{"completion_ratio":1, "status":"transcoding-completed", "timestamp": 123456789}`, string(body))

tries += 1

Expand All @@ -81,7 +82,7 @@ func TestTranscodeStatusErrorNotifcation(t *testing.T) {
// Check that we got the callback we're expecting
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.JSONEq(t, `{"error": "something went wrong", "status":"error", "timestamp": 123456789}`, string(body))
require.JSONEq(t, `{"completion_ratio": 0, "error": "something went wrong", "status":"error", "timestamp": 123456789}`, string(body))

w.WriteHeader(http.StatusOK)
}))
Expand All @@ -91,3 +92,21 @@ func TestTranscodeStatusErrorNotifcation(t *testing.T) {
client := NewCallbackClient()
require.NoError(t, client.SendTranscodeStatusError(svr.URL, "something went wrong"))
}

func TestItCalculatesTheOverallCompletionRatioCorrectly(t *testing.T) {
testCases := []struct {
status TranscodeStatus
completionRatio float64
expectedOverallCompletionRatio float64
}{
{TranscodeStatusPreparing, 0.5, 0.2}, // Half complete in the Preparing stage (i.e half way between 0 and 0.4)
{TranscodeStatusPreparingCompleted, 1234, 0.4}, // Preparing Completed should always == 0.4 for now, regardless of what's reported as the stage ratio
{TranscodeStatusTranscoding, 0.5, 0.7}, // Half complete in the Transcoding stage (i.e half way between 0.4 and 1)
{TranscodeStatusCompleted, 5678, 1}, // Completed should always == 1, regardless of what's reported as the stage ratio
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("%f in %s", tc.completionRatio, tc.status), func(t *testing.T) {
require.Equal(t, tc.expectedOverallCompletionRatio, overallCompletionRatio(tc.status, tc.completionRatio))
})
}
}