Skip to content

Commit

Permalink
Revert "Fix callback stale check (#623)" (#648)
Browse files Browse the repository at this point in the history
This reverts commit d8bd747.
  • Loading branch information
thomshutt authored May 3, 2023
1 parent 6918127 commit 2c7589b
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 33 deletions.
13 changes: 7 additions & 6 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/livepeer/catalyst-api/metrics"
)

const MAX_TIME_WITHOUT_UPDATE = 30 * time.Minute

// The default client is only used for the recording event. This is to avoid
// misusing the singleton client to send transcode status updates, which should
// be sent through the JobInfo.ReportStatus function instead.
Expand All @@ -40,7 +42,6 @@ type PeriodicCallbackClient struct {
httpClient *http.Client
callbackInterval time.Duration
headers map[string]string
staleTimeout time.Duration
}

func NewPeriodicCallbackClient(callbackInterval time.Duration, headers map[string]string) *PeriodicCallbackClient {
Expand All @@ -59,7 +60,6 @@ func NewPeriodicCallbackClient(callbackInterval time.Duration, headers map[strin
requestIDToLatestMessage: map[string]TranscodeStatusMessage{},
mapLock: sync.RWMutex{},
headers: headers,
staleTimeout: MaxCopyFileDuration,
}
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (pcc *PeriodicCallbackClient) SendTranscodeStatus(tsm TranscodeStatusMessag
}

log.Log(tsm.RequestID, "Updated transcode status",
"timestamp", tsm.Timestamp.UnixMilli(), "status", tsm.Status, "completion_ratio", tsm.CompletionRatio,
"timestamp", tsm.Timestamp, "status", tsm.Status, "completion_ratio", tsm.CompletionRatio,
"error", tsm.Error)
}

Expand Down Expand Up @@ -139,13 +139,14 @@ func (pcc *PeriodicCallbackClient) SendCallbacks() {
log.LogNoRequestID(fmt.Sprintf("Sending %d callbacks", len(pcc.requestIDToLatestMessage)))
for _, tsm := range pcc.requestIDToLatestMessage {
// Check timestamp and give up on the job if we haven't received an update for a long time
if tsm.Timestamp.Before(config.Clock.GetTime().Add(-pcc.staleTimeout)) {
cutoff := int64(config.Clock.GetTimestampUTC() - MAX_TIME_WITHOUT_UPDATE.Milliseconds())
if tsm.Timestamp < cutoff {
delete(pcc.requestIDToLatestMessage, tsm.RequestID)
log.Log(
tsm.RequestID,
"timed out waiting for callback updates",
"last_timestamp", tsm.Timestamp.UnixMilli(),
)
"last_timestamp", tsm.Timestamp,
"cutoff_timestamp", cutoff)
continue
}

Expand Down
9 changes: 4 additions & 5 deletions clients/callback_client_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"encoding/json"
"fmt"
"time"

"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/video"
Expand Down Expand Up @@ -78,7 +77,7 @@ type TranscodeStatusMessage struct {
RequestID string `json:"request_id"`
CompletionRatio float64 `json:"completion_ratio"` // No omitempty or we lose this for 0% completion case
Status TranscodeStatus `json:"status"`
Timestamp time.Time `json:"timestamp"`
Timestamp int64 `json:"timestamp"`

// Only used for the "Error" status message
Error string `json:"error,omitempty"`
Expand All @@ -97,7 +96,7 @@ func NewTranscodeStatusProgress(url, requestID string, status TranscodeStatus, c
RequestID: requestID,
CompletionRatio: OverallCompletionRatio(status, currentStageCompletionRatio),
Status: status,
Timestamp: config.Clock.GetTime(),
Timestamp: config.Clock.GetTimestampUTC(),
}
}

Expand All @@ -108,7 +107,7 @@ func NewTranscodeStatusError(url, requestID, errorMsg string, unretriable bool)
Error: errorMsg,
Unretriable: unretriable,
Status: TranscodeStatusError,
Timestamp: config.Clock.GetTime(),
Timestamp: config.Clock.GetTimestampUTC(),
}
}

Expand All @@ -119,7 +118,7 @@ func NewTranscodeStatusCompleted(url, requestID string, iv video.InputVideo, ov
CompletionRatio: OverallCompletionRatio(TranscodeStatusCompleted, 1),
RequestID: requestID,
Status: TranscodeStatusCompleted,
Timestamp: config.Clock.GetTime(),
Timestamp: config.Clock.GetTimestampUTC(),
Type: "video", // Assume everything is a video for now
InputVideo: iv,
Outputs: ov,
Expand Down
17 changes: 0 additions & 17 deletions clients/callback_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,6 @@ func TestItSendsPeriodicHeartbeats(t *testing.T) {
require.Equal(t, int64(1), atomic.LoadInt64(&tries), "Expected the client to have sent 1 status within the timeframe")
}

func TestItTimesOutOldUpdates(t *testing.T) {
// Counter for the number of retries we've done
var tries int64
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&tries, 1)
w.WriteHeader(http.StatusOK)
}))
defer svr.Close()

client := NewPeriodicCallbackClient(100*time.Millisecond, map[string]string{}).Start()
client.staleTimeout = time.Millisecond
client.SendTranscodeStatus(NewTranscodeStatusProgress(svr.URL, "example-request-id", TranscodeStatusTranscoding, 1))

time.Sleep(200 * time.Millisecond)
require.Equal(t, int64(0), atomic.LoadInt64(&tries))
}

func TestTranscodeStatusErrorNotifcation(t *testing.T) {
// Set up a dummy server to receive the callbacks
var requestCount int64
Expand Down
10 changes: 5 additions & 5 deletions config/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package config
import "time"

type TimestampGenerator interface {
GetTime() time.Time
GetTimestampUTC() int64
}

type RealTimestampGenerator struct{}

func (t RealTimestampGenerator) GetTime() time.Time {
return time.Now()
func (t RealTimestampGenerator) GetTimestampUTC() int64 {
return time.Now().Unix()
}

type FixedTimestampGenerator struct {
Timestamp time.Time
Timestamp int64
}

func (t FixedTimestampGenerator) GetTime() time.Time {
func (t FixedTimestampGenerator) GetTimestampUTC() int64 {
return t.Timestamp
}

0 comments on commit 2c7589b

Please sign in to comment.