Skip to content

Commit

Permalink
Fix(stream): stream stops unexpectedly before it ends (#424)
Browse files Browse the repository at this point in the history
* Fix(stream): stream stops unexpectedly before it ends

* Chore: update gitignore

* Fix: building
  • Loading branch information
mtripg6666tdr authored Sep 7, 2022
1 parent 8994054 commit 6ad2883
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ node
*.log
addon
config.json
logs/*
logs
6 changes: 5 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
"<node_internals>/**"
],
"program": "${workspaceFolder}/dist/index.js",
"runtimeExecutable": "${workspaceFolder}/node_modules/.bin/node",
"windows": {
"program": "${workspaceFolder}\\dist\\index.js",
"runtimeExecutable": "${workspaceFolder}\\node_modules\\.bin\\node",
}
},
{
Expand All @@ -30,8 +32,10 @@
"<node_internals>/**"
],
"program": "${workspaceFolder}\\dist\\index.js",
"runtimeExecutable": "${workspaceFolder}/node_modules/.bin/node",
"windows": {
"program": "${workspaceFolder}/dist/index.js"
"program": "${workspaceFolder}/dist/index.js",
"runtimeExecutable": "${workspaceFolder}\\node_modules\\.bin\\node",
}
}
]
Expand Down
2 changes: 1 addition & 1 deletion src/AudioSource/niconico.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class NicoNicoS extends AudioSource {
}

async fetch():Promise<ReadableStreamInfo>{
const stream = Util.general.InitPassThrough();
const stream = Util.general.createPassThrough();
const source = await this.nico.download() as Readable;
source
.on("error", e => !stream.destroyed ? stream.destroy(e) : stream.emit("error", e))
Expand Down
2 changes: 1 addition & 1 deletion src/AudioSource/soundcloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class SoundCloudS extends AudioSource {
async fetch():Promise<ReadableStreamInfo>{
const sc = new SoundCloud();
const source = await sc.util.streamTrack(this.Url) as Readable;
const stream = Util.general.InitPassThrough();
const stream = Util.general.createPassThrough();
source
.on("error", e => !stream.destroyed ? stream.destroy(e) : stream.emit("error", e))
.pipe(stream)
Expand Down
2 changes: 1 addition & 1 deletion src/AudioSource/youtube/strategies/youtube-dl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class youtubeDlStrategy extends Strategy<Cache<youtubeDl, YoutubeDlInfo>,
};
}
// don't use initpassthrough here
const stream = Util.general.InitPassThrough();
const stream = Util.general.createPassThrough();
const req = m3u8stream(format[0].url, {
begin: Date.now(),
parser: "m3u8",
Expand Down
2 changes: 1 addition & 1 deletion src/AudioSource/youtube/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import * as ytdl from "ytdl-core";
import { Util } from "../../Util";

export function createChunkedYTStream(info:ytdl.videoInfo, format:ytdl.videoFormat, options:ytdl.downloadOptions, chunkSize:number = 512 * 1024){
const stream = Util.general.InitPassThrough();
const stream = Util.general.createPassThrough();
let current = -1;
const contentLength = Number(format.contentLength);
if(contentLength < chunkSize){
Expand Down
17 changes: 8 additions & 9 deletions src/Component/PlayManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ export class PlayManager extends ManagerBase {
const getStreamReadable = () => !(stream.readableEnded || stream.destroyed || errorWhileWaiting) && stream.readableLength > 0;
if(!getStreamReadable()){
this.Log("Stream has not been readable yet. Waiting...", "debug");
await Util.general.waitForEnteringState(getStreamReadable, 20 * 1000, {
rejectOnTimeout: true,
});
await Util.general.waitForEnteringState(getStreamReadable, 20 * 1000);
}
if(errorWhileWaiting){
throw errorWhileWaiting;
Expand All @@ -199,7 +197,7 @@ export class PlayManager extends ManagerBase {
});
// setup volume
this.setVolume(this.volume);
stream.on("end", this.onStreamFinished.bind(this));
((connection.piper as any)["_endStream"]).once("end", this.onStreamFinished.bind(this));
// wait for entering playing state
await Util.general.waitForEnteringState(() => this.server.connection.playing);
this.preparing = false;
Expand Down Expand Up @@ -344,13 +342,14 @@ export class PlayManager extends ManagerBase {
}

private async onStreamFinished(){
if(!this.server.connection) return;
this.Log("onStreamFinished called");
if(this.server.connection){
await Util.general.waitForEnteringState(() => !this.server.connection.playing, 20 * 1000).catch(() => {
this.Log("Stream has not ended in time and will force stream into destroying", "warn");
this.stop();
});
await Util.general.waitForEnteringState(() => !this.server.connection || !this.server.connection.playing, 20 * 1000)
.catch(() => {
this.Log("Stream has not ended in time and will force stream into destroying", "warn");
this.stop();
})
;
}
// ストリームが終了したら時間を確認しつつ次の曲へ移行
this.Log("Stream finished");
Expand Down
12 changes: 7 additions & 5 deletions src/Component/streams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type { Readable, TransformOptions } from "stream";
import { opus } from "prism-media";

import Util from "../../Util";
import { InitPassThrough } from "../../Util/general";
import { createPassThrough } from "../../Util/general";
import { transformThroughFFmpeg } from "./ffmpeg";

type PlayableStreamInfo = PartialPlayableStream & {
Expand Down Expand Up @@ -74,7 +74,7 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[],
// Total: 3
Util.logger.log(`[StreamResolver] stream edges: raw(${streamInfo.streamType || "unknown"}) --(FFmpeg)--> Ogg/Opus (cost: 3)`);
const ffmpeg = transformThroughFFmpeg(streamInfo, bitrate, effects, seek, "ogg");
const passThrough = InitPassThrough();
const passThrough = createPassThrough();
ffmpeg
.on("error", e => destroyStream(passThrough, e))
.pipe(passThrough)
Expand All @@ -97,10 +97,12 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[],
const decoder = new opus.Decoder({
rate: 48000,
channels: 2,
frameSize: 960
frameSize: 960,
});
const passThrough = InitPassThrough();
const passThrough = createPassThrough();
const normalizeThrough = createPassThrough();
rawStream.stream
.pipe(normalizeThrough)
.on("error", e => destroyStream(demuxer, e))
.pipe(demuxer)
.on("error", e => destroyStream(decoder, e))
Expand All @@ -123,7 +125,7 @@ export function resolveStreamToPlayable(streamInfo:StreamInfo, effects:string[],
// Total: 5
Util.logger.log(`[StreamResolver] stream edges: raw(${streamInfo.streamType || "unknown"}) --(FFmpeg) --> PCM (cost: 5)`);
const ffmpegPCM = transformThroughFFmpeg(streamInfo, bitrate, effects, seek, "pcm");
const passThrough = InitPassThrough();
const passThrough = createPassThrough();
ffmpegPCM
.on("error", e => destroyStream(passThrough, e))
.pipe(passThrough)
Expand Down
2 changes: 1 addition & 1 deletion src/Util/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function StringifyObject(obj:any):string{
* 空のPassThroughを生成します
* @returns PassThrough
*/
export function InitPassThrough(opts:TransformOptions = {}):PassThrough{
export function createPassThrough(opts:TransformOptions = {}):PassThrough{
const id = Date.now();
log(`[PassThrough] initialized (id: ${id})`, "debug");
const stream = new PassThrough(Object.assign(opts, {
Expand Down

0 comments on commit 6ad2883

Please sign in to comment.