Skip to content

Commit

Permalink
record-tester: Add support for copy-only recordings (#356)
Browse files Browse the repository at this point in the history
* go.mod: Update go-api-client

* recordtester: Allow configuring recording spec

* rt: Check for the right nunmber of profiles

* recordtester: Fix checking of source ready

* go.mod: Update to merged go-api-client
  • Loading branch information
victorges authored Jun 12, 2024
1 parent c62f64e commit 9005dda
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 52 deletions.
9 changes: 9 additions & 0 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func main() {
testTranscode := fs.Bool("transcode", false, "Check Transcode API workflow")
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
recordingSpecStr := fs.String("recording-spec", "", "JSON object with the `recordingSpec` field to use in the test streams. Forwarded to the streams created in the API")

// Discord related flags
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
Expand Down Expand Up @@ -275,6 +276,13 @@ func main() {
}
}

var recordingSpec *api.RecordingSpec
if *recordingSpecStr != "" {
if err := json.Unmarshal([]byte(*recordingSpecStr), &recordingSpec); err != nil {
glog.Fatalf("Error parsing --recording-spec argument: %v", err)
}
}

serfMembers, err := getSerfMembers(*useSerf, *serfRPCAddr)
if err != nil {
glog.Fatalf("failed to process serf members: %v", err)
Expand Down Expand Up @@ -338,6 +346,7 @@ func main() {
Analyzers: lanalyzers,
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
RecordingSpec: recordingSpec,
UseForceURL: *forceRecordingUrl,
RecordingWaitTime: *recordingWaitTime,
UseHTTP: *useHttp,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ require (
github.com/Necroforger/dgrouter v0.0.0-20200517224846-e66453b957c1
github.com/PagerDuty/go-pagerduty v1.7.0
github.com/bwmarrin/discordgo v0.27.1
github.com/golang/glog v1.1.2
github.com/golang/glog v1.2.1
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/leaderboard-serverless v1.0.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4=
github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -555,8 +555,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/livepeer/catalyst-api v0.1.1 h1:WP4rHH88b+lsxo33wPCjl0yvqVDNyxkleZH1sA0M5GE=
github.com/livepeer/catalyst-api v0.1.1/go.mod h1:d6XPE9ehhCutWhCqqcmlYqQa+e9bf3Ke92x+gRZlzoQ=
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 h1:vQYapLFJ9EyRWTjOsJr1ullF0wiazRme2fSJDZnFrIs=
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b h1:J8cWLpnTINGAWVPU503SnUhTmuDhXnm9QxpC/CFGk2k=
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 h1:468kFmwQFaI00eNCLL8qA5XuIBMwqqVgKEXvqS7msa8=
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719/go.mod h1:d6qTStiNmXTQ/5YLB9fhzgDV9MdXg3KmqESQpur2Ak0=
github.com/livepeer/go-tools v0.3.0 h1:xK0mJyPWWyvj9Oi9nfLglhCtk0KM8883WB7VO1oPF8g=
Expand Down
153 changes: 107 additions & 46 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"net/http"
"os"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -51,6 +52,7 @@ type (
Analyzers testers.AnalyzerByRegion
Ingest *api.Ingest
RecordObjectStoreId string
RecordingSpec *api.RecordingSpec
UseForceURL bool
RecordingWaitTime time.Duration
UseHTTP bool
Expand All @@ -65,6 +67,7 @@ type (
lanalyzers testers.AnalyzerByRegion
ingest *api.Ingest
recordObjectStoreId string
recordingSpec *api.RecordingSpec
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
Expand All @@ -89,6 +92,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se
ctx: ctx,
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
recordingSpec: opts.RecordingSpec,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
Expand Down Expand Up @@ -132,7 +136,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
for {
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId})
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -247,6 +256,16 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err := rt.isCancelled(); err != nil {
return 0, err
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
}

glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)
time.Sleep(10 * time.Second)
// now get sessions
Expand Down Expand Up @@ -284,33 +303,84 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}

glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

deadline := time.Now().Add(rt.recordingWaitTime)
if rt.useForceURL {
deadline = time.Now().Add(5 * time.Second)
}

// For checking if sourcePlayback was available we see if at least 1 session
// recording (asset) got a playbackUrl before the processing was done.
var sourcePlayback bool
for errCode, errs := -1, []error{}; errCode != 0; {
if time.Now().After(deadline) {
errsStrs := make([]string, len(errs))
for i, err := range errs {
errsStrs[i] = err.Error()
}
err := fmt.Errorf("timeout waiting for recording URL to appear: %s", strings.Join(errsStrs, "; "))
return errCode, err
} else if err = rt.isCancelled(); err != nil {
return 0, err
}
time.Sleep(5 * time.Second)
} else {
time.Sleep(rt.recordingWaitTime)

errCode, errs = 0, nil
for _, sess := range sessions {
// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
},
})
if err != nil {
errCode, errs = 248, append(errs, err)
continue
}

if len(assets) != 1 {
err := fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets))
errCode, errs = 247, append(errs, err)
continue
}
asset := assets[0]

if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
errCode, errs = code, append(errs, err)
} else {
// if we get playback before the processing is done it means source playback was provided
if asset.Status.Phase != "ready" {
sourcePlayback = true
}
}

if asset.Status.Phase != "ready" {
err := fmt.Errorf("asset status is %s but should be ready", asset.Status.Phase)
errCode, errs = 246, append(errs, err)
}
}
}
if err = rt.isCancelled(); err != nil {
return 0, err
if !sourcePlayback {
return 246, errors.New("source playback was not provided")
}

sessions, err = rt.lapi.GetSessionsNew(stream.ID, rt.useForceURL)
// check actual recordings playback
sessions, err = rt.lapi.GetSessionsNew(stream.ID, false)
if err != nil {
err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err)
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
}
glog.V(model.DEBUG).Infof("Sessions: %+v streamId=%s playbackId=%s", sessions, stream.ID, stream.PlaybackID)
if err = rt.isCancelled(); err != nil {
return 0, err
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
if len(sessions) != expectedSessions {
err := fmt.Errorf("invalid session count, expected %d but got %d",
expectedSessions, len(sessions))
glog.Error(err)
return 251, err
}

for _, sess := range sessions {
statusShould := api.RecordingStatusReady
if rt.useForceURL {
Expand All @@ -330,40 +400,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration)
es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
}
}

es, err := rt.checkDown(stream, sess.RecordingURL, testDuration)
if err != nil {
return es, err
if err = rt.isCancelled(); err != nil {
return 0, err
}

// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
},
})
es, err := rt.checkRecordingHls(stream, sess.RecordingURL, testDuration)
if err != nil {
return 248, err
}

if len(assets) != 1 {
return 247, fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets))
}
if !assets[0].SourcePlaybackReady {
return 246, fmt.Errorf("source playback was not ready")
}

if code, err := checkPlaybackInfo(assets[0].PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
return es, err
}
}

glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

rt.lapi.DeleteStream(stream.ID)
Expand Down Expand Up @@ -418,7 +469,13 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
var err error
apiTry := 0
for {
session, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId, ParentID: stream.ID})
session, err = rt.lapi.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
ParentID: stream.ID,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -449,7 +506,7 @@ func (rt *recordTester) isCancelled() error {
return nil
}

func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
func (rt *recordTester) checkRecordingMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s streamId=%s playbackId=%s", url, stream.ID, stream.PlaybackID)
Expand Down Expand Up @@ -500,7 +557,7 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat
return es, nil
}

func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false)
Expand All @@ -511,7 +568,11 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration
}
vs := downloader.VODStats()
rt.vodStats = vs
if len(vs.SegmentsNum) != len(api.StandardProfiles)+1 {
expectedProfiles := len(api.StandardProfiles) + 1
if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil {
expectedProfiles = len(*rt.recordingSpec.Profiles) + 1
}
if len(vs.SegmentsNum) != expectedProfiles {
glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID)
es = 35
}
Expand Down

0 comments on commit 9005dda

Please sign in to comment.