diff --git a/clients/input_copy.go b/clients/input_copy.go index deb864588..026f3d057 100644 --- a/clients/input_copy.go +++ b/clients/input_copy.go @@ -4,10 +4,15 @@ import ( "context" "errors" "fmt" + "io" + "net/http" "net/url" + "time" + xerrors "github.com/livepeer/catalyst-api/errors" "github.com/livepeer/catalyst-api/log" "github.com/livepeer/catalyst-api/video" + "github.com/livepeer/go-tools/drivers" ) type InputCopy struct { @@ -64,3 +69,54 @@ func (s *InputCopy) CopyInputToS3(args TranscodeJobArgs, s3HTTPTransferURL *url. args.InputFile = s3URL return args, nil } + +func CopyFile(ctx context.Context, sourceURL, destOSBaseURL, filename, requestID string) (int64, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + writtenBytes := ByteAccumulatorWriter{count: 0} + c, err := getFile(ctx, sourceURL, requestID) + if err != nil { + return writtenBytes.count, fmt.Errorf("download error: %w", err) + } + defer c.Close() + + content := io.TeeReader(c, &writtenBytes) + + err = UploadToOSURL(destOSBaseURL, filename, content, 5*time.Minute) + if err != nil { + return writtenBytes.count, fmt.Errorf("upload error: %w", err) + } + + return writtenBytes.count, nil +} + +func getFile(ctx context.Context, url, requestID string) (io.ReadCloser, error) { + _, err := drivers.ParseOSURL(url, true) + if err == nil { + return DownloadOSURL(url) + } else if IsDStorageResource(url) { + return DownloadDStorageFromGatewayList(url, requestID) + } else { + return getFileHTTP(ctx, url) + } +} + +func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, xerrors.Unretriable(fmt.Errorf("error creating http request: %w", err)) + } + resp, err := retryableHttpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("error on import request: %w", err) + } + if resp.StatusCode >= 300 { + resp.Body.Close() + err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status) + if resp.StatusCode < 500 { + err = xerrors.Unretriable(err) + } + return nil, err + } + return resp.Body, nil +} diff --git a/clients/mediaconvert.go b/clients/mediaconvert.go index bad5c36a5..7b4637348 100644 --- a/clients/mediaconvert.go +++ b/clients/mediaconvert.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "net/http" "net/url" "path" @@ -17,10 +16,8 @@ import ( "github.com/aws/aws-sdk-go/service/mediaconvert" "github.com/aws/aws-sdk-go/service/s3" "github.com/hashicorp/go-retryablehttp" - xerrors "github.com/livepeer/catalyst-api/errors" "github.com/livepeer/catalyst-api/log" "github.com/livepeer/catalyst-api/video" - "github.com/livepeer/go-tools/drivers" "golang.org/x/sync/errgroup" ) @@ -408,57 +405,6 @@ func output(container, name string, height, maxBitrate int64) *mediaconvert.Outp } } -func getFile(ctx context.Context, url, requestID string) (io.ReadCloser, error) { - _, err := drivers.ParseOSURL(url, true) - if err == nil { - return DownloadOSURL(url) - } else if IsDStorageResource(url) { - return DownloadDStorageFromGatewayList(url, requestID) - } else { - return getFileHTTP(ctx, url) - } -} - -func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) { - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, xerrors.Unretriable(fmt.Errorf("error creating http request: %w", err)) - } - resp, err := retryableHttpClient.Do(req) - if err != nil { - return nil, fmt.Errorf("error on import request: %w", err) - } - if resp.StatusCode >= 300 { - resp.Body.Close() - err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status) - if resp.StatusCode < 500 { - err = xerrors.Unretriable(err) - } - return nil, err - } - return resp.Body, nil -} - -func CopyFile(ctx context.Context, sourceURL, destOSBaseURL, filename, requestID string) (int64, error) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() - writtenBytes := ByteAccumulatorWriter{count: 0} - c, err := getFile(ctx, sourceURL, requestID) - if err != nil { - return writtenBytes.count, fmt.Errorf("download error: %w", err) - } - defer c.Close() - - content := io.TeeReader(c, &writtenBytes) - - err = UploadToOSURL(destOSBaseURL, filename, content, 5*time.Minute) - if err != nil { - return writtenBytes.count, fmt.Errorf("upload error: %w", err) - } - - return writtenBytes.count, nil -} - func copyDir(source, dest *url.URL, args TranscodeJobArgs) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel()