Skip to content

Commit

Permalink
Recording start callback (#49)
Browse files Browse the repository at this point in the history
* PUSH_OUT_START trigger handler
* UUID generation
* studio callback
  • Loading branch information
AlexKordic authored Sep 30, 2022
1 parent 2e7f91d commit e38280c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 1 deletion.
26 changes: 26 additions & 0 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand Down Expand Up @@ -48,6 +49,22 @@ func (c CallbackClient) DoWithRetries(r *http.Request) error {
return nil
}

func (c CallbackClient) SendRecordingEvent(event *RecordingEvent) {
eventJson, err := json.Marshal(event)
if err != nil {
log.Printf("SendRecordingStarted json marshal %v", err)
return
}
req, err := http.NewRequest(http.MethodPost, config.RecordingCallback, bytes.NewReader(eventJson))
if err != nil {
log.Printf("SendRecordingStarted http.NewRequest %v", err)
return
}
if err := c.DoWithRetries(req); err != nil {
log.Printf("SendRecordingStarted callback %v", err)
}
}

// Sends a Transcode Status message to the Client (initially just Studio)
// The status strings will be useful for debugging where in the workflow we got to, but everything
// in Studio will be driven off the overall "Completion Ratio".
Expand Down Expand Up @@ -181,6 +198,15 @@ func (ts TranscodeStatus) String() string {

// The various status messages we can send

type RecordingEvent struct {
Event string `json:"event"`
StreamName string `json:"stream_name"`
RecordingId string `json:"recording_id"`
Hostname string `json:"host_name"`
Timestamp int64 `json:"timestamp"`
Success *bool `json:"success,omitempty"`
}

type TranscodeStatusMessage struct {
CompletionRatio float64 `json:"completion_ratio"` // No omitempty or we lose this for 0% completion case
Error string `json:"error,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ var Logger log.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
func init() {
Logger = log.With(Logger, "ts", log.DefaultTimestampUTC)
}

var RecordingCallback string = "http://127.0.0.1:8008/recording/status"
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/livepeer/catalyst-api
go 1.19

require (
github.com/google/uuid v1.3.0
github.com/julienschmidt/httprouter v1.3.0
github.com/livepeer/go-tools v0.0.0-20220926110222-2ebcbb5685b4
github.com/livepeer/livepeer-data v0.4.20
Expand All @@ -21,7 +22,6 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
59 changes: 59 additions & 0 deletions handlers/misttriggers/push_out_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package misttriggers

import (
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/google/uuid"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/errors"
)

// TriggerPushOutStart responds to PUSH_OUT_START trigger
// This trigger is run right before an outgoing push is started. This trigger is stream-specific and must be blocking.
// The payload for this trigger is multiple lines, each separated by a single newline character (without an ending newline), containing data:
//
// stream name (string)
// push target URI (string)
func (d *MistCallbackHandlersCollection) TriggerPushOutStart(w http.ResponseWriter, req *http.Request, payload []byte) {
lines := strings.Split(strings.TrimSuffix(string(payload), "\n"), "\n")
if len(lines) != 2 {
errors.WriteHTTPBadRequest(w, "Bad request payload", fmt.Errorf("unknown payload '%s'", string(payload)))
return
}
streamName := lines[0]
destination := lines[1]
var destinationToReturn string
switch streamNameToPipeline(streamName) {
case Recording:
destinationToReturn = d.RecordingPushOutStart(w, req, streamName, destination)
default:
destinationToReturn = destination
}
if _, err := w.Write([]byte(destinationToReturn)); err != nil {
log.Printf("TriggerPushOutStart failed to send rewritten url: %v", err)
}
}

func (d *MistCallbackHandlersCollection) RecordingPushOutStart(w http.ResponseWriter, req *http.Request, streamName, destination string) string {
event := &clients.RecordingEvent{
Event: "start",
Timestamp: time.Now().UnixMilli(),
StreamName: streamName,
RecordingId: uuid.New().String(),
Hostname: req.Host,
}
pushUrl, err := url.Parse(destination)
if err != nil {
log.Printf("RecordingPushOutStart url.Parse %v", err)
return destination
}
// Add uuid after stream name
pushUrl.Path = strings.Replace(pushUrl.Path, "$stream", "$stream/"+event.RecordingId, 1)
go clients.DefaultCallbackClient.SendRecordingEvent(event)
return pushUrl.String()
}
3 changes: 3 additions & 0 deletions handlers/misttriggers/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const TRIGGER_PUSH_END = "PUSH_END"
const TRIGGER_PUSH_OUT_START = "PUSH_OUT_START"
const TRIGGER_LIVE_TRACK_LIST = "LIVE_TRACK_LIST"

type MistCallbackHandlersCollection struct {
Expand All @@ -34,6 +35,8 @@ func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle {

triggerName := req.Header.Get("X-Trigger")
switch triggerName {
case TRIGGER_PUSH_OUT_START:
d.TriggerPushOutStart(w, req, payload)
case TRIGGER_PUSH_END:
d.TriggerPushEnd(w, req, payload)
case TRIGGER_LIVE_TRACK_LIST:
Expand Down
50 changes: 50 additions & 0 deletions handlers/misttriggers/triggers_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package misttriggers

import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/stretchr/testify/require"
)

Expand All @@ -18,6 +27,47 @@ func TestPipelineId(t *testing.T) {
}
}

func TestRecordingStart(t *testing.T) {
testStartTime := time.Now().UnixMilli()
mistCallbackHandlers := &MistCallbackHandlersCollection{MistClient: clients.StubMistClient{}}
callbackHappened := make(chan bool, 10)
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
payload, err := io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(200)
message := clients.RecordingEvent{}
err = json.Unmarshal(payload, &message)
require.NoError(t, err)
require.Equal(t, "videoSomeStreamName", message.StreamName)
require.Equal(t, "start", message.Event)
require.GreaterOrEqual(t, message.Timestamp, testStartTime)
require.Less(t, message.Timestamp, testStartTime+2)
require.NotEmpty(t, message.RecordingId)
callbackHappened <- true
}))
defer callbackServer.Close()
config.RecordingCallback = callbackServer.URL

router := httprouter.New()
router.POST("/api/mist/trigger", mistCallbackHandlers.Trigger())
pushOutTriggerPayload := "videoSomeStreamName\ns3+https://creds:[email protected]/region/livepeer-recordings-bucket/$stream/index.m3u8"
req, _ := http.NewRequest("POST", "/api/mist/trigger", bytes.NewBuffer([]byte(pushOutTriggerPayload)))
req.Header.Set("X-Trigger", "PUSH_OUT_START")
req.Header.Set("Host", "test.livepeer.monster")
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
require.Equal(t, 200, rr.Result().StatusCode)
result := rr.Body.String()
require.Equal(t, "s3+https://creds:[email protected]/region/livepeer-recordings-bucket/$stream/", result[:81])
require.Greater(t, len(result), 92)
require.Equal(t, "/index.m3u8", result[len(result)-11:])
select {
case <-callbackHappened:
case <-time.After(1 * time.Second):
require.FailNow(t, "no callback happened")
}
}

type StreamSample struct {
streamName string
expected PipelineId
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
func main() {
port := flag.Int("port", 4949, "Port to listen on")
mistPort := flag.Int("mist-port", 4242, "Port to listen on")
flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
mistJson := flag.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
flag.Parse()

Expand Down

0 comments on commit e38280c

Please sign in to comment.