From 6ad288334247d90004c4f0ee9abfbbfb371ccc66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=86=E3=81=95=E3=81=BF=E3=82=87=E3=82=93=28myon2019?= =?UTF-8?q?=29/mtripg6666tdr?= <56076195+mtripg6666tdr@users.noreply.github.com> Date: Wed, 7 Sep 2022 21:47:54 +0900 Subject: [PATCH] Fix(stream): stream stops unexpectedly before it ends (#424) * Fix(stream): stream stops unexpectedly before it ends * Chore: update gitignore * Fix: building --- .gitignore | 2 +- .vscode/launch.json | 6 +++++- src/AudioSource/niconico.ts | 2 +- src/AudioSource/soundcloud.ts | 2 +- .../youtube/strategies/youtube-dl.ts | 2 +- src/AudioSource/youtube/stream.ts | 2 +- src/Component/PlayManager.ts | 17 ++++++++--------- src/Component/streams/index.ts | 12 +++++++----- src/Util/general.ts | 2 +- 9 files changed, 26 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index ae65a6a1d..89e4d45d2 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ node *.log addon config.json -logs/* +logs diff --git a/.vscode/launch.json b/.vscode/launch.json index e11915cdf..fb7bd1c5c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,8 +13,10 @@ "/**" ], "program": "${workspaceFolder}/dist/index.js", + "runtimeExecutable": "${workspaceFolder}/node_modules/.bin/node", "windows": { "program": "${workspaceFolder}\\dist\\index.js", + "runtimeExecutable": "${workspaceFolder}\\node_modules\\.bin\\node", } }, { @@ -30,8 +32,10 @@ "/**" ], "program": "${workspaceFolder}\\dist\\index.js", + "runtimeExecutable": "${workspaceFolder}/node_modules/.bin/node", "windows": { - "program": "${workspaceFolder}/dist/index.js" + "program": "${workspaceFolder}/dist/index.js", + "runtimeExecutable": "${workspaceFolder}\\node_modules\\.bin\\node", } } ] diff --git a/src/AudioSource/niconico.ts b/src/AudioSource/niconico.ts index c30d7de31..46aeaf3fc 100644 --- a/src/AudioSource/niconico.ts +++ b/src/AudioSource/niconico.ts @@ -58,7 +58,7 @@ export class NicoNicoS extends AudioSource { } async fetch():Promise{ - const stream = Util.general.InitPassThrough(); + const stream = Util.general.createPassThrough(); const source = await this.nico.download() as Readable; source .on("error", e => !stream.destroyed ? stream.destroy(e) : stream.emit("error", e)) diff --git a/src/AudioSource/soundcloud.ts b/src/AudioSource/soundcloud.ts index 89b4febbd..26f89d850 100644 --- a/src/AudioSource/soundcloud.ts +++ b/src/AudioSource/soundcloud.ts @@ -55,7 +55,7 @@ export class SoundCloudS extends AudioSource { async fetch():Promise{ const sc = new SoundCloud(); const source = await sc.util.streamTrack(this.Url) as Readable; - const stream = Util.general.InitPassThrough(); + const stream = Util.general.createPassThrough(); source .on("error", e => !stream.destroyed ? stream.destroy(e) : stream.emit("error", e)) .pipe(stream) diff --git a/src/AudioSource/youtube/strategies/youtube-dl.ts b/src/AudioSource/youtube/strategies/youtube-dl.ts index daeb26140..59507d39f 100644 --- a/src/AudioSource/youtube/strategies/youtube-dl.ts +++ b/src/AudioSource/youtube/strategies/youtube-dl.ts @@ -85,7 +85,7 @@ export class youtubeDlStrategy extends Strategy, }; } // don't use initpassthrough here - const stream = Util.general.InitPassThrough(); + const stream = Util.general.createPassThrough(); const req = m3u8stream(format[0].url, { begin: Date.now(), parser: "m3u8", diff --git a/src/AudioSource/youtube/stream.ts b/src/AudioSource/youtube/stream.ts index be838cbd1..e43c8c294 100644 --- a/src/AudioSource/youtube/stream.ts +++ b/src/AudioSource/youtube/stream.ts @@ -21,7 +21,7 @@ import * as ytdl from "ytdl-core"; import { Util } from "../../Util"; export function createChunkedYTStream(info:ytdl.videoInfo, format:ytdl.videoFormat, options:ytdl.downloadOptions, chunkSize:number = 512 * 1024){ - const stream = Util.general.InitPassThrough(); + const stream = Util.general.createPassThrough(); let current = -1; const contentLength = Number(format.contentLength); if(contentLength < chunkSize){ diff --git a/src/Component/PlayManager.ts b/src/Component/PlayManager.ts index 522d7d3b6..d3828b29e 100644 --- a/src/Component/PlayManager.ts +++ b/src/Component/PlayManager.ts @@ -180,9 +180,7 @@ export class PlayManager extends ManagerBase { const getStreamReadable = () => !(stream.readableEnded || stream.destroyed || errorWhileWaiting) && stream.readableLength > 0; if(!getStreamReadable()){ this.Log("Stream has not been readable yet. Waiting...", "debug"); - await Util.general.waitForEnteringState(getStreamReadable, 20 * 1000, { - rejectOnTimeout: true, - }); + await Util.general.waitForEnteringState(getStreamReadable, 20 * 1000); } if(errorWhileWaiting){ throw errorWhileWaiting; @@ -199,7 +197,7 @@ export class PlayManager extends ManagerBase { }); // setup volume this.setVolume(this.volume); - stream.on("end", this.onStreamFinished.bind(this)); + ((connection.piper as any)["_endStream"]).once("end", this.onStreamFinished.bind(this)); // wait for entering playing state await Util.general.waitForEnteringState(() => this.server.connection.playing); this.preparing = false; @@ -344,13 +342,14 @@ export class PlayManager extends ManagerBase { } private async onStreamFinished(){ - if(!this.server.connection) return; this.Log("onStreamFinished called"); if(this.server.connection){ - await Util.general.waitForEnteringState(() => !this.server.connection.playing, 20 * 1000).catch(() => { - this.Log("Stream has not ended in time and will force stream into destroying", "warn"); - this.stop(); - }); + await Util.general.waitForEnteringState(() => !this.server.connection || !this.server.connection.playing, 20 * 1000) + .catch(() => { + this.Log("Stream has not ended in time and will force stream into destroying", "warn"); + this.stop(); + }) + ; } // ストリームが終了したら時間を確認しつつ次の曲へ移行 this.Log("Stream finished"); diff --git a/src/Component/streams/index.ts b/src/Component/streams/index.ts index affbdad21..643c1c26d 100644 --- a/src/Component/streams/index.ts +++ b/src/Component/streams/index.ts @@ -22,7 +22,7 @@ import type { Readable, TransformOptions } from "stream"; import { opus } from "prism-media"; import Util from "../../Util"; -import { InitPassThrough } from "../../Util/general"; +import { createPassThrough } from "../../Util/general"; import { transformThroughFFmpeg } from "./ffmpeg"; type PlayableStreamInfo = PartialPlayableStream & { @@ -74,7 +74,7 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[], // Total: 3 Util.logger.log(`[StreamResolver] stream edges: raw(${streamInfo.streamType || "unknown"}) --(FFmpeg)--> Ogg/Opus (cost: 3)`); const ffmpeg = transformThroughFFmpeg(streamInfo, bitrate, effects, seek, "ogg"); - const passThrough = InitPassThrough(); + const passThrough = createPassThrough(); ffmpeg .on("error", e => destroyStream(passThrough, e)) .pipe(passThrough) @@ -97,10 +97,12 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[], const decoder = new opus.Decoder({ rate: 48000, channels: 2, - frameSize: 960 + frameSize: 960, }); - const passThrough = InitPassThrough(); + const passThrough = createPassThrough(); + const normalizeThrough = createPassThrough(); rawStream.stream + .pipe(normalizeThrough) .on("error", e => destroyStream(demuxer, e)) .pipe(demuxer) .on("error", e => destroyStream(decoder, e)) @@ -123,7 +125,7 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[], // Total: 5 Util.logger.log(`[StreamResolver] stream edges: raw(${streamInfo.streamType || "unknown"}) --(FFmpeg) --> PCM (cost: 5)`); const ffmpegPCM = transformThroughFFmpeg(streamInfo, bitrate, effects, seek, "pcm"); - const passThrough = InitPassThrough(); + const passThrough = createPassThrough(); ffmpegPCM .on("error", e => destroyStream(passThrough, e)) .pipe(passThrough) diff --git a/src/Util/general.ts b/src/Util/general.ts index 8d3778f73..7b83646fc 100644 --- a/src/Util/general.ts +++ b/src/Util/general.ts @@ -68,7 +68,7 @@ export function StringifyObject(obj:any):string{ * 空のPassThroughを生成します * @returns PassThrough */ -export function InitPassThrough(opts:TransformOptions = {}):PassThrough{ +export function createPassThrough(opts:TransformOptions = {}):PassThrough{ const id = Date.now(); log(`[PassThrough] initialized (id: ${id})`, "debug"); const stream = new PassThrough(Object.assign(opts, {