Skip to content

Commit

Permalink
feat: add audio-to-text pipeline (livepeer#3078)
Browse files Browse the repository at this point in the history
* Add speech-to-text pipeline, refactor processAIRequest and handleAIRequest to allow for various response types

* Pin gomod to ai-runner for testing

* Revert "Pin gomod to ai-runner for testing"

This reverts commit d4ba500.

* Update go mod dep for ai-worker

* Calculate pixel value of audio file

* fix go-mod deps

* Adjust price calculation

* one second per pixel

* cleanup, fix missing duration

* Add supported file types, calculate price by milliseconds

* Add bad request response for unsupported file types

* Update name of function

* Update go mod to ai-runner

* Use ffmpeg to get duration

* update install_ffmpeg.sh to parse audio better

* Check for audio codec instead of video codec

* gomod edits

* add docker file

* Update install_ffmpeg.sh to improve audio support, add duration validation and logging, and pin lpms

* rename speech-to-text to audio-to-text

* Update go-mod

* cleanup

* update go mod

* remove comment

* update gomod

* Update lpms mod

* Update to latest lpms

* Update lpms

* feat(ai): apply code improvements to AudioToText pipeline

This commit applies several code improvements to the AudioToText
codebase.

* Remove unnecessary logic

* Remove unused error

* Fix missing err

* Update go.mod and tidy

* chore(ai): update ai-worker and lpms to latest version

This commit ensures that the ai-worker and lpms are at the latest
versions which contain the changes needed for the audio-to-text
pipeline.

---------

Co-authored-by: 0xb79orch <[email protected]>
Co-authored-by: Rick Staa <[email protected]>
  • Loading branch information
3 people committed Jul 26, 2024
1 parent 3077092 commit 3635140
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 49 deletions.
12 changes: 12 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,18 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
constraints[core.Capability_Upscale].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_Upscale, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
case "audio-to-text":
_, ok := constraints[core.Capability_AudioToText]
if !ok {
aiCaps = append(aiCaps, core.Capability_AudioToText)
constraints[core.Capability_AudioToText] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_AudioToText].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_AudioToText, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
}

if len(aiCaps) > 0 {
Expand Down
25 changes: 25 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaypipes/ghw/pkg/pci"
"github.com/livepeer/go-livepeer/net"
ffmpeg "github.com/livepeer/lpms/ffmpeg"
"github.com/oapi-codegen/runtime/types"
"github.com/pkg/errors"
"google.golang.org/grpc/peer"
)
Expand Down Expand Up @@ -74,6 +75,8 @@ var (
ErrProfEncoder = fmt.Errorf("unknown VideoProfile encoder for protobufs")
ErrProfName = fmt.Errorf("unknown VideoProfile profile name")

ErrAudioDurationCalculation = fmt.Errorf("audio duration calculation failed")

ext2mime = map[string]string{
".ts": "video/mp2t",
".mp4": "video/mp4",
Expand Down Expand Up @@ -530,3 +533,25 @@ func ParseEthAddr(strJsonKey string) (string, error) {
}
return "", errors.New("Error parsing address from keyfile")
}

// CalculateAudioDuration calculates audio file duration using the lpms/ffmpeg package.
//
// The whole file is read into memory and probed with ffmpeg.GetCodecInfoBytes;
// the reported DurSecs value is converted to int64 and returned.
//
// It returns an error when the file contents cannot be read or when the codec
// probe fails (the underlying cause is wrapped in both cases), and
// ErrAudioDurationCalculation when the reported duration is not positive.
func CalculateAudioDuration(audio types.File) (int64, error) {
	// The probe below works on an in-memory buffer, so load the full payload
	// up front and surface any read failure instead of probing empty data.
	data, err := audio.Bytes()
	if err != nil {
		return 0, errors.Wrap(err, "Error reading audio file")
	}

	_, mediaFormat, err := ffmpeg.GetCodecInfoBytes(data)
	if err != nil {
		// Keep the original message as the prefix but retain the cause.
		return 0, errors.Wrap(err, "Error getting codec info")
	}

	duration := int64(mediaFormat.DurSecs)
	if duration <= 0 {
		return 0, ErrAudioDurationCalculation
	}

	return duration, nil
}
1 change: 1 addition & 0 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AI interface {
ImageToImage(context.Context, worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error)
ImageToVideo(context.Context, worker.ImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
Upscale(context.Context, worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error)
AudioToText(context.Context, worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error)
Warm(context.Context, string, string, worker.RunnerEndpoint, worker.OptimizationFlags) error
Stop(context.Context) error
HasCapacity(pipeline, modelID string) bool
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
Capability_ImageToImage
Capability_ImageToVideo
Capability_Upscale
Capability_AudioToText
)

var CapabilityNameLookup = map[Capability]string{
Expand Down Expand Up @@ -106,6 +107,7 @@ var CapabilityNameLookup = map[Capability]string{
Capability_ImageToImage: "Image to image",
Capability_ImageToVideo: "Image to video",
Capability_Upscale: "Upscale",
Capability_AudioToText: "Audio to text",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
Expand Down Expand Up @@ -195,6 +197,7 @@ func OptionalCapabilities() []Capability {
Capability_ImageToImage,
Capability_ImageToVideo,
Capability_Upscale,
Capability_AudioToText,
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (orch *orchestrator) Upscale(ctx context.Context, req worker.UpscaleMultipa
return orch.node.upscale(ctx, req)
}

// AudioToText forwards the audio-to-text request to the orchestrator's node
// and returns the node's response and error unchanged.
func (orch *orchestrator) AudioToText(ctx context.Context, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) {
return orch.node.AudioToText(ctx, req)
}

func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Payment, manifestID ManifestID) error {
if orch.node == nil || orch.node.Recipient == nil {
return nil
Expand Down Expand Up @@ -950,6 +954,10 @@ func (n *LivepeerNode) upscale(ctx context.Context, req worker.UpscaleMultipartR
return n.AIWorker.Upscale(ctx, req)
}

// AudioToText passes the audio-to-text request to the node's AIWorker and
// returns the worker's response and error unchanged.
func (n *LivepeerNode) AudioToText(ctx context.Context, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) {
return n.AIWorker.AudioToText(ctx, req)
}

func (n *LivepeerNode) imageToVideo(ctx context.Context, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) {
// We might support generating more than one video in the future (i.e. multiple input images/prompts)
numVideos := 1
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/getkin/kin-openapi v0.124.0
github.com/golang/glog v1.1.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/jaypipes/ghw v0.10.0
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/ai-worker v0.0.8
github.com/livepeer/ai-worker v0.1.0
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20240120150405-de94555cdc69
github.com/livepeer/lpms v0.0.0-20240711175220-227325841434
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-sqlite3 v1.14.18
github.com/oapi-codegen/nethttp-middleware v1.0.1
Expand All @@ -32,7 +32,7 @@ require (
go.uber.org/goleak v1.3.0
golang.org/x/net v0.25.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.33.0
pgregory.net/rapid v1.1.0
)

Expand Down Expand Up @@ -85,6 +85,7 @@ require (
github.com/go-openapi/swag v0.22.8 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/go-test/deep v1.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -299,8 +299,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk=
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -533,16 +533,16 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-netroute v0.2.0/go.mod h1:Vio7LTzZ+6hoT4CMZi5/6CpY3Snzh2vgZhWgxMNwlQI=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/livepeer/ai-worker v0.0.8 h1:FAjYJgSOaZslA06Wb6MolYohI30IMIujDTB26nfw8YE=
github.com/livepeer/ai-worker v0.0.8/go.mod h1:Xlnb0nFG2VsGeMG9hZmReVQXeFt0Dv28ODiUT2ooyLE=
github.com/livepeer/ai-worker v0.1.0 h1:SJBZuxeK0vEzJPBzf5osdgVCxHYZt7ZKR2CvZ7Q7iog=
github.com/livepeer/ai-worker v0.1.0/go.mod h1:Xlnb0nFG2VsGeMG9hZmReVQXeFt0Dv28ODiUT2ooyLE=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b h1:VQcnrqtCA2UROp7q8ljkh2XA/u0KRgVv0S1xoUvOweE=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b/go.mod h1:hwJ5DKhl+pTanFWl+EUpw1H7ukPO/H+MFpgA7jjshzw=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cOQee+WqmaDOgGtP2oDMhcVvR4L0yA=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18 h1:4oH3NqV0NvcdS44Ld3zK2tO8IUiNozIggm74yobQeZg=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18/go.mod h1:Jpf4jHK+fbWioBHRDRM1WadNT1qmY27g2YicTdO0Rtc=
github.com/livepeer/lpms v0.0.0-20240120150405-de94555cdc69 h1:4A6geMb+HfxBBfaS24t8R3ddpEDfWbpx7NTQZMt5Fp4=
github.com/livepeer/lpms v0.0.0-20240120150405-de94555cdc69/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/lpms v0.0.0-20240711175220-227325841434 h1:E7PKN6q/jMLapEV+eEwlwv87Xe5zacaVhvZ8T6AJR3c=
github.com/livepeer/lpms v0.0.0-20240711175220-227325841434/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
Expand Down Expand Up @@ -1219,8 +1219,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 4 additions & 4 deletions install_ffmpeg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ if [[ ! -e "$ROOT/ffmpeg/libavcodec/libavcodec.a" ]]; then
./configure ${TARGET_OS:-} $DISABLE_FFMPEG_COMPONENTS --fatal-warnings \
--enable-libx264 --enable-gpl \
--enable-protocol=rtmp,file,pipe \
--enable-muxer=mpegts,hls,segment,mp4,hevc,matroska,webm,null --enable-demuxer=flv,mpegts,mp4,mov,webm,matroska,image2 \
--enable-muxer=mp3,wav,flac,mpegts,hls,segment,mp4,hevc,matroska,webm,null --enable-demuxer=mp3,wav,flac,flv,mpegts,mp4,mov,webm,matroska,image2 \
--enable-bsf=h264_mp4toannexb,aac_adtstoasc,h264_metadata,h264_redundant_pps,hevc_mp4toannexb,extract_extradata \
--enable-parser=aac,aac_latm,h264,hevc,vp8,vp9,png \
--enable-parser=mpegaudio,vorbis,opus,flac,aac,aac_latm,h264,hevc,vp8,vp9,png \
--enable-filter=abuffer,buffer,abuffersink,buffersink,afifo,fifo,aformat,format \
--enable-filter=aresample,asetnsamples,fps,scale,hwdownload,select,livepeer_dnn,signature \
--enable-encoder=aac,opus,libx264 \
--enable-decoder=aac,opus,h264,png \
--enable-encoder=mp3,vorbis,flac,aac,opus,libx264 \
--enable-decoder=mp3,vorbis,flac,aac,opus,h264,png \
--extra-cflags="${EXTRA_CFLAGS} -I${ROOT}/compiled/include -I/usr/local/cuda/include" \
--extra-ldflags="${EXTRA_FFMPEG_LDFLAGS} -L${ROOT}/compiled/lib -L/usr/local/cuda/lib64" \
--prefix="$ROOT/compiled" \
Expand Down
48 changes: 43 additions & 5 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func startAIServer(lp lphttp) error {
lp.transRPC.Handle("/image-to-image", oapiReqValidator(lp.ImageToImage()))
lp.transRPC.Handle("/image-to-video", oapiReqValidator(lp.ImageToVideo()))
lp.transRPC.Handle("/upscale", oapiReqValidator(lp.Upscale()))
lp.transRPC.Handle("/audio-to-text", oapiReqValidator(lp.AudioToText()))

return nil
}
Expand Down Expand Up @@ -132,6 +133,29 @@ func (h *lphttp) Upscale() http.Handler {
})
}

// AudioToText builds the HTTP handler for audio-to-text requests.
//
// The handler tags the request context with the client address, decodes the
// multipart body into a worker.AudioToTextMultipartRequestBody and hands it to
// handleAIRequest. A request that is not multipart is answered with 400; a
// body that cannot be bound is answered with 500.
func (h *lphttp) AudioToText() http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		orch := h.orchestrator
		ctx := clog.AddVal(r.Context(), clog.ClientIP, getRemoteAddr(r))

		parts, err := r.MultipartReader()
		if err != nil {
			respondWithError(w, err.Error(), http.StatusBadRequest)
			return
		}

		var body worker.AudioToTextMultipartRequestBody
		err = runtime.BindMultipart(&body, *parts)
		if err != nil {
			respondWithError(w, err.Error(), http.StatusInternalServerError)
			return
		}

		handleAIRequest(ctx, w, r, orch, body)
	}

	return http.HandlerFunc(serve)
}

func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, orch Orchestrator, req interface{}) {
payment, err := getPayment(r.Header.Get(paymentHeader))
if err != nil {
Expand All @@ -149,15 +173,15 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
var cap core.Capability
var pipeline string
var modelID string
var submitFn func(context.Context) (*worker.ImageResponse, error)
var submitFn func(context.Context) (interface{}, error)
var outPixels int64

switch v := req.(type) {
case worker.TextToImageJSONRequestBody:
pipeline = "text-to-image"
cap = core.Capability_TextToImage
modelID = *v.ModelId
submitFn = func(ctx context.Context) (*worker.ImageResponse, error) {
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.TextToImage(ctx, v)
}

Expand All @@ -176,7 +200,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
pipeline = "image-to-image"
cap = core.Capability_ImageToImage
modelID = *v.ModelId
submitFn = func(ctx context.Context) (*worker.ImageResponse, error) {
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.ImageToImage(ctx, v)
}

Expand All @@ -195,7 +219,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
pipeline = "upscale"
cap = core.Capability_Upscale
modelID = *v.ModelId
submitFn = func(ctx context.Context) (*worker.ImageResponse, error) {
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.Upscale(ctx, v)
}

Expand All @@ -214,7 +238,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
pipeline = "image-to-video"
cap = core.Capability_ImageToVideo
modelID = *v.ModelId
submitFn = func(ctx context.Context) (*worker.ImageResponse, error) {
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.ImageToVideo(ctx, v)
}

Expand All @@ -231,6 +255,20 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
frames := int64(25)

outPixels = height * width * int64(frames)
case worker.AudioToTextMultipartRequestBody:
pipeline = "audio-to-text"
cap = core.Capability_AudioToText
modelID = *v.ModelId
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.AudioToText(ctx, v)
}

outPixels, err = common.CalculateAudioDuration(v.Audio)
if err != nil {
respondWithError(w, "Unable to calculate duration", http.StatusBadRequest)
return
}
outPixels *= 1000 // Convert to milliseconds
default:
respondWithError(w, "Unknown request type", http.StatusBadRequest)
return
Expand Down
54 changes: 54 additions & 0 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func startAIMediaServer(ls *LivepeerServer) error {
ls.HTTPMux.Handle("/upscale", oapiReqValidator(ls.Upscale()))
ls.HTTPMux.Handle("/image-to-video", oapiReqValidator(ls.ImageToVideo()))
ls.HTTPMux.Handle("/image-to-video/result", ls.ImageToVideoResult())
ls.HTTPMux.Handle("/audio-to-text", oapiReqValidator(ls.AudioToText()))

return nil
}
Expand Down Expand Up @@ -320,6 +321,59 @@ func (ls *LivepeerServer) Upscale() http.Handler {
})
}

// AudioToText builds the HTTP handler for audio-to-text requests on the media
// server.
//
// The handler tags the context with the client address and a fresh request ID,
// decodes the multipart body, runs processAudioToText and writes the result as
// JSON with status 200. Failures are reported through respondJsonError:
//   - 400 when the request is not multipart, cannot be bound, or processing
//     returns a *BadRequestError;
//   - 503 when processing returns a *ServiceUnavailableError;
//   - 500 for any other processing error.
func (ls *LivepeerServer) AudioToText() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		remoteAddr := getRemoteAddr(r)
		ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)
		requestID := string(core.RandomManifestID())
		ctx = clog.AddVal(ctx, "request_id", requestID)

		multiRdr, err := r.MultipartReader()
		if err != nil {
			respondJsonError(ctx, w, err, http.StatusBadRequest)
			return
		}

		var req worker.AudioToTextMultipartRequestBody
		if err := runtime.BindMultipart(&req, *multiRdr); err != nil {
			respondJsonError(ctx, w, err, http.StatusBadRequest)
			return
		}

		// ModelId is a pointer and may be nil when the client omits it. Log
		// arguments are evaluated even when verbose logging is disabled, so
		// resolve it once here instead of dereferencing it in the log calls.
		var modelID string
		if req.ModelId != nil {
			modelID = *req.ModelId
		}

		clog.V(common.VERBOSE).Infof(ctx, "Received AudioToText request audioSize=%v model_id=%v", req.Audio.FileSize(), modelID)

		params := aiRequestParams{
			node:        ls.LivepeerNode,
			os:          drivers.NodeStorage.NewSession(requestID),
			sessManager: ls.AISessionManager,
		}

		start := time.Now()
		resp, err := processAudioToText(ctx, params, req)
		if err != nil {
			var serviceUnavailableErr *ServiceUnavailableError
			var badRequestErr *BadRequestError

			// Map known error types to their status; anything else is a 500.
			status := http.StatusInternalServerError
			switch {
			case errors.As(err, &serviceUnavailableErr):
				status = http.StatusServiceUnavailable
			case errors.As(err, &badRequestErr):
				status = http.StatusBadRequest
			}
			respondJsonError(ctx, w, err, status)
			return
		}

		took := time.Since(start)
		clog.V(common.VERBOSE).Infof(ctx, "Processed AudioToText request model_id=%v took=%v", modelID, took)

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		// The status line has already been sent, so an encode failure can no
		// longer be turned into an error response; it is deliberately ignored.
		_ = json.NewEncoder(w).Encode(resp)
	})
}

func (ls *LivepeerServer) ImageToVideoResult() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
remoteAddr := getRemoteAddr(r)
Expand Down
Loading

0 comments on commit 3635140

Please sign in to comment.