Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race #492

Merged
merged 3 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions clients/arweave_ipfs_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
const SCHEME_IPFS = "ipfs"
const SCHEME_ARWEAVE = "ar"

var dstorageRetryBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 2)

func CopyDStorageToS3(url, s3URL string, requestID string) error {
return backoff.Retry(func() error {
content, err := DownloadDStorageFromGatewayList(url, requestID)
Expand All @@ -31,7 +29,7 @@ func CopyDStorageToS3(url, s3URL string, requestID string) error {
}

return nil
}, dstorageRetryBackoff)
}, DStorageRetryBackoff())
}

func DownloadDStorageFromGatewayList(u string, requestID string) (io.ReadCloser, error) {
Expand Down Expand Up @@ -128,3 +126,7 @@ func parseDStorageGatewayURL(u *url.URL) (string, string, string) {

return "", "", ""
}

func DStorageRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 2)
}
4 changes: 1 addition & 3 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const MAX_COPY_FILE_DURATION = 30 * time.Minute
const MaxInputFileSizeBytes = 10 * 1024 * 1024 * 1024 // 10 GiB
const PresignDuration = 10 * time.Minute

var RETRY_BACKOFF = backoff.WithMaxRetries(newExponentialBackOffExecutor(), 5)

type InputCopier interface {
CopyInputToS3(requestID string, inputFile, osTransferURL *url.URL) (video.InputVideo, string, error)
}
Expand Down Expand Up @@ -115,7 +113,7 @@ func CopyFile(ctx context.Context, sourceURL, destOSBaseURL, filename, requestID
content := io.TeeReader(c, &byteAccWriter)

return UploadToOSURL(destOSBaseURL, filename, content, MAX_COPY_FILE_DURATION)
}, RETRY_BACKOFF)
}, UploadRetryBackoff())
return
}

Expand Down
4 changes: 4 additions & 0 deletions clients/object_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,7 @@ func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {

return backOff
}

func UploadRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 5)
}
6 changes: 3 additions & 3 deletions transcode/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func DownloadRenditionManifest(sourceManifestOSURL string) (m3u8.MediaPlaylist,
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, transcodeRetryBackoff)
}, TranscodeRetryBackoff())
if err != nil {
return m3u8.MediaPlaylist{}, err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s
renditionManifestBaseURL := fmt.Sprintf("%s/%s", targetOSURL, profile.Name)
err = backoff.Retry(func() error {
return clients.UploadToOSURL(renditionManifestBaseURL, manifestFilename, strings.NewReader(renditionPlaylist.String()), UPLOAD_TIMEOUT)
}, clients.RETRY_BACKOFF)
}, clients.UploadRetryBackoff())
if err != nil {
return "", fmt.Errorf("failed to upload rendition playlist: %s", err)
}
Expand All @@ -152,7 +152,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s

err := backoff.Retry(func() error {
return clients.UploadToOSURL(targetOSURL, MASTER_MANIFEST_FILENAME, strings.NewReader(masterPlaylist.String()), UPLOAD_TIMEOUT)
}, clients.RETRY_BACKOFF)
}, clients.UploadRetryBackoff())
if err != nil {
return "", fmt.Errorf("failed to upload master playlist: %s", err)
}
Expand Down
10 changes: 6 additions & 4 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

const UPLOAD_TIMEOUT = 5 * time.Minute

var transcodeRetryBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(5*time.Second), 10)

type TranscodeSegmentRequest struct {
SourceFile string `json:"source_location"`
CallbackURL string `json:"callback_url"`
Expand Down Expand Up @@ -188,7 +186,7 @@ func transcodeSegment(
}
}
return nil
}, transcodeRetryBackoff)
}, TranscodeRetryBackoff())

if err != nil {
return err
Expand All @@ -210,7 +208,7 @@ func transcodeSegment(

err = backoff.Retry(func() error {
return clients.UploadToOSURL(targetRenditionURL, fmt.Sprintf("%d.ts", segment.Index), bytes.NewReader(transcodedSegment.MediaData), UPLOAD_TIMEOUT)
}, clients.RETRY_BACKOFF)
}, clients.UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload master playlist: %s", err)
}
Expand Down Expand Up @@ -276,3 +274,7 @@ type RenditionStats struct {
ManifestLocation string
BitsPerSecond uint32
}

func TranscodeRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(backoff.NewConstantBackOff(5*time.Second), 10)
}