Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define new API endpoint + test #17

Merged
merged 4 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/http-server/http-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func StartCatalystAPIRouter(mc *handlers.MistClient) *httprouter.Router {

router.GET("/ok", withLogging(middleware.IsAuthorized(catalystApiHandlers.Ok())))
router.POST("/api/vod", withLogging(middleware.IsAuthorized(catalystApiHandlers.UploadVOD())))
router.POST("/api/transcode/file", withLogging(middleware.IsAuthorized(catalystApiHandlers.TranscodeSegment())))
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

return router
Expand Down
15 changes: 15 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"encoding/json"
"log"
"net/http"
"strings"

"github.com/xeipuuv/gojsonschema"
)

type apiError struct {
Expand Down Expand Up @@ -37,3 +40,15 @@ func WriteHTTPUnsupportedMediaType(w http.ResponseWriter, msg string, err error)
func WriteHTTPInternalServerError(w http.ResponseWriter, msg string, err error) apiError {
return writeHttpError(w, msg, http.StatusInternalServerError, err)
}

func WriteHTTPBadBodySchema(where string, w http.ResponseWriter, errors []gojsonschema.ResultError) apiError {
sb := strings.Builder{}
sb.WriteString("Body validation error in ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see people use the String Builder very often in Go, normally stuff like this is done with a fmt.Sprintf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

sb.WriteString(where)
sb.WriteString(" ")
for i := 0; i < len(errors); i++ {
sb.WriteString(errors[i].String())
sb.WriteString(" ")
}
return writeHttpError(w, sb.String(), http.StatusBadRequest, nil)
}
88 changes: 33 additions & 55 deletions handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/livepeer/catalyst-api/clients"
"io"
"io/ioutil"
"math/rand"
Expand All @@ -12,6 +11,8 @@ import (
"strings"
"time"

"github.com/livepeer/catalyst-api/clients"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/errors"
"github.com/xeipuuv/gojsonschema"
Expand All @@ -32,62 +33,39 @@ func (d *CatalystAPIHandlersCollection) Ok() httprouter.Handle {
}
}

func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
schemaLoader := gojsonschema.NewStringLoader(`{
"type": "object",
"properties": {
"url": { "type": "string", "format": "uri" },
"callback_url": { "type": "string", "format": "uri" },
"output_locations": {
"type": "array",
"items": {
"oneOf": [
{
"type": "object",
"properties": {
"type": { "type": "string", "const": "object_store" },
"url": { "type": "string", "format": "uri" }
},
"required": [ "type", "url" ],
"additional_properties": false
},
{
"type": "object",
"properties": {
"type": { "type": "string", "const": "pinata" },
"pinata_access_key": { "type": "string", "minLength": 1 }
},
"required": [ "type", "pinata_access_key" ],
"additional_properties": false
}
]
},
"minItems": 1
}
},
"required": [ "url", "callback_url", "output_locations" ],
"additional_properties": false
}`)

schema, err := gojsonschema.NewSchema(schemaLoader)
if err != nil {
panic(err)
}
func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
schema := inputSchemasCompiled["TranscodeSegment"]
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var transcodeRequest TranscodeSegmentRequest
payload, err := ioutil.ReadAll(req.Body)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot read body", err)
return
}
result, err := schema.Validate(gojsonschema.NewBytesLoader(payload))
if err != nil {
errors.WriteHTTPInternalServerError(w, "body schema validation failed", err)
return
}
if !result.Valid() {
errors.WriteHTTPBadBodySchema("TranscodeSegment", w, result.Errors())
return
}
if err := json.Unmarshal(payload, &transcodeRequest); err != nil {
errors.WriteHTTPBadRequest(w, "Invalid request payload", err)
return
}

type UploadVODRequest struct {
Url string `json:"url"`
CallbackUrl string `json:"callback_url"`
OutputLocations []struct {
Type string `json:"type"`
URL string `json:"url"`
PinataAccessKey string `json:"pinata_access_key"`
Outputs struct {
SourceMp4 bool `json:"source_mp4"`
SourceSegments bool `json:"source_segments"`
TranscodedSegments bool `json:"transcoded_segments"`
} `json:"outputs,omitempty"`
} `json:"output_locations,omitempty"`
callbackClient := clients.NewCallbackClient()
if err := callbackClient.SendTranscodeStatusError(transcodeRequest.CallbackUrl, "NYI - not yet implemented"); err != nil {
errors.WriteHTTPInternalServerError(w, "error send transcode error", err)
}
io.WriteString(w, "OK") // TODO later
}
}

func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
schema := inputSchemasCompiled["UploadVOD"]

return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var uploadVODRequest UploadVODRequest
Expand Down
156 changes: 156 additions & 0 deletions handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package handlers

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
"github.com/stretchr/testify/require"
)

Expand All @@ -24,6 +29,157 @@ func TestOKHandler(t *testing.T) {
require.Equal(rr.Body.String(), "OK")
}

func TestSegmentCallback(t *testing.T) {
var jsonData = `{
"source_location": "http://localhost/input",
"callback_url": "CALLBACK_URL",
"manifestID": "somestream",
"profiles": [
{
"name": "720p",
"width": 1280,
"height": 720,
"bitrate": 700000,
"fps": 30
}, {
"name": "360p",
"width": 640,
"height": 360,
"bitrate": 200000,
"fps": 30
}
],
"verificationFreq": 1
}`

callbacks := make(chan string, 10)
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("WebhookReceiver error reading req body\n")
w.WriteHeader(451)
return
}
w.WriteHeader(200)
callbacks <- string(payload)
}))
defer callbackServer.Close()
jsonData = strings.ReplaceAll(jsonData, "CALLBACK_URL", callbackServer.URL)

catalystApiHandlers := CatalystAPIHandlersCollection{MistClient: StubMistClient{}}

router := httprouter.New()

req, _ := http.NewRequest("POST", "/api/transcode/file", bytes.NewBuffer([]byte(jsonData)))
req.Header.Set("Content-Type", "application/json")

rr := httptest.NewRecorder()
router.POST("/api/transcode/file", catalystApiHandlers.TranscodeSegment())
router.ServeHTTP(rr, req)

require.Equal(t, 200, rr.Result().StatusCode)
require.Equal(t, "OK", rr.Body.String())

// Wait for callback
select {
case data := <-callbacks:
message := &clients.TranscodeStatusMessage{}
err := json.Unmarshal([]byte(data), message)
require.NoErrorf(t, err, "json unmarshal failed, src=%s", data)
require.Equal(t, "error", message.Status)
require.Equal(t, "NYI - not yet implemented", message.Error)
case <-time.After(300 * time.Millisecond):
require.FailNow(t, "Callback not fired by handler")
}
}

func TestSegmentBodyFormat(t *testing.T) {
require := require.New(t)

badRequests := [][]byte{
// missing source_location
[]byte(`{
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"name": "t","width": 1280,"height": 720,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing callback_url
[]byte(`{
"source_location": "http://localhost/input",
"manifestID": "somestream",
"profiles": [{"name": "t","width": 1280,"height": 720,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing manifestID
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"profiles": [{"name": "t","width": 1280,"height": 720,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing profiles
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"verificationFreq": 1
}`),
// missing name
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"width": 1280,"height": 720,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing width
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"name": "t","height": 720,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing height
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"name": "t","width": 1280,"bitrate": 70000,"fps": 30}],
"verificationFreq": 1
}`),
// missing bitrate
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"name": "t","width": 1280,"height": 720,"fps": 30}],
"verificationFreq": 1
}`),
// missing verificationFreq
[]byte(`{
"source_location": "http://localhost/input",
"callback_url": "http://localhost:8080/callback",
"manifestID": "somestream",
"profiles": [{"name": "t","width": 1280,"height": 720,"bitrate": 70000,"fps": 30}]
}`),
}

catalystApiHandlers := CatalystAPIHandlersCollection{MistClient: StubMistClient{}}
router := httprouter.New()

router.POST("/api/transcode/file", catalystApiHandlers.TranscodeSegment())
for _, payload := range badRequests {
req, _ := http.NewRequest("POST", "/api/transcode/file", bytes.NewBuffer(payload))
req.Header.Set("Content-Type", "application/json")

rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
require.Equal(400, rr.Result().StatusCode, string(payload))
}
}

func TestSuccessfulVODUploadHandler(t *testing.T) {
require := require.New(t)

Expand Down
24 changes: 24 additions & 0 deletions handlers/json_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package handlers

import "github.com/xeipuuv/gojsonschema"

var inputSchemas map[string]string = map[string]string{
"TranscodeSegment": TranscodeSegmentRequestSchemaDefinition,
"UploadVOD": UploadVODRequestSchemaDefinition,
}

func compileJsonSchemas() map[string]*gojsonschema.Schema {
compiled := make(map[string]*gojsonschema.Schema, 0)
for name, text := range inputSchemas {
schema, err := gojsonschema.NewSchema(gojsonschema.NewStringLoader(text))
if err != nil {
// rase panic on program start
panic(err) // fix schema text
}
compiled[name] = schema
}
return compiled
}

// Run compile step on program start:
var inputSchemasCompiled map[string]*gojsonschema.Schema = compileJsonSchemas()
52 changes: 52 additions & 0 deletions handlers/segmenting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package handlers

type UploadVODRequest struct {
Url string `json:"url"`
CallbackUrl string `json:"callback_url"`
OutputLocations []struct {
Type string `json:"type"`
URL string `json:"url"`
PinataAccessKey string `json:"pinata_access_key"`
Outputs struct {
SourceMp4 bool `json:"source_mp4"`
SourceSegments bool `json:"source_segments"`
TranscodedSegments bool `json:"transcoded_segments"`
} `json:"outputs,omitempty"`
} `json:"output_locations,omitempty"`
}

var UploadVODRequestSchemaDefinition string = `{
"type": "object",
"properties": {
"url": { "type": "string", "format": "uri" },
"callback_url": { "type": "string", "format": "uri" },
"output_locations": {
"type": "array",
"items": {
"oneOf": [
{
"type": "object",
"properties": {
"type": { "type": "string", "const": "object_store" },
"url": { "type": "string", "format": "uri" }
},
"required": [ "type", "url" ],
"additional_properties": false
},
{
"type": "object",
"properties": {
"type": { "type": "string", "const": "pinata" },
"pinata_access_key": { "type": "string", "minLength": 1 }
},
"required": [ "type", "pinata_access_key" ],
"additional_properties": false
}
]
},
"minItems": 1
}
},
"required": [ "url", "callback_url", "output_locations" ],
"additional_properties": false
}`
Loading