Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transcode: trigger the transcode via remote or local broadcaster #98

Merged
merged 5 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type LivepeerTranscodeConfiguration struct {

type Credentials struct {
AccessToken string `json:"access_token"`
CustomAPIURL string `json:"custom_api_url"`
CustomAPIURL string `json:"custom_url"`
}

type BroadcasterList []struct {
Expand Down
8 changes: 7 additions & 1 deletion clients/broadcaster_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/livepeer/catalyst-api/config"
)

// Currently only implemented by LocalBroadcasterClient
// TODO: Try to come up with a unified interface across Local and Remote
type BroadcasterClient interface {
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []EncodedProfile, durationMillis int64) (TranscodeResult, error)
}

type LocalBroadcasterClient struct {
broadcasterURL url.URL
}
Expand All @@ -23,7 +29,7 @@ func NewLocalBroadcasterClient(broadcasterURL string) (LocalBroadcasterClient, e
}, nil
}

func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, durationMillis int64, profiles []EncodedProfile) (TranscodeResult, error) {
func (c LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []EncodedProfile, durationMillis int64) (TranscodeResult, error) {
conf := LivepeerTranscodeConfiguration{}
conf.Profiles = append(conf.Profiles, profiles...)
transcodeConfig, err := json.Marshal(&conf)
Expand Down
11 changes: 10 additions & 1 deletion clients/broadcaster_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ type RemoteBroadcasterClient struct {
credentials Credentials
}

func (c *RemoteBroadcasterClient) TranscodeSegmentWithRemoteBroadcaster(segment io.Reader, sequenceNumber int64, durationMillis int64, profiles []EncodedProfile, streamName string) (TranscodeResult, error) {
func NewRemoteBroadcasterClient(credentials Credentials) (RemoteBroadcasterClient, error) {
if credentials.AccessToken == "" || credentials.CustomAPIURL == "" {
return RemoteBroadcasterClient{}, fmt.Errorf("error parsing credentials: empty access-token or api URL")
}
return RemoteBroadcasterClient{
credentials: credentials,
}, nil
}

func (c *RemoteBroadcasterClient) TranscodeSegmentWithRemoteBroadcaster(segment io.Reader, sequenceNumber int64, profiles []EncodedProfile, streamName string, durationMillis int64) (TranscodeResult, error) {
// Get available broadcasters
bList, err := findBroadcaster(c.credentials)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type MistStreamInfoTrack struct {
Firstms int `json:"firstms,omitempty"`
Idx int `json:"idx,omitempty"`
Init string `json:"init,omitempty"`
Lastms int `json:"lastms,omitempty"`
Lastms int64 `json:"lastms,omitempty"`
Maxbps int `json:"maxbps,omitempty"`
Trackid int `json:"trackid,omitempty"`
Type string `json:"type,omitempty"`
Expand Down
33 changes: 9 additions & 24 deletions handlers/misttriggers/recording_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/transcode"
)

Expand Down Expand Up @@ -76,7 +74,7 @@ func (d *MistCallbackHandlersCollection) triggerRecordingEndSegmenting(w http.Re
}

// Compare duration of source stream to the segmented stream to ensure the input file was completely segmented before attempting to transcode
var inputVideoLengthMillis int
var inputVideoLengthMillis int64
for track, trackInfo := range streamInfo.Meta.Tracks {
if strings.Contains(track, "video") {
inputVideoLengthMillis = trackInfo.Lastms
Expand All @@ -89,7 +87,7 @@ func (d *MistCallbackHandlersCollection) triggerRecordingEndSegmenting(w http.Re
}

si := cache.DefaultStreamCache.Segmenting.Get(p.StreamName)
transcodeRequest := handlers.TranscodeSegmentRequest{
transcodeRequest := transcode.TranscodeSegmentRequest{
SourceFile: si.SourceFile,
CallbackURL: si.CallbackURL,
AccessToken: si.AccessToken,
Expand All @@ -98,21 +96,8 @@ func (d *MistCallbackHandlersCollection) triggerRecordingEndSegmenting(w http.Re
UploadURL: si.UploadURL,
}

// Create a separate subdirectory for the transcoded renditions
segmentedUploadURL, err := url.Parse(transcodeRequest.UploadURL)
if err != nil {
_ = config.Logger.Log("msg", "failed to parse transcodeRequest.UploadURL", "error", err)
return
}
relativeTranscodeURL, err := url.Parse("transcoded/index.m3u8")
if err != nil {
_ = config.Logger.Log("msg", "failed to parse relativeTranscodeURL", "error", err)
return
}
transcodedManifestURL := segmentedUploadURL.ResolveReference(relativeTranscodeURL)

go func() {
err := transcode.RunTranscodeProcess(transcodeRequest.UploadURL, transcodedManifestURL.String(), transcodeRequest.Profiles, callbackUrl)
err := transcode.RunTranscodeProcess(transcodeRequest, p.StreamName, p.StreamMediaDurationMillis)
if err != nil {
_ = config.Logger.Log(
"msg", "RunTranscodeProcess returned an error",
Expand Down Expand Up @@ -150,9 +135,9 @@ type RecordingEndPayload struct {
WritingDurationSecs int
ConnectionStartTimeUnix int
ConnectionEndTimeUnix int
StreamMediaDurationMillis int
FirstMediaTimestampMillis int
LastMediaTimestampMillis int
StreamMediaDurationMillis int64
FirstMediaTimestampMillis int64
LastMediaTimestampMillis int64
}

func ParseRecordingEndPayload(payload string) (RecordingEndPayload, error) {
Expand Down Expand Up @@ -181,17 +166,17 @@ func ParseRecordingEndPayload(payload string) (RecordingEndPayload, error) {
return RecordingEndPayload{}, fmt.Errorf("error parsing line %d of RECORDING_END payload as an int. Line contents: %s. Error: %s", 6, lines[6], err)
}

StreamMediaDurationMillis, err := strconv.Atoi(lines[7])
StreamMediaDurationMillis, err := strconv.ParseInt(lines[7], 10, 64)
if err != nil {
return RecordingEndPayload{}, fmt.Errorf("error parsing line %d of RECORDING_END payload as an int. Line contents: %s. Error: %s", 7, lines[7], err)
}

FirstMediaTimestampMillis, err := strconv.Atoi(lines[8])
FirstMediaTimestampMillis, err := strconv.ParseInt(lines[8], 10, 64)
if err != nil {
return RecordingEndPayload{}, fmt.Errorf("error parsing line %d of RECORDING_END payload as an int. Line contents: %s. Error: %s", 8, lines[8], err)
}

LastMediaTimestampMillis, err := strconv.Atoi(lines[9])
LastMediaTimestampMillis, err := strconv.ParseInt(lines[9], 10, 64)
if err != nil {
return RecordingEndPayload{}, fmt.Errorf("error parsing line %d of RECORDING_END payload as an int. Line contents: %s. Error: %s", 9, lines[9], err)
}
Expand Down
6 changes: 3 additions & 3 deletions handlers/misttriggers/recording_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ func TestItCanParseAValidRecordingEndPayload(t *testing.T) {
require.Equal(t, p.WritingDurationSecs, 5)
require.Equal(t, p.ConnectionStartTimeUnix, 6)
require.Equal(t, p.ConnectionEndTimeUnix, 7)
require.Equal(t, p.StreamMediaDurationMillis, 8)
require.Equal(t, p.FirstMediaTimestampMillis, 9)
require.Equal(t, p.LastMediaTimestampMillis, 10)
require.Equal(t, p.StreamMediaDurationMillis, int64(8))
require.Equal(t, p.FirstMediaTimestampMillis, int64(9))
require.Equal(t, p.LastMediaTimestampMillis, int64(10))
}

func TestItFailsToParseAnInvalidRecordingEndPayload(t *testing.T) {
Expand Down
26 changes: 4 additions & 22 deletions handlers/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,16 @@ import (
"net/http"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/transcode"
"github.com/xeipuuv/gojsonschema"
)

type TranscodeSegmentRequest struct {
SourceFile string `json:"source_location"`
CallbackURL string `json:"callback_url"`
UploadURL string `json:"upload_url"`
StreamKey string `json:"streamKey"`
AccessToken string `json:"accessToken"`
TranscodeAPIUrl string `json:"transcodeAPIUrl"`
Profiles []clients.EncodedProfile `json:"profiles"`
Detection struct {
Freq uint `json:"freq"`
SampleRate uint `json:"sampleRate"`
SceneClassification []struct {
Name string `json:"name"`
} `json:"sceneClassification"`
} `json:"detection"`
SourceStreamInfo clients.MistStreamInfo
}

func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
schema := inputSchemasCompiled["TranscodeSegment"]

return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var transcodeRequest TranscodeSegmentRequest
var transcodeRequest transcode.TranscodeSegmentRequest
payload, err := io.ReadAll(req.Body)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot read body", err)
Expand All @@ -54,8 +35,9 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
return
}

// TODO: Do this asynchronously
err = transcode.RunTranscodeProcess(transcodeRequest.SourceFile, transcodeRequest.UploadURL, transcodeRequest.Profiles, transcodeRequest.CallbackURL)
// TODO: Do this asynchronously and pass valid stream-name and input file duration
// when the transcode api endpoint is accessed (only used for testing for now)
err = transcode.RunTranscodeProcess(transcodeRequest, "", 0)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Error running Transcode process", err)
}
Expand Down
3 changes: 1 addition & 2 deletions transcode/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"

"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
)

Expand Down Expand Up @@ -58,7 +57,7 @@ func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist)
}

// Generate a Master manifest, plus one Rendition manifest for each Profile we're transcoding, then write them to storage
func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetManifestOSURL string, transcodeProfiles []cache.EncodedProfile) error {
func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetManifestOSURL string, transcodeProfiles []clients.EncodedProfile) error {
// Generate the base target OS URL to which we can append filenames / subdirectories to
baseURL := targetManifestOSURL[:strings.LastIndex(targetManifestOSURL, "/")]

Expand Down
4 changes: 2 additions & 2 deletions transcode/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"

"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -93,7 +93,7 @@ func TestItCanGenerateAndWriteManifests(t *testing.T) {
err = GenerateAndUploadManifests(
*sourceMediaPlaylist,
masterManifestPath,
[]cache.EncodedProfile{
[]clients.EncodedProfile{
{
Name: "lowlowlow",
FPS: 60,
Expand Down
81 changes: 74 additions & 7 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,32 @@ import (
"bytes"
"fmt"
"io"
"net/url"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
)

type TranscodeSegmentRequest struct {
SourceFile string `json:"source_location"`
CallbackURL string `json:"callback_url"`
UploadURL string `json:"upload_url"`
StreamKey string `json:"streamKey"`
AccessToken string `json:"accessToken"`
TranscodeAPIUrl string `json:"transcodeAPIUrl"`
Profiles []clients.EncodedProfile `json:"profiles"`
Detection struct {
Freq uint `json:"freq"`
SampleRate uint `json:"sampleRate"`
SceneClassification []struct {
Name string `json:"name"`
} `json:"sceneClassification"`
} `json:"detection"`
SourceStreamInfo clients.MistStreamInfo
}

// The default set of encoding profiles to use when none are specified
var defaultTranscodeProfiles = []cache.EncodedProfile{
var defaultTranscodeProfiles = []clients.EncodedProfile{
{
Name: "720p",
Bitrate: 2000000,
Expand All @@ -28,15 +46,42 @@ var defaultTranscodeProfiles = []cache.EncodedProfile{
},
}

func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transcodeProfiles []cache.EncodedProfile, callbackURL string) error {
_ = config.Logger.Log("msg", "RunTranscodeProcess (v2) Beginning", "source", sourceManifestOSURL, "target", targetManifestOSURL)
var localBroadcasterClient clients.BroadcasterClient

func init() {
b, err := clients.NewLocalBroadcasterClient(config.DefaultBroadcasterURL)
if err != nil {
panic(fmt.Sprintf("Error initialising Local Broadcaster Client with URL %q: %s", config.DefaultBroadcasterURL, err))
}
localBroadcasterClient = b
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, durationMillis int64) error {
_ = config.Logger.Log("msg", "RunTranscodeProcess (v2) Beginning", "source", transcodeRequest.SourceFile, "target", transcodeRequest.UploadURL)

// Create a separate subdirectory for the transcoded renditions
segmentedUploadURL, err := url.Parse(transcodeRequest.UploadURL)
if err != nil {
return fmt.Errorf("failed to parse transcodeRequest.UploadURL: %s", err)
}
relativeTranscodeURL, err := url.Parse("transcoded/index.m3u8")
if err != nil {
return fmt.Errorf("failed to parse relativeTranscodeURL: %s", err)
}

targetManifestOSURL := segmentedUploadURL.ResolveReference(relativeTranscodeURL)
// Grab some useful parameters to be used later from the TranscodeSegmentRequest
sourceManifestOSURL := transcodeRequest.UploadURL
transcodeProfiles := transcodeRequest.Profiles
callbackURL := transcodeRequest.CallbackURL

// If Profiles haven't been overridden, use the default set
if len(transcodeProfiles) == 0 {
transcodeProfiles = defaultTranscodeProfiles
}

// Download the "source" manifest that contains all the segments we'll be transcoding
println(sourceManifestOSURL)
sourceManifest, err := DownloadRenditionManifest(sourceManifestOSURL)
if err != nil {
return fmt.Errorf("error downloading source manifest: %s", err)
Expand All @@ -56,23 +101,45 @@ func RunTranscodeProcess(sourceManifestOSURL, targetManifestOSURL string, transc
}

// Download and read the segment, log the size in bytes and discard for now
// TODO: Push the segments through the transcoder
// TODO: Upload the output segments
buf := &bytes.Buffer{}
nRead, err := io.Copy(buf, rc)
if err != nil {
return fmt.Errorf("failed to read source segment data %q: %s", u, err)
}
_ = config.Logger.Log("msg", "downloaded source segment", "url", u, "size_bytes", nRead, "error", err)

// If an AccessToken is provided via the request for transcode, then use remote Broadcasters.
// Otherwise, use the local harcoded Broadcaster.
if transcodeRequest.AccessToken != "" {
creds := clients.Credentials{
AccessToken: transcodeRequest.AccessToken,
CustomAPIURL: transcodeRequest.TranscodeAPIUrl,
}
broadcasterClient, _ := clients.NewRemoteBroadcasterClient(creds)

tr, err := broadcasterClient.TranscodeSegmentWithRemoteBroadcaster(buf, int64(i), transcodeProfiles, streamName, durationMillis)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
fmt.Println("transcodeResult", tr) //remove this
// TODO: Upload the output segments
} else {
tr, err := localBroadcasterClient.TranscodeSegment(buf, int64(i), transcodeProfiles, durationMillis)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
fmt.Println("transcodeResult", tr) //remove this
// TODO: Upload the output segments
}

var completedRatio = calculateCompletedRatio(len(sourceSegmentURLs), i+1)
if err = clients.DefaultCallbackClient.SendTranscodeStatus(callbackURL, clients.TranscodeStatusTranscoding, completedRatio); err != nil {
_ = config.Logger.Log("msg", "failed to send transcode status callback", "url", callbackURL, "error", err)
}
}

// Build the manifests and push them to storage
err = GenerateAndUploadManifests(sourceManifest, targetManifestOSURL, transcodeProfiles)
err = GenerateAndUploadManifests(sourceManifest, targetManifestOSURL.String(), transcodeProfiles)
if err != nil {
return err
}
Expand Down
Loading