Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process Upload VOD #20

Merged
merged 11 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
- name: Build
run: |
mkdir -p build/ releases/
make all GO_BUILD_DIR="build/"
make build GO_BUILD_DIR="build/"

- name: Archive binaries for windows
if: matrix.platform == 'windows'
Expand Down Expand Up @@ -126,7 +126,7 @@ jobs:
- name: Build
run: |
mkdir -p build/ releases/
GOARCH="${{ matrix.arch }}" make -j4 all GO_BUILD_DIR="build/"
GOARCH="${{ matrix.arch }}" make -j4 build GO_BUILD_DIR="build/"
cd build/
for file in $(find . -type f -perm -a+x)
do
Expand Down Expand Up @@ -167,4 +167,4 @@ jobs:
uses: actions/upload-artifact@master
with:
name: release-artifacts
path: releases/
path: releases/
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@ GO_BUILD_DIR?=build/
ldflags := -X 'github.com/livepeer/catalyst-api/config.Version=$(shell git rev-parse HEAD)'

.PHONY: all
all: build-server
all: build fmt test

.PHONY: build-server
build-server:
.PHONY: build
build:
go build -ldflags="$(ldflags)" -o "$(GO_BUILD_DIR)catalyst-api" cmd/http-server/http-server.go

.PHONY: fmt
fmt:
go fmt ./...

.PHONY: test
test:
go test ./...
24 changes: 17 additions & 7 deletions cmd/http-server/http-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"os"

log "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/config"
Expand All @@ -18,6 +18,7 @@ import (

func main() {
port := flag.Int("port", 4949, "Port to listen on")
mistPort := flag.Int("mist-port", 4242, "Port to listen on")
mistJson := flag.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
flag.Parse()

Expand All @@ -26,26 +27,35 @@ func main() {
return
}

listen := fmt.Sprintf("0.0.0.0:%d", *port)
router := StartCatalystAPIRouter()
mc := &handlers.MistClient{
ApiUrl: fmt.Sprintf("http://localhost:%d/api2", *mistPort),
TriggerCallback: fmt.Sprintf("http://localhost:%d/api/mist/trigger", *port),
}

listen := fmt.Sprintf("localhost:%d", *port)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! This change reverted the listen address to localhost 😢

Copy link
Contributor

@thomshutt thomshutt Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed everywhere else in #22

router := StartCatalystAPIRouter(mc)

stdlog.Println("Starting Catalyst API version", config.Version, "listening on", listen)
err := http.ListenAndServe(listen, router)
stdlog.Fatal(err)

}

func StartCatalystAPIRouter() *httprouter.Router {
func StartCatalystAPIRouter(mc *handlers.MistClient) *httprouter.Router {
router := httprouter.New()

var logger log.Logger
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
withLogging := middleware.LogRequest(logger)

router.GET("/ok", withLogging(middleware.IsAuthorized(handlers.CatalystAPIHandlers.Ok())))
router.POST("/api/vod", withLogging(middleware.IsAuthorized(handlers.CatalystAPIHandlers.UploadVOD())))
router.POST("/api/mist/trigger", withLogging(handlers.MistCallbackHandlers.Trigger()))
sc := make(map[string]handlers.StreamInfo)
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{MistClient: mc, StreamCache: sc}
mistCallbackHandlers := &handlers.MistCallbackHandlersCollection{MistClient: mc, StreamCache: sc}

router.GET("/ok", withLogging(middleware.IsAuthorized(catalystApiHandlers.Ok())))
router.POST("/api/vod", withLogging(middleware.IsAuthorized(catalystApiHandlers.UploadVOD())))
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

return router
}
2 changes: 1 addition & 1 deletion cmd/http-server/http-server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestInitServer(t *testing.T) {
require := require.New(t)
router := StartCatalystAPIRouter()
router := StartCatalystAPIRouter(nil)

handle, _, _ := router.Lookup("GET", "/ok")
require.NotNil(handle)
Expand Down
20 changes: 14 additions & 6 deletions handlers/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import (
"sync"
)

type MistAPIClient interface {
AddStream(streamName, sourceUrl string) error
PushStart(streamName, targetURL string) error
DeleteStream(streamName string) error
AddTrigger(streamName, triggerName string) error
DeleteTrigger(streamName, triggerName string) error
}

type MistClient struct {
apiUrl string
triggerCallback string
ApiUrl string
TriggerCallback string
configMu sync.Mutex
}

Expand Down Expand Up @@ -47,7 +55,7 @@ func (mc *MistClient) AddTrigger(streamName, triggerName string) error {
if err != nil {
return err
}
c := commandAddTrigger(streamName, triggerName, mc.triggerCallback, triggers)
c := commandAddTrigger(streamName, triggerName, mc.TriggerCallback, triggers)
resp, err := mc.sendCommand(c)
return validateAddTrigger(streamName, triggerName, resp, err)
}
Expand Down Expand Up @@ -97,7 +105,7 @@ func (mc *MistClient) sendCommand(command interface{}) (string, error) {
return "", err
}
payload := payloadFor(c)
resp, err := http.Post(mc.apiUrl, "application/x-www-form-urlencoded", bytes.NewBuffer([]byte(payload)))
resp, err := http.Post(mc.ApiUrl, "application/json", bytes.NewBuffer([]byte(payload)))
if err != nil {
return "", err
}
Expand Down Expand Up @@ -200,7 +208,7 @@ func commandUpdateTrigger(streamName, triggerName string, currentTriggers Trigge
triggersMap := currentTriggers

triggers := triggersMap[triggerName]
triggers = filterTriggersWithoutStream(triggers, streamName)
triggers = deleteAllTriggersFor(triggers, streamName)
if len(replaceTrigger.Streams) != 0 {
triggers = append(triggers, replaceTrigger)
}
Expand All @@ -209,7 +217,7 @@ func commandUpdateTrigger(streamName, triggerName string, currentTriggers Trigge
return MistConfig{Config{Triggers: triggersMap}}
Copy link
Member

@victorges victorges Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I'm testing the integration between task-runner and catalyst-api, and I'm getting a panic on the line above (:216) of assignment to entry in nil map:


[2022-08-12 22:34:52] livepeer-catalyst-api (205) INFO: ts=2022-08-12T22:34:52.472726115Z err="assignment to entry in nil map" trace="goroutine 25 [running]:\nruntime/debug.Stack()\n\t/opt/hostedtoolcache/go/1.19.0/x64/src/runtime/debug/stack.go:24 +0x65\ngithub.com/livepeer/catalyst-api/middleware.LogRequest.func1.1.1()\n\t/home/runner/work/catalyst-api/catalyst-api/middleware/logging.go:44 +0x8f\npanic({0x7b1320, 0x8b4260})\n\t/opt/hostedtoolcache/go/1.19.0/x64/src/runtime/panic.go:884 +0x212\ngithub.com/livepeer/catalyst-api/handlers.commandUpdateTrigger(...)\n\t/home/runner/work/catalyst-api/catalyst-api/handlers/client.go:216\ngithub.com/livepeer/catalyst-api/handlers.commandAddTrigger({0xc0002464c8, 0x15}, {0x80e24c, 0x8}, {0xc000236330?, 0x0?}, 0x0?)\n\t/home/runner/work/catalyst-api/catalyst-api/handlers/client.go:200 +0x277\ngithub.com/livepeer/catalyst-api/handlers.(*MistClient).AddTrigger(0xc000215140, {0xc0002464c8, 0x15}, {0x80e24c, 0x8})\n\t/home/runner/work/catalyst-api/catalyst-api/handlers/client.go:58 +0x125\ngithub.com/livepeer/catalyst-api/handlers.(*CatalystAPIHandlersCollection).processUploadVOD(0xc00021c150, {0xc0002464c8, 0x15}, {0xc0002024e0?, 0xc00025cec0?}, {0xc0002444d0, 0xa4})\n\t/home/runner/work/catalyst-api/catalyst-api/handlers/handlers.go:164 +0x74\ngithub.com/livepeer/catalyst-api/handlers.(*CatalystAPIHandlersCollection).UploadVOD.func1({0x8b7638?, 0xc000242a40}, 0xc00028e400, {0x0?, 0xc00029e7e0?, 0xc0000e48e0?})\n\t/home/runner/work/catalyst-api/catalyst-api/handlers/handlers.go:129 +0x3a6\ngithub.com/livepeer/catalyst-api/middleware.IsAuthorized.func1({0x8b7638, 0xc000242a40}, 0xc00028e400, {0x0, 0x0, 0x0})\n\t/home/runner/work/catalyst-api/catalyst-api/middleware/auth.go:29 +0x144\ngithub.com/livepeer/catalyst-api/middleware.LogRequest.func1.1({0x8b78a8?, 0xc000294540}, 0xc00028e400, {0x0, 0x0, 0x0})\n\t/home/runner/work/catalyst-api/catalyst-api/middleware/logging.go:48 +0x183\ngithub.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc0002222a0, {0x8b78a8, 0xc000294540}, 0xc00028e400)\n\t/home/runner/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c\nnet/http.serverHandler.ServeHTTP({0x8b69d0?}, {0x8b78a8, 0xc000294540}, 0xc00028e400)\n\t/opt/hostedtoolcache/go/1.19.0/x64/src/net/http/server.go:2947 +0x30c\nnet/http.(*conn).serve(0xc0002272c0, {0x8b7d60, 0xc0002156b0})\n\t/opt/hostedtoolcache/go/1.19.0/x64/src/net/http/server.go:1991 +0x607\ncreated by net/http.(*Server).Serve\n\t/opt/hostedtoolcache/go/1.19.0/x64/src/net/http/server.go:3102 +0x4db\n"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reproduced locally by running catalyst-api and POSTing:

curl --location --request POST 'http://localhost:4949/api/vod' \
--header 'Authorization: Bearer IAmAuthorized' \
--header 'Content-Type: application/json' \
--data-raw '{
    "url": "http://example.com",
    "callback_url": "http://callback-lol.com",
    "output_locations": [
        {
				"type": "object_store",
				"url": "memory://localhost/output",
 				"outputs": {
					"source_segments": true
				}
			}
    ]
}'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #24 and #25

}

func filterTriggersWithoutStream(triggers []ConfigTrigger, streamName string) []ConfigTrigger {
func deleteAllTriggersFor(triggers []ConfigTrigger, streamName string) []ConfigTrigger {
var res []ConfigTrigger
for _, t := range triggers {
f := false
Expand Down
119 changes: 76 additions & 43 deletions handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/livepeer/catalyst-api/clients"
"io"
"io/ioutil"
"log"
"math/rand"
"mime"
"net/http"
Expand All @@ -17,25 +17,27 @@ import (
"github.com/xeipuuv/gojsonschema"
)

type CatalystAPIHandlersCollection struct{}
type StreamInfo struct {
callbackUrl string
}

var CatalystAPIHandlers = CatalystAPIHandlersCollection{}
type CatalystAPIHandlersCollection struct {
MistClient MistAPIClient
StreamCache map[string]StreamInfo
}

func (d *CatalystAPIHandlersCollection) Ok() httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
io.WriteString(w, "OK")
}
}

var processUpload = processUploadVOD

func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
schemaLoader := gojsonschema.NewStringLoader(`{
"type": "object",
"properties": {
"url": { "type": "string", "format": "uri" },
"callback_url": { "type": "string", "format": "uri" },
"mp4_output": { "type": "boolean" },
"output_locations": {
"type": "array",
"items": {
Expand Down Expand Up @@ -75,11 +77,15 @@ func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
type UploadVODRequest struct {
Url string `json:"url"`
CallbackUrl string `json:"callback_url"`
Mp4Output bool `json:"mp4_output"`
OutputLocations []struct {
Type string `json:"type"`
URL string `json:"url"`
PinataAccessKey string `json:"pinata_access_key"`
Outputs struct {
SourceMp4 bool `json:"source_mp4"`
SourceSegments bool `json:"source_segments"`
TranscodedSegments bool `json:"transcoded_segments"`
} `json:"outputs,omitempty"`
} `json:"output_locations,omitempty"`
}

Expand All @@ -103,10 +109,32 @@ func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
return
}

if err := processUpload(uploadVODRequest.Url); err != nil {
// find source segment URL
var tURL string
for _, o := range uploadVODRequest.OutputLocations {
if o.Outputs.SourceSegments {
tURL = o.URL
break
}
}
if tURL == "" {
errors.WriteHTTPBadRequest(w, "Invalid request payload", fmt.Errorf("no source segment URL in request"))
return
}

streamName := randomStreamName("catalyst_vod_")
d.StreamCache[streamName] = StreamInfo{callbackUrl: uploadVODRequest.CallbackUrl}

// process the request
if err := d.processUploadVOD(streamName, uploadVODRequest.Url, tURL); err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot process upload VOD request", err)
}

callbackClient := clients.NewCallbackClient()
if err := callbackClient.SendTranscodeStatus(uploadVODRequest.CallbackUrl, clients.TranscodeStatusPreparing, 0.0); err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot send transcode status", err)
}

io.WriteString(w, fmt.Sprint(len(uploadVODRequest.OutputLocations)))
}
}
Expand All @@ -129,41 +157,17 @@ func HasContentType(r *http.Request, mimetype string) bool {
return false
}

func processUploadVOD(url string) error {
// TODO: This function is only a scaffold for now

// TODO: Update hostnames and ports
mc := MistClient{apiUrl: "http://localhost:4242/api2", triggerCallback: "http://host.docker.internal:8080/api/mist/trigger"}

streamName := randomStreamName("catalyst_vod_")
if err := mc.AddStream(streamName, url); err != nil {
func (d *CatalystAPIHandlersCollection) processUploadVOD(streamName, sourceURL, targetURL string) error {
if err := d.MistClient.AddStream(streamName, sourceURL); err != nil {
return err
}

// TODO: Move it to `Trigger()`
defer mc.DeleteStream(streamName)

if err := mc.AddTrigger(streamName, "PUSH_END"); err != nil {
if err := d.MistClient.AddTrigger(streamName, "PUSH_END"); err != nil {
return err
}
// TODO: Move it to `Trigger()`
defer mc.DeleteTrigger(streamName, "PUSH_END")

if err := mc.AddTrigger(streamName, "RECORDING_END"); err != nil {
if err := d.MistClient.PushStart(streamName, targetURL); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So will mist call the catalyst-uploader in order to save to that targetURL? Cause from what I understood, this will come from the URL inside the first output_location to have a source_segments output set to true, right?

The URLs in the output_location are actually Object Store URLs compatible with the drivers lib moved to go-tools from go-livepeer. This means that Mist won't be able to o a single PUT to that URL. The catalyst-uploader does have support for it tho, but from what I understood it is receiving not only the Object Store URL but the path of the file on the same URL. So someone would have to append the file paths to this Object Store URL for each segmented file that is saved. Do you know if mist is doing that for calling the catalyst-api?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good, question, calling catalyst-uploader from Mist is being implemented by @stronk-dev , so Marco may have some insights here.

return err
}

// TODO: Move it to `Trigger()`
defer mc.DeleteTrigger(streamName, "RECORDING_END")

// TODO: Change the output to the value from the request instead of the hardcoded "/media/recording/result.ts"
if err := mc.PushStart(streamName, "/media/recording/result.ts"); err != nil {
return err
}

// TODO: After moving cleanup to `Trigger()`, this is no longer needed
time.Sleep(10 * time.Second)

return nil
}

Expand All @@ -179,21 +183,50 @@ func randomStreamName(prefix string) string {
return fmt.Sprintf("%s%s", prefix, string(res))
}

type MistCallbackHandlersCollection struct{}

var MistCallbackHandlers = MistCallbackHandlersCollection{}
type MistCallbackHandlersCollection struct {
MistClient MistAPIClient
StreamCache map[string]StreamInfo
}

func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
log.Println("Received Mist Trigger")
if t := req.Header.Get("X-Trigger"); t != "PUSH_END" {
errors.WriteHTTPBadRequest(w, "Unsupported X-Trigger", fmt.Errorf("unknown trigger '%s'", t))
return
}
payload, err := ioutil.ReadAll(req.Body)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot read payload", err)
return
}
lines := strings.Split(strings.TrimSuffix(string(payload), "\n"), "\n")
if len(lines) < 2 {
errors.WriteHTTPBadRequest(w, "Bad request payload", fmt.Errorf("unknown payload '%s'", string(payload)))
return
}

// TODO: Handle trigger results: 1) Check the trigger name, 2) Call callbackURL, 3) Perform stream cleanup
fmt.Println(string(payload))
io.WriteString(w, "OK")
// stream name is the second line in the Mist Trigger payload
s := lines[1]
// when uploading is done, remove trigger and stream from Mist
errT := d.MistClient.DeleteTrigger(s, "PUSH_END")
errS := d.MistClient.DeleteStream(s)
if errT != nil {
errors.WriteHTTPInternalServerError(w, fmt.Sprintf("Cannot remove PUSH_END trigger for stream '%s'", s), errT)
return
}
if errS != nil {
errors.WriteHTTPInternalServerError(w, fmt.Sprintf("Cannot remove stream '%s'", s), errS)
return
}

callbackClient := clients.NewCallbackClient()
if err := callbackClient.SendTranscodeStatus(d.StreamCache[s].callbackUrl, clients.TranscodeStatusTranscoding, 0.0); err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot send transcode status", err)
}

delete(d.StreamCache, s)

// TODO: add timeout for the stream upload
// TODO: start transcoding
}
}
Loading