Skip to content

Commit

Permalink
move copy functions to input_copy
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 16, 2023
1 parent 623cadd commit 403c156
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 54 deletions.
56 changes: 56 additions & 0 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"time"

xerrors "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/video"
"github.com/livepeer/go-tools/drivers"
)

type InputCopy struct {
Expand Down Expand Up @@ -64,3 +69,54 @@ func (s *InputCopy) CopyInputToS3(args TranscodeJobArgs, s3HTTPTransferURL *url.
args.InputFile = s3URL
return args, nil
}

func CopyFile(ctx context.Context, sourceURL, destOSBaseURL, filename, requestID string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
writtenBytes := ByteAccumulatorWriter{count: 0}
c, err := getFile(ctx, sourceURL, requestID)
if err != nil {
return writtenBytes.count, fmt.Errorf("download error: %w", err)
}
defer c.Close()

content := io.TeeReader(c, &writtenBytes)

err = UploadToOSURL(destOSBaseURL, filename, content, 5*time.Minute)
if err != nil {
return writtenBytes.count, fmt.Errorf("upload error: %w", err)
}

return writtenBytes.count, nil
}

func getFile(ctx context.Context, url, requestID string) (io.ReadCloser, error) {
_, err := drivers.ParseOSURL(url, true)
if err == nil {
return DownloadOSURL(url)
} else if IsDStorageResource(url) {
return DownloadDStorageFromGatewayList(url, requestID)
} else {
return getFileHTTP(ctx, url)
}
}

func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, xerrors.Unretriable(fmt.Errorf("error creating http request: %w", err))
}
resp, err := retryableHttpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error on import request: %w", err)
}
if resp.StatusCode >= 300 {
resp.Body.Close()
err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode < 500 {
err = xerrors.Unretriable(err)
}
return nil, err
}
return resp.Body, nil
}
54 changes: 0 additions & 54 deletions clients/mediaconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
Expand All @@ -17,10 +16,8 @@ import (
"github.com/aws/aws-sdk-go/service/mediaconvert"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/hashicorp/go-retryablehttp"
xerrors "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/video"
"github.com/livepeer/go-tools/drivers"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -408,57 +405,6 @@ func output(container, name string, height, maxBitrate int64) *mediaconvert.Outp
}
}

func getFile(ctx context.Context, url, requestID string) (io.ReadCloser, error) {
_, err := drivers.ParseOSURL(url, true)
if err == nil {
return DownloadOSURL(url)
} else if IsDStorageResource(url) {
return DownloadDStorageFromGatewayList(url, requestID)
} else {
return getFileHTTP(ctx, url)
}
}

func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, xerrors.Unretriable(fmt.Errorf("error creating http request: %w", err))
}
resp, err := retryableHttpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error on import request: %w", err)
}
if resp.StatusCode >= 300 {
resp.Body.Close()
err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode < 500 {
err = xerrors.Unretriable(err)
}
return nil, err
}
return resp.Body, nil
}

func CopyFile(ctx context.Context, sourceURL, destOSBaseURL, filename, requestID string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
writtenBytes := ByteAccumulatorWriter{count: 0}
c, err := getFile(ctx, sourceURL, requestID)
if err != nil {
return writtenBytes.count, fmt.Errorf("download error: %w", err)
}
defer c.Close()

content := io.TeeReader(c, &writtenBytes)

err = UploadToOSURL(destOSBaseURL, filename, content, 5*time.Minute)
if err != nil {
return writtenBytes.count, fmt.Errorf("upload error: %w", err)
}

return writtenBytes.count, nil
}

func copyDir(source, dest *url.URL, args TranscodeJobArgs) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
Expand Down

0 comments on commit 403c156

Please sign in to comment.