Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/companion: fix double tus uploads #4816

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions packages/@uppy/companion-client/src/RequestClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export default class RequestClient {
return stripSlash(companion && companion[host] ? companion[host] : host)
}

async headers (emptyBody = false) {
async headers(emptyBody = false) {
const defaultHeaders = {
Accept: 'application/json',
...(emptyBody ? undefined : {
Expand Down Expand Up @@ -397,15 +397,6 @@ export default class RequestClient {

isPaused = newPausedState
if (socket) sendState()

if (newPausedState) {
// Remove this file from the queue so another file can start in its place.
socketAbortController?.abort?.() // close socket to free up the request for other uploads
} else {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
createWebsocket()
}
}

const onFileRemove = (targetFile) => {
Expand Down
57 changes: 34 additions & 23 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ function sanitizeMetadata(inputMetadata) {
return outputMetadata
}

class AbortError extends Error {
isAbortError = true
}

class ValidationError extends Error {
constructor(message) {
super(message)
Expand Down Expand Up @@ -139,6 +135,13 @@ function validateOptions(options) {
}
}

const states = {
idle: 'idle',
uploading: 'uploading',
paused: 'paused',
done: 'done',
}

class Uploader {
/**
* Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
Expand Down Expand Up @@ -176,10 +179,7 @@ class Uploader {
? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH)
: this.fileName

this.uploadStopped = false

this.storage = options.storage
this._paused = false

this.downloadedBytes = 0

Expand All @@ -188,15 +188,17 @@ class Uploader {
if (this.options.protocol === PROTOCOLS.tus) {
emitter().on(`pause:${this.token}`, () => {
logger.debug('Received from client: pause', 'uploader', this.shortToken)
this._paused = true
if (this.#uploadState !== states.uploading) return
this.#uploadState = states.paused
if (this.tus) {
this.tus.abort()
}
})

emitter().on(`resume:${this.token}`, () => {
logger.debug('Received from client: resume', 'uploader', this.shortToken)
this._paused = false
if (this.#uploadState !== states.paused) return
this.#uploadState = states.uploading
if (this.tus) {
this.tus.start()
}
Expand All @@ -205,17 +207,21 @@ class Uploader {

emitter().on(`cancel:${this.token}`, () => {
logger.debug('Received from client: cancel', 'uploader', this.shortToken)
this._paused = true
if (this.tus) {
const shouldTerminate = !!this.tus.url
this.tus.abort(shouldTerminate).catch(() => { })
}
this.abortReadStream(new AbortError())
this.#canceled = true
this.abortReadStream(new Error('Canceled'))
})
}

#uploadState = 'idle'
mifi marked this conversation as resolved.
Show resolved Hide resolved

#canceled = false

abortReadStream(err) {
this.uploadStopped = true
this.#uploadState = states.done
if (this.readStream) this.readStream.destroy(err)
}

Expand Down Expand Up @@ -244,7 +250,9 @@ class Uploader {

const onData = (chunk) => {
this.downloadedBytes += chunk.length
if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) this.abortReadStream(new Error('maxFileSize exceeded'))
if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) {
this.abortReadStream(new Error('maxFileSize exceeded'))
}
this.onProgress(0, undefined)
}

Expand All @@ -271,9 +279,11 @@ class Uploader {
*/
async uploadStream(stream) {
try {
if (this.uploadStopped) throw new Error('Cannot upload stream after upload stopped')
if (this.#uploadState !== states.idle) throw new Error('Can only start an upload in the idle state')
if (this.readStream) throw new Error('Already uploading')

this.#uploadState = states.uploading

this.readStream = stream
if (this._needDownloadFirst()) {
logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
Expand All @@ -282,7 +292,7 @@ class Uploader {
// The stream will then typically come from a "Transfer-Encoding: chunked" response
await this._downloadStreamAsFile(this.readStream)
}
if (this.uploadStopped) return undefined
if (this.#uploadState !== states.uploading) return undefined

const { url, extraData } = await Promise.race([
this._uploadByProtocol(),
Expand All @@ -291,6 +301,7 @@ class Uploader {
])
return { url, extraData }
} finally {
this.#uploadState = states.done
logger.debug('cleanup', this.shortToken)
if (this.readStream && !this.readStream.destroyed) this.readStream.destroy()
await this.tryDeleteTmpPath()
Expand All @@ -314,11 +325,10 @@ class Uploader {
const { url, extraData } = ret
this.#emitSuccess(url, extraData)
} catch (err) {
if (err?.isAbortError) {
if (this.#canceled) {
logger.error('Aborted upload', 'uploader.aborted', this.shortToken)
return
}
// console.log(err)
logger.error(err, 'uploader.error', this.shortToken)
this.#emitError(err)
} finally {
Expand Down Expand Up @@ -458,7 +468,7 @@ class Uploader {

const formattedPercentage = percentage.toFixed(2)

if (this._paused || this.uploadStopped) {
if (this.#uploadState !== states.uploading) {
return
}

Expand Down Expand Up @@ -519,7 +529,8 @@ class Uploader {
const chunkSize = this.options.chunkSize || (isFileStream ? Infinity : 50e6)

return new Promise((resolve, reject) => {
this.tus = new tus.Upload(stream, {

const tusOptions = {
endpoint: this.options.endpoint,
uploadUrl: this.options.uploadUrl,
uploadLengthDeferred: !isFileStream,
Expand Down Expand Up @@ -564,11 +575,11 @@ class Uploader {
onSuccess() {
resolve({ url: uploader.tus.url })
},
})

if (!this._paused) {
this.tus.start()
}

this.tus = new tus.Upload(stream, tusOptions)

this.tus.start()
})
}

Expand Down