Skip to content

Commit

Permalink
Run MistInHLS mist binary on push end for each rendition (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexKordic authored Oct 10, 2022
1 parent 17e8d6a commit 1cbf6db
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 52 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ var Version string
// Used so that we can generate fixed timestamps in tests
var Clock TimestampGenerator = RealTimestampGenerator{}

// Path to Mist's "Livepeer" process that we shell out to for the transcoding
const PathMistProcLivepeer = "/usr/local/bin/MistProcLivepeer"
// Path to Mist's binaries that we shell out to for transcoding and header file creation
var PathMistDir = "/usr/local/bin"

// Port that the local Broadcaster runs on
const DefaultBroadcasterPort = 8935
Expand Down
31 changes: 30 additions & 1 deletion handlers/misttriggers/push_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"log"
"net/http"
"net/url"
"os/exec"
"path"
"strings"
"time"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/subprocess"
)

// TriggerPushEnd responds to PUSH_END trigger
Expand Down Expand Up @@ -63,7 +66,7 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
return
}

uploadSuccess := pushStatus == "null"
uploadSuccess := pushStatus != "null"
if uploadSuccess {
// TODO: Do some maths so that we don't always send 0.5
if err := clients.DefaultCallbackClient.SendTranscodeStatus(info.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5); err != nil {
Expand All @@ -76,6 +79,10 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
}
}

if err := createDtsh(actualDestination); err != nil {
_ = config.Logger.Log("msg", "createDtsh failed", "err", err, "destination", actualDestination)
}

// We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX
cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination)
if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) {
Expand Down Expand Up @@ -143,3 +150,25 @@ func uuidFromPushUrl(uri string) (string, error) {
}
return path[len(path)-2], nil
}

func createDtsh(destination string) error {
url, err := url.Parse(destination)
if err != nil {
return err
}
url.RawQuery = ""
url.Fragment = ""
headerPrepare := exec.Command(path.Join(config.PathMistDir, "MistInHLS"), "-H", url.String())
if err = subprocess.LogOutputs(headerPrepare); err != nil {
return err
}
if err = headerPrepare.Start(); err != nil {
return err
}
go func() {
if err := headerPrepare.Wait(); err != nil {
_ = config.Logger.Log("msg", "createDtsh return code", "code", err, "destination", destination)
}
}()
return nil
}
21 changes: 21 additions & 0 deletions handlers/misttriggers/triggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"bytes"
"encoding/json"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
"testing"
"time"

Expand Down Expand Up @@ -107,6 +111,23 @@ func TestRecordingCompleted(t *testing.T) {
}
}

func TestMistInHLSStart(t *testing.T) {
dir := t.TempDir()
config.PathMistDir = dir
destination := "unused"
err := createDtsh("invalid://user:abc{[email protected]:5432/db?sslmode=require")
require.IsType(t, &url.Error{}, err)
err = createDtsh(destination)
require.IsType(t, &fs.PathError{}, err)

script := path.Join(dir, "MistInHLS")
_ = os.WriteFile(script, []byte("#!/bin/sh\necho livepeer\n"), 0744)

err = createDtsh(destination)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
}

type StreamSample struct {
streamName string
expected PipelineId
Expand Down
55 changes: 6 additions & 49 deletions handlers/transcode.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package handlers

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/subprocess"
"github.com/xeipuuv/gojsonschema"
)

Expand Down Expand Up @@ -74,30 +73,6 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
}
}

// stream from a source to a destination buffer while also printing
func streamOutput(src io.Reader, dst *bytes.Buffer, out io.Writer) error {
mw := io.MultiWriter(dst, out)
s := bufio.NewReader(src)
for {
var line []byte
line, err := s.ReadSlice('\n')
if err == io.EOF && len(line) == 0 {
break
}
if err == io.EOF {
return fmt.Errorf("Improper termination: %v", line)
}
if err != nil {
return err
}
_, err = mw.Write(line)
if err != nil {
return err
}
}
return nil
}

// RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream.
func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error {

Expand All @@ -117,36 +92,18 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm
}
args := string(configPayload)

transcodeCommand := exec.Command(config.PathMistProcLivepeer, args, "--debug", "8", "--kickoff")

var stdout, stderr bytes.Buffer
stderrPipe, err := transcodeCommand.StderrPipe()
if err != nil {
return fmt.Errorf("Failed to open stderr pipe: %s", err)
}
stdoutPipe, err := transcodeCommand.StdoutPipe()
if err != nil {
return fmt.Errorf("Failed to open stdout pipe: %s", err)
transcodeCommand := exec.Command(path.Join(config.PathMistDir, "MistProcLivepeer"), args, "--debug", "8", "--kickoff")
if err = subprocess.LogOutputs(transcodeCommand); err != nil {
return err
}

// Start the Transcode Command asynchronously - we call Wait() later in this method
fmt.Printf("Starting transcode via: %s\n", transcodeCommand.String())
err = transcodeCommand.Start()
if err != nil {
return fmt.Errorf("Failed to start MistProcLivepeer: %s", err)
return fmt.Errorf("failed to start MistProcLivepeer: %s", err)
}

go func() {
if streamOutput(stdoutPipe, &stdout, os.Stdout) != nil {
_ = fmt.Errorf("Failed to stream output from stdout")
}
}()
go func() {
if streamOutput(stderrPipe, &stderr, os.Stderr) != nil {
_ = fmt.Errorf("Failed to stream output from stderr")
}
}()

dir, _ := url.Parse(".")
uploadDir := inputUrl.ResolveReference(dir)
// Cache the stream data, later used in the trigger handlers called by Mist
Expand Down
50 changes: 50 additions & 0 deletions subprocess/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package subprocess

import (
"bufio"
"fmt"
"io"
"os"
"os/exec"

"github.com/livepeer/catalyst-api/config"
)

func streamOutput(src io.Reader, out io.Writer) {
s := bufio.NewReader(src)
for {
var line []byte
line, err := s.ReadSlice('\n')
if err == io.EOF && len(line) == 0 {
break
}
if err == io.EOF {
_ = config.Logger.Log("msg", "streamOutput() improper termination", "line", line)
return
}
if err != nil {
_ = config.Logger.Log("msg", "streamOutput ReadSlice error", "err", err)
return
}
_, err = out.Write(line)
if err != nil {
_ = config.Logger.Log("msg", "streamOutput out.Write error", "err", err)
return
}
}
}

// LogOutputs starts new goroutines to print cmd's stdout & stderr to our stdout & stderr
func LogOutputs(cmd *exec.Cmd) error {
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("Failed to open stderr pipe: %s", err)
}
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("Failed to open stdout pipe: %s", err)
}
go streamOutput(stderrPipe, os.Stderr)
go streamOutput(stdoutPipe, os.Stdout)
return nil
}

0 comments on commit 1cbf6db

Please sign in to comment.