From ded719ffb61aefa8a044cb2efe36836b78d366dc Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Tue, 25 Feb 2020 12:49:39 +0900 Subject: [PATCH] fix: [http] Consume unread body and trailers before reading next request (denoland/deno#3990) - Added `ServerRequest.finalize()`: consuming all unread body stream and trailers. - This is cleanup method for reading next request from same keep-alive connection. - Needed when handler didn't consume all body and trailers even after responding. - refactor: `ServerRequest._bodyStream()`, `ServerRequestBody` are removed. - Now using `bodyReader()` and `chunkedBodyReader()` instead. - fix: Trailers should only be read `transfer-encoding` is `chunked` and `trailer` header is set and its value is valid. - fix: use `Headers.append()` on reading trailers. - fix: delete `trailer` field from headers after reading trailers. - reorg: Several functions related to IO are moved into `http/io.ts` --- http/io.ts | 213 +++++++++++++++++++++++++++++++++++++ http/io_test.ts | 167 +++++++++++++++++++++++++++++ http/racing_server.ts | 36 +++++-- http/racing_server_test.ts | 44 +++++--- http/server.ts | 200 +++++++--------------------------- http/server_test.ts | 139 +++++++++++++----------- 6 files changed, 553 insertions(+), 246 deletions(-) create mode 100644 http/io.ts create mode 100644 http/io_test.ts diff --git a/http/io.ts b/http/io.ts new file mode 100644 index 000000000000..a51fada54bf0 --- /dev/null +++ b/http/io.ts @@ -0,0 +1,213 @@ +import { BufReader, UnexpectedEOFError, BufWriter } from "../io/bufio.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; +import { assert } from "../testing/asserts.ts"; +import { encoder } from "../strings/mod.ts"; + +export function emptyReader(): Deno.Reader { + return { + async read(_: Uint8Array): Promise { + return Deno.EOF; + } + }; +} + +export function bodyReader(contentLength: number, r: BufReader): Deno.Reader { + let totalRead = 0; + let finished = false; + async function read(buf: Uint8Array): Promise { + if (finished) return Deno.EOF; + let result: number | Deno.EOF; + const remaining = contentLength - totalRead; + if (remaining >= buf.byteLength) { + result = await r.read(buf); + } else { + const readBuf = buf.subarray(0, remaining); + result = await r.read(readBuf); + } + if (result !== Deno.EOF) { + totalRead += result; + } + finished = totalRead === contentLength; + return result; + } + return { read }; +} + +export function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { + // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 + const tp = new TextProtoReader(r); + let finished = false; + const chunks: Array<{ + offset: number; + data: Uint8Array; + }> = []; + async function read(buf: Uint8Array): Promise { + if (finished) return Deno.EOF; + const [chunk] = chunks; + if (chunk) { + const chunkRemaining = chunk.data.byteLength - chunk.offset; + const readLength = Math.min(chunkRemaining, buf.byteLength); + for (let i = 0; i < readLength; i++) { + buf[i] = chunk.data[chunk.offset + i]; + } + chunk.offset += readLength; + if (chunk.offset === chunk.data.byteLength) { + chunks.shift(); + // Consume \r\n; + if ((await tp.readLine()) === Deno.EOF) { + throw new UnexpectedEOFError(); + } + } + return readLength; + } + const line = await tp.readLine(); + if (line === Deno.EOF) throw new UnexpectedEOFError(); + // TODO: handle chunk extension + const [chunkSizeString] = line.split(";"); + const chunkSize = parseInt(chunkSizeString, 16); + if (Number.isNaN(chunkSize) || chunkSize < 0) { + throw new Error("Invalid chunk size"); + } + if (chunkSize > 0) { + if (chunkSize > buf.byteLength) { + let eof = await r.readFull(buf); + if (eof === Deno.EOF) { + throw new UnexpectedEOFError(); + } + const restChunk = new Uint8Array(chunkSize - buf.byteLength); + eof = await r.readFull(restChunk); + if (eof === Deno.EOF) { + throw new UnexpectedEOFError(); + } else { + chunks.push({ + offset: 0, + data: restChunk + }); + } + return buf.byteLength; + } else { + const bufToFill = buf.subarray(0, chunkSize); + const eof = await r.readFull(bufToFill); + if (eof === Deno.EOF) { + throw new UnexpectedEOFError(); + } + // Consume \r\n + if ((await tp.readLine()) === Deno.EOF) { + throw new UnexpectedEOFError(); + } + return chunkSize; + } + } else { + assert(chunkSize === 0); + // Consume \r\n + if ((await r.readLine()) === Deno.EOF) { + throw new UnexpectedEOFError(); + } + await readTrailers(h, r); + finished = true; + return Deno.EOF; + } + } + return { read }; +} + +const kProhibitedTrailerHeaders = [ + "transfer-encoding", + "content-length", + "trailer" +]; + +/** + * Read trailer headers from reader and append values to headers. + * "trailer" field will be deleted. + * */ +export async function readTrailers( + headers: Headers, + r: BufReader +): Promise { + const keys = parseTrailer(headers.get("trailer")); + if (!keys) return; + const tp = new TextProtoReader(r); + const result = await tp.readMIMEHeader(); + assert(result != Deno.EOF, "trailer must be set"); + for (const [k, v] of result) { + if (!keys.has(k)) { + throw new Error("Undeclared trailer field"); + } + keys.delete(k); + headers.append(k, v); + } + assert(keys.size === 0, "Missing trailers"); + headers.delete("trailer"); +} + +function parseTrailer(field: string | null): Set | undefined { + if (field == null) { + return undefined; + } + const keys = field.split(",").map(v => v.trim()); + if (keys.length === 0) { + throw new Error("Empty trailer"); + } + for (const invalid of kProhibitedTrailerHeaders) { + if (keys.includes(invalid)) { + throw new Error(`Prohibited field for trailer`); + } + } + return new Set(keys); +} + +export async function writeChunkedBody( + w: Deno.Writer, + r: Deno.Reader +): Promise { + const writer = BufWriter.create(w); + for await (const chunk of Deno.toAsyncIterator(r)) { + if (chunk.byteLength <= 0) continue; + const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`); + const end = encoder.encode("\r\n"); + await writer.write(start); + await writer.write(chunk); + await writer.write(end); + } + + const endChunk = encoder.encode("0\r\n\r\n"); + await writer.write(endChunk); +} + +/** write trailer headers to writer. it mostly should be called after writeResponse */ +export async function writeTrailers( + w: Deno.Writer, + headers: Headers, + trailers: Headers +): Promise { + const trailer = headers.get("trailer"); + if (trailer === null) { + throw new Error('response headers must have "trailer" header field'); + } + const transferEncoding = headers.get("transfer-encoding"); + if (transferEncoding === null || !transferEncoding.match(/^chunked/)) { + throw new Error( + `trailer headers is only allowed for "transfer-encoding: chunked": got "${transferEncoding}"` + ); + } + const writer = BufWriter.create(w); + const trailerHeaderFields = trailer + .split(",") + .map(s => s.trim().toLowerCase()); + for (const f of trailerHeaderFields) { + assert( + !kProhibitedTrailerHeaders.includes(f), + `"${f}" is prohibited for trailer header` + ); + } + for (const [key, value] of trailers) { + assert( + trailerHeaderFields.includes(key), + `Not trailer header field: ${key}` + ); + await writer.write(encoder.encode(`${key}: ${value}\r\n`)); + } + await writer.write(encoder.encode("\r\n")); + await writer.flush(); +} diff --git a/http/io_test.ts b/http/io_test.ts new file mode 100644 index 000000000000..7e7701596365 --- /dev/null +++ b/http/io_test.ts @@ -0,0 +1,167 @@ +import { + AssertionError, + assertThrowsAsync, + assertEquals +} from "../testing/asserts.ts"; +import { bodyReader, writeTrailers, readTrailers } from "./io.ts"; +import { encode, decode } from "../strings/mod.ts"; +import { BufReader } from "../io/bufio.ts"; +import { chunkedBodyReader } from "./io.ts"; +const { test, Buffer } = Deno; + +test("bodyReader", async () => { + const text = "Hello, Deno"; + const r = bodyReader(text.length, new BufReader(new Buffer(encode(text)))); + assertEquals(decode(await Deno.readAll(r)), text); +}); +function chunkify(n: number, char: string): string { + const v = Array.from({ length: n }) + .map(() => `${char}`) + .join(""); + return `${n.toString(16)}\r\n${v}\r\n`; +} +test("chunkedBodyReader", async () => { + const body = [ + chunkify(3, "a"), + chunkify(5, "b"), + chunkify(11, "c"), + chunkify(22, "d"), + chunkify(0, "") + ].join(""); + const h = new Headers(); + const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body)))); + let result: number | Deno.EOF; + // Use small buffer as some chunks exceed buffer size + const buf = new Uint8Array(5); + const dest = new Buffer(); + while ((result = await r.read(buf)) !== Deno.EOF) { + const len = Math.min(buf.byteLength, result); + await dest.write(buf.subarray(0, len)); + } + const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd"; + assertEquals(dest.toString(), exp); +}); + +test("chunkedBodyReader with trailers", async () => { + const body = [ + chunkify(3, "a"), + chunkify(5, "b"), + chunkify(11, "c"), + chunkify(22, "d"), + chunkify(0, ""), + "deno: land\r\n", + "node: js\r\n", + "\r\n" + ].join(""); + const h = new Headers({ + trailer: "deno,node" + }); + const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body)))); + assertEquals(h.has("trailer"), true); + assertEquals(h.has("deno"), false); + assertEquals(h.has("node"), false); + const act = decode(await Deno.readAll(r)); + const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd"; + assertEquals(act, exp); + assertEquals(h.has("trailer"), false); + assertEquals(h.get("deno"), "land"); + assertEquals(h.get("node"), "js"); +}); + +test("readTrailers", async () => { + const h = new Headers({ + trailer: "deno,node" + }); + const trailer = ["deno: land", "node: js", "", ""].join("\r\n"); + await readTrailers(h, new BufReader(new Buffer(encode(trailer)))); + assertEquals(h.has("trailer"), false); + assertEquals(h.get("deno"), "land"); + assertEquals(h.get("node"), "js"); +}); + +test("readTrailer should throw if undeclared headers found in trailer", async () => { + const patterns = [ + ["deno,node", "deno: land\r\nnode: js\r\ngo: lang\r\n\r\n"], + ["deno", "node: js\r\n\r\n"], + ["deno", "node:js\r\ngo: lang\r\n\r\n"] + ]; + for (const [header, trailer] of patterns) { + const h = new Headers({ + trailer: header + }); + await assertThrowsAsync( + async () => { + await readTrailers(h, new BufReader(new Buffer(encode(trailer)))); + }, + Error, + "Undeclared trailer field" + ); + } +}); + +test("readTrailer should throw if trailer contains prohibited fields", async () => { + for (const f of ["content-length", "trailer", "transfer-encoding"]) { + const h = new Headers({ + trailer: f + }); + await assertThrowsAsync( + async () => { + await readTrailers(h, new BufReader(new Buffer())); + }, + Error, + "Prohibited field for trailer" + ); + } +}); + +test("writeTrailer", async () => { + const w = new Buffer(); + await writeTrailers( + w, + new Headers({ "transfer-encoding": "chunked", trailer: "deno,node" }), + new Headers({ deno: "land", node: "js" }) + ); + assertEquals(w.toString(), "deno: land\r\nnode: js\r\n\r\n"); +}); + +test("writeTrailer should throw", async () => { + const w = new Buffer(); + await assertThrowsAsync( + () => { + return writeTrailers(w, new Headers(), new Headers()); + }, + Error, + 'must have "trailer"' + ); + await assertThrowsAsync( + () => { + return writeTrailers(w, new Headers({ trailer: "deno" }), new Headers()); + }, + Error, + "only allowed" + ); + for (const f of ["content-length", "trailer", "transfer-encoding"]) { + await assertThrowsAsync( + () => { + return writeTrailers( + w, + new Headers({ "transfer-encoding": "chunked", trailer: f }), + new Headers({ [f]: "1" }) + ); + }, + AssertionError, + "prohibited" + ); + } + await assertThrowsAsync( + () => { + return writeTrailers( + w, + new Headers({ "transfer-encoding": "chunked", trailer: "deno" }), + new Headers({ node: "js" }) + ); + }, + AssertionError, + "Not trailer" + ); +}); diff --git a/http/racing_server.ts b/http/racing_server.ts index 629fef2db19e..0b0e5a8a5652 100644 --- a/http/racing_server.ts +++ b/http/racing_server.ts @@ -5,12 +5,15 @@ import { delay } from "../util/async.ts"; const addr = Deno.args[1] || "127.0.0.1:4501"; const server = serve(addr); -const body = new TextEncoder().encode("Hello 1\n"); -const body4 = new TextEncoder().encode("World 4\n"); - -async function delayedRespond(request: ServerRequest): Promise { +function body(i: number): string { + return `Step${i}\n`; +} +async function delayedRespond( + request: ServerRequest, + step: number +): Promise { await delay(3000); - await request.respond({ status: 200, body }); + await request.respond({ status: 200, body: body(step) }); } async function largeRespond(request: ServerRequest, c: string): Promise { @@ -19,6 +22,13 @@ async function largeRespond(request: ServerRequest, c: string): Promise { await request.respond({ status: 200, body: b }); } +async function ignoreToConsume( + request: ServerRequest, + step: number +): Promise { + await request.respond({ status: 200, body: body(step) }); +} + console.log("Racing server listening...\n"); let step = 1; @@ -28,7 +38,7 @@ for await (const request of server) { // Try to wait long enough. // For pipelining, this should cause all the following response // to block. - delayedRespond(request); + delayedRespond(request, step); break; case 2: // HUGE body. @@ -38,8 +48,20 @@ for await (const request of server) { // HUGE body. largeRespond(request, "b"); break; + case 4: + // Ignore to consume body (content-length) + ignoreToConsume(request, step); + break; + case 5: + // Ignore to consume body (chunked) + ignoreToConsume(request, step); + break; + case 6: + // Ignore to consume body (chunked + trailers) + ignoreToConsume(request, step); + break; default: - request.respond({ status: 200, body: body4 }); + request.respond({ status: 200, body: body(step) }); break; } step++; diff --git a/http/racing_server_test.ts b/http/racing_server_test.ts index 63993533905c..07df92baefbd 100644 --- a/http/racing_server_test.ts +++ b/http/racing_server_test.ts @@ -1,7 +1,7 @@ const { connect, run } = Deno; import { assert, assertEquals } from "../testing/asserts.ts"; -import { BufReader } from "../io/bufio.ts"; +import { BufReader, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; let server: Deno.Process; @@ -21,20 +21,20 @@ function killServer(): void { server.stdout?.close(); } -const input = `GET / HTTP/1.1 - -GET / HTTP/1.1 - -GET / HTTP/1.1 - -GET / HTTP/1.1 - -`; +const input = [ + "GET / HTTP/1.1\r\n\r\n", + "GET / HTTP/1.1\r\n\r\n", + "GET / HTTP/1.1\r\n\r\n", + "POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\ndeno", + "POST / HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n4\r\ndeno\r\n0\r\n\r\n", + "POST / HTTP/1.1\r\ntransfer-encoding: chunked\r\ntrailer: deno\r\n\r\n4\r\ndeno\r\n0\r\n\r\ndeno: land\r\n\r\n", + "GET / HTTP/1.1\r\n\r\n" +].join(""); const HUGE_BODY_SIZE = 1024 * 1024; const output = `HTTP/1.1 200 OK -content-length: 8 +content-length: 6 -Hello 1 +Step1 HTTP/1.1 200 OK content-length: ${HUGE_BODY_SIZE} @@ -42,9 +42,21 @@ ${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK content-length: ${HUGE_BODY_SIZE} ${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK -content-length: 8 +content-length: 6 + +Step4 +HTTP/1.1 200 OK +content-length: 6 + +Step5 +HTTP/1.1 200 OK +content-length: 6 + +Step6 +HTTP/1.1 200 OK +content-length: 6 -World 4 +Step7 `; Deno.test(async function serverPipelineRace(): Promise { @@ -52,7 +64,9 @@ Deno.test(async function serverPipelineRace(): Promise { const conn = await connect({ port: 4501 }); const r = new TextProtoReader(new BufReader(conn)); - await conn.write(new TextEncoder().encode(input)); + const w = new BufWriter(conn); + await w.write(new TextEncoder().encode(input)); + await w.flush(); const outLines = output.split("\n"); // length - 1 to disregard last empty line for (let i = 0; i < outLines.length - 1; i++) { diff --git a/http/server.ts b/http/server.ts index 9e9cde0168f9..e7d6bd598534 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,5 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -const { listen, listenTLS, copy, toAsyncIterator } = Deno; +const { listen, listenTLS, copy } = Deno; type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; @@ -9,6 +9,13 @@ import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/asserts.ts"; import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; +import { + bodyReader, + chunkedBodyReader, + writeChunkedBody, + writeTrailers, + emptyReader +} from "./io.ts"; const encoder = new TextEncoder(); @@ -30,64 +37,6 @@ export function setContentLength(r: Response): void { } } -async function writeChunkedBody(w: Writer, r: Reader): Promise { - const writer = BufWriter.create(w); - - for await (const chunk of toAsyncIterator(r)) { - if (chunk.byteLength <= 0) continue; - const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`); - const end = encoder.encode("\r\n"); - await writer.write(start); - await writer.write(chunk); - await writer.write(end); - } - - const endChunk = encoder.encode("0\r\n\r\n"); - await writer.write(endChunk); -} -const kProhibitedTrailerHeaders = [ - "transfer-encoding", - "content-length", - "trailer" -]; - -/** write trailer headers to writer. it mostly should be called after writeResponse */ -export async function writeTrailers( - w: Writer, - headers: Headers, - trailers: Headers -): Promise { - const trailer = headers.get("trailer"); - if (trailer === null) { - throw new Error('response headers must have "trailer" header field'); - } - const transferEncoding = headers.get("transfer-encoding"); - if (transferEncoding === null || !transferEncoding.match(/^chunked/)) { - throw new Error( - `trailer headers is only allowed for "transfer-encoding: chunked": got "${transferEncoding}"` - ); - } - const writer = BufWriter.create(w); - const trailerHeaderFields = trailer - .split(",") - .map(s => s.trim().toLowerCase()); - for (const f of trailerHeaderFields) { - assert( - !kProhibitedTrailerHeaders.includes(f), - `"${f}" is prohibited for trailer header` - ); - } - for (const [key, value] of trailers) { - assert( - trailerHeaderFields.includes(key), - `Not trailer header field: ${key}` - ); - await writer.write(encoder.encode(`${key}: ${value}\r\n`)); - } - await writer.write(encoder.encode("\r\n")); - await writer.flush(); -} - export async function writeResponse(w: Writer, r: Response): Promise { const protoMajor = 1; const protoMinor = 1; @@ -138,17 +87,6 @@ export async function writeResponse(w: Writer, r: Response): Promise { await writer.flush(); } -export class ServerRequestBody implements Reader { - constructor(private it: AsyncIterator) {} - async read(p: Uint8Array): Promise { - const res = await this.it.next(p); - if (res.done) { - return Deno.EOF; - } - return res.value; - } -} - export class ServerRequest { url!: string; method!: string; @@ -184,7 +122,7 @@ export class ServerRequest { return this._contentLength; } - private _body: ServerRequestBody | null = null; + private _body: Deno.Reader | null = null; /** * Body of the request. @@ -200,100 +138,28 @@ export class ServerRequest { * bufSlice = bufSlice.subarray(nread); * } */ - get body(): ServerRequestBody { + get body(): Deno.Reader { if (!this._body) { - const stream = this._bodyStream(); - stream.next(); // drop dummy such that first read is not empty. - this._body = new ServerRequestBody(stream); - } - return this._body; - } - - /** - * Internal: actually reading body. Each step, buf to use is passed - * in through yield result. - * Returns on no more data to read or error. - */ - private async *_bodyStream(): AsyncIterator { - let buf = yield 0; // dummy yield to retrieve user provided buf. - if (this.headers.has("content-length")) { - const len = this.contentLength; - if (len === null) { - return; - } - let rr = await this.r.read(buf); - let nread = rr === Deno.EOF ? 0 : rr; - let nreadTotal = nread; - while (rr !== Deno.EOF && nreadTotal < len) { - buf = yield nread; - rr = await this.r.read(buf); - nread = rr === Deno.EOF ? 0 : rr; - nreadTotal += nread; - } - yield nread; - } else { - const transferEncoding = this.headers.get("transfer-encoding"); - if (transferEncoding) { - const parts = transferEncoding - .split(",") - .map((e): string => e.trim().toLowerCase()); - if (parts.includes("chunked")) { - // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 - const tp = new TextProtoReader(this.r); - let line = await tp.readLine(); - if (line === Deno.EOF) throw new UnexpectedEOFError(); - // TODO: handle chunk extension - const [chunkSizeString] = line.split(";"); - let chunkSize = parseInt(chunkSizeString, 16); - if (Number.isNaN(chunkSize) || chunkSize < 0) { - throw new Error("Invalid chunk size"); - } - while (chunkSize > 0) { - let currChunkOffset = 0; - // Since given readBuffer might be smaller, loop. - while (currChunkOffset < chunkSize) { - // Try to be as large as chunkSize. Might be smaller though. - const bufferToFill = buf.subarray(0, chunkSize); - if ((await this.r.readFull(bufferToFill)) === Deno.EOF) { - throw new UnexpectedEOFError(); - } - currChunkOffset += bufferToFill.length; - buf = yield bufferToFill.length; - } - await this.r.readLine(); // Consume \r\n - line = await tp.readLine(); - if (line === Deno.EOF) throw new UnexpectedEOFError(); - chunkSize = parseInt(line, 16); - } - const entityHeaders = await tp.readMIMEHeader(); - if (entityHeaders !== Deno.EOF) { - for (const [k, v] of entityHeaders) { - this.headers.set(k, v); - } - } - /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6 - length := 0 - read chunk-size, chunk-extension (if any) and CRLF - while (chunk-size > 0) { - read chunk-data and CRLF - append chunk-data to entity-body - length := length + chunk-size - read chunk-size and CRLF - } - read entity-header - while (entity-header not empty) { - append entity-header to existing header fields - read entity-header - } - Content-Length := length - Remove "chunked" from Transfer-Encoding - */ - return; // Must return here to avoid fall through + if (this.contentLength != null) { + this._body = bodyReader(this.contentLength, this.r); + } else { + const transferEncoding = this.headers.get("transfer-encoding"); + if (transferEncoding != null) { + const parts = transferEncoding + .split(",") + .map((e): string => e.trim().toLowerCase()); + assert( + parts.includes("chunked"), + 'transfer-encoding must include "chunked" if content-length is not set' + ); + this._body = chunkedBodyReader(this.headers, this.r); + } else { + // Neither content-length nor transfer-encoding: chunked + this._body = emptyReader(); } - // TODO: handle other transfer-encoding types } - // Otherwise... Do nothing } + return this._body; } async respond(r: Response): Promise { @@ -316,6 +182,16 @@ export class ServerRequest { throw err; } } + + private finalized = false; + async finalize(): Promise { + if (this.finalized) return; + // Consume unread body + const body = this.body; + const buf = new Uint8Array(1024); + while ((await body.read(buf)) !== Deno.EOF) {} + this.finalized = true; + } } function fixLength(req: ServerRequest): void { @@ -462,6 +338,8 @@ export class Server implements AsyncIterable { // req.done implies this connection already closed, so we can just return. return; } + // Consume unread body and trailers if receiver didn't consume those data + await req.finalize(); } if (req === Deno.EOF) { diff --git a/http/server_test.ts b/http/server_test.ts index b145b8353c0f..70ce5f2f176b 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -11,8 +11,6 @@ import { assert, assertEquals, assertNotEquals, - assertThrowsAsync, - AssertionError, assertNotEOF } from "../testing/asserts.ts"; import { @@ -21,8 +19,7 @@ import { writeResponse, serve, readRequest, - parseHTTPVersion, - writeTrailers + parseHTTPVersion } from "./server.ts"; import { BufReader, @@ -32,6 +29,7 @@ import { } from "../io/bufio.ts"; import { delay, deferred } from "../util/async.ts"; import { StringReader } from "../io/readers.ts"; +import { encode } from "../strings/mod.ts"; interface ResponseTest { response: Response; @@ -43,7 +41,7 @@ const dec = new TextDecoder(); type Handler = () => void; -const mockConn = { +const mockConn = (): Deno.Conn => ({ localAddr: { transport: "tcp", hostname: "", @@ -64,7 +62,7 @@ const mockConn = { return -1; }, close: (): void => {} -}; +}); const responseTests: ResponseTest[] = [ // Default response @@ -100,7 +98,7 @@ test(async function responseWrite(): Promise { const request = new ServerRequest(); request.w = bufw; - request.conn = mockConn as Deno.Conn; + request.conn = mockConn(); await request.respond(testCase.response); assertEquals(buf.toString(), testCase.raw); @@ -142,6 +140,25 @@ test(async function requestContentLength(): Promise { } }); +interface TotalReader extends Deno.Reader { + total: number; +} +function totalReader(r: Deno.Reader): TotalReader { + let _total = 0; + async function read(p: Uint8Array): Promise { + const result = await r.read(p); + if (typeof result === "number") { + _total += result; + } + return result; + } + return { + read, + get total(): number { + return _total; + } + }; +} test(async function requestBodyWithContentLength(): Promise { { const req = new ServerRequest(); @@ -164,8 +181,53 @@ test(async function requestBodyWithContentLength(): Promise { const body = dec.decode(await Deno.readAll(req.body)); assertEquals(body, longText); } + // Handler ignored to consume body +}); +test("ServerRequest.finalize() should consume unread body / content-length", async () => { + const text = "deno.land"; + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("content-length", "" + text.length); + const tr = totalReader(new Buffer(encode(text))); + req.r = new BufReader(tr); + req.w = new BufWriter(new Buffer()); + await req.respond({ status: 200, body: "ok" }); + assertEquals(tr.total, 0); + await req.finalize(); + assertEquals(tr.total, text.length); +}); +test("ServerRequest.finalize() should consume unread body / chunked, trailers", async () => { + const text = [ + "5", + "Hello", + "4", + "Deno", + "0", + "", + "deno: land", + "node: js", + "", + "" + ].join("\r\n"); + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("transfer-encoding", "chunked"); + req.headers.set("trailer", "deno,node"); + const body = encode(text); + const tr = totalReader(new Buffer(body)); + req.r = new BufReader(tr); + req.w = new BufWriter(new Buffer()); + await req.respond({ status: 200, body: "ok" }); + assertEquals(tr.total, 0); + assertEquals(req.headers.has("trailer"), true); + assertEquals(req.headers.has("deno"), false); + assertEquals(req.headers.has("node"), false); + await req.finalize(); + assertEquals(tr.total, body.byteLength); + assertEquals(req.headers.has("trailer"), false); + assertEquals(req.headers.get("deno"), "land"); + assertEquals(req.headers.get("node"), "js"); }); - test(async function requestBodyWithTransferEncoding(): Promise { { const shortText = "Hello"; @@ -465,7 +527,7 @@ malformedHeader const reader = new BufReader(new StringReader(input)); let err; try { - await readRequest(mockConn as Deno.Conn, reader); + await readRequest(mockConn(), reader); } catch (e) { err = e; } @@ -543,7 +605,7 @@ test(async function testReadRequestError(): Promise { let err; let req: ServerRequest | Deno.EOF | undefined; try { - req = await readRequest(mockConn as Deno.Conn, reader); + req = await readRequest(mockConn(), reader); } catch (e) { err = e; } @@ -655,7 +717,10 @@ test({ try { const r = new TextProtoReader(new BufReader(p.stdout!)); const s = await r.readLine(); - assert(s !== Deno.EOF && s.includes("server listening")); + assert( + s !== Deno.EOF && s.includes("server listening"), + "server must be started" + ); let serverIsRunning = true; p.status() @@ -765,55 +830,3 @@ if (Deno.build.os !== "win") { } }); } - -test("writeTrailer", async () => { - const w = new Buffer(); - await writeTrailers( - w, - new Headers({ "transfer-encoding": "chunked", trailer: "deno,node" }), - new Headers({ deno: "land", node: "js" }) - ); - assertEquals(w.toString(), "deno: land\r\nnode: js\r\n\r\n"); -}); - -test("writeTrailer should throw", async () => { - const w = new Buffer(); - await assertThrowsAsync( - () => { - return writeTrailers(w, new Headers(), new Headers()); - }, - Error, - 'must have "trailer"' - ); - await assertThrowsAsync( - () => { - return writeTrailers(w, new Headers({ trailer: "deno" }), new Headers()); - }, - Error, - "only allowed" - ); - for (const f of ["content-length", "trailer", "transfer-encoding"]) { - await assertThrowsAsync( - () => { - return writeTrailers( - w, - new Headers({ "transfer-encoding": "chunked", trailer: f }), - new Headers({ [f]: "1" }) - ); - }, - AssertionError, - "prohibited" - ); - } - await assertThrowsAsync( - () => { - return writeTrailers( - w, - new Headers({ "transfer-encoding": "chunked", trailer: "deno" }), - new Headers({ node: "js" }) - ); - }, - AssertionError, - "Not trailer" - ); -});