Skip to content

Commit

Permalink
fix: [http] Consume unread body and trailers before reading next requ…
Browse files Browse the repository at this point in the history
…est (denoland/deno#3990)

- Added `ServerRequest.finalize()`:  consuming all unread body stream and trailers.
  - This is cleanup method for reading next request from same keep-alive connection.
  - Needed when handler didn't consume all body and trailers even after responding.
- refactor: `ServerRequest._bodyStream()`, `ServerRequestBody` are removed.
  - Now using `bodyReader()` and `chunkedBodyReader()` instead.
- fix: Trailers should only be read `transfer-encoding` is `chunked` and `trailer` header is set and its value is valid.
- fix: use `Headers.append()` on reading trailers.
- fix: delete `trailer` field from headers after reading trailers.
- reorg: Several functions related to IO are moved into `http/io.ts`
  • Loading branch information
keroxp authored and denobot committed Feb 1, 2021
1 parent d76d46f commit 761c2c1
Show file tree
Hide file tree
Showing 6 changed files with 553 additions and 246 deletions.
213 changes: 213 additions & 0 deletions http/io.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
import { BufReader, UnexpectedEOFError, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { assert } from "../testing/asserts.ts";
import { encoder } from "../strings/mod.ts";

export function emptyReader(): Deno.Reader {
return {
async read(_: Uint8Array): Promise<number | Deno.EOF> {
return Deno.EOF;
}
};
}

export function bodyReader(contentLength: number, r: BufReader): Deno.Reader {
let totalRead = 0;
let finished = false;
async function read(buf: Uint8Array): Promise<number | Deno.EOF> {
if (finished) return Deno.EOF;
let result: number | Deno.EOF;
const remaining = contentLength - totalRead;
if (remaining >= buf.byteLength) {
result = await r.read(buf);
} else {
const readBuf = buf.subarray(0, remaining);
result = await r.read(readBuf);
}
if (result !== Deno.EOF) {
totalRead += result;
}
finished = totalRead === contentLength;
return result;
}
return { read };
}

export function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
let finished = false;
const chunks: Array<{
offset: number;
data: Uint8Array;
}> = [];
async function read(buf: Uint8Array): Promise<number | Deno.EOF> {
if (finished) return Deno.EOF;
const [chunk] = chunks;
if (chunk) {
const chunkRemaining = chunk.data.byteLength - chunk.offset;
const readLength = Math.min(chunkRemaining, buf.byteLength);
for (let i = 0; i < readLength; i++) {
buf[i] = chunk.data[chunk.offset + i];
}
chunk.offset += readLength;
if (chunk.offset === chunk.data.byteLength) {
chunks.shift();
// Consume \r\n;
if ((await tp.readLine()) === Deno.EOF) {
throw new UnexpectedEOFError();
}
}
return readLength;
}
const line = await tp.readLine();
if (line === Deno.EOF) throw new UnexpectedEOFError();
// TODO: handle chunk extension
const [chunkSizeString] = line.split(";");
const chunkSize = parseInt(chunkSizeString, 16);
if (Number.isNaN(chunkSize) || chunkSize < 0) {
throw new Error("Invalid chunk size");
}
if (chunkSize > 0) {
if (chunkSize > buf.byteLength) {
let eof = await r.readFull(buf);
if (eof === Deno.EOF) {
throw new UnexpectedEOFError();
}
const restChunk = new Uint8Array(chunkSize - buf.byteLength);
eof = await r.readFull(restChunk);
if (eof === Deno.EOF) {
throw new UnexpectedEOFError();
} else {
chunks.push({
offset: 0,
data: restChunk
});
}
return buf.byteLength;
} else {
const bufToFill = buf.subarray(0, chunkSize);
const eof = await r.readFull(bufToFill);
if (eof === Deno.EOF) {
throw new UnexpectedEOFError();
}
// Consume \r\n
if ((await tp.readLine()) === Deno.EOF) {
throw new UnexpectedEOFError();
}
return chunkSize;
}
} else {
assert(chunkSize === 0);
// Consume \r\n
if ((await r.readLine()) === Deno.EOF) {
throw new UnexpectedEOFError();
}
await readTrailers(h, r);
finished = true;
return Deno.EOF;
}
}
return { read };
}

const kProhibitedTrailerHeaders = [
"transfer-encoding",
"content-length",
"trailer"
];

/**
* Read trailer headers from reader and append values to headers.
* "trailer" field will be deleted.
* */
export async function readTrailers(
headers: Headers,
r: BufReader
): Promise<void> {
const keys = parseTrailer(headers.get("trailer"));
if (!keys) return;
const tp = new TextProtoReader(r);
const result = await tp.readMIMEHeader();
assert(result != Deno.EOF, "trailer must be set");
for (const [k, v] of result) {
if (!keys.has(k)) {
throw new Error("Undeclared trailer field");
}
keys.delete(k);
headers.append(k, v);
}
assert(keys.size === 0, "Missing trailers");
headers.delete("trailer");
}

function parseTrailer(field: string | null): Set<string> | undefined {
if (field == null) {
return undefined;
}
const keys = field.split(",").map(v => v.trim());
if (keys.length === 0) {
throw new Error("Empty trailer");
}
for (const invalid of kProhibitedTrailerHeaders) {
if (keys.includes(invalid)) {
throw new Error(`Prohibited field for trailer`);
}
}
return new Set(keys);
}

export async function writeChunkedBody(
w: Deno.Writer,
r: Deno.Reader
): Promise<void> {
const writer = BufWriter.create(w);
for await (const chunk of Deno.toAsyncIterator(r)) {
if (chunk.byteLength <= 0) continue;
const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
const end = encoder.encode("\r\n");
await writer.write(start);
await writer.write(chunk);
await writer.write(end);
}

const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}

/** write trailer headers to writer. it mostly should be called after writeResponse */
export async function writeTrailers(
w: Deno.Writer,
headers: Headers,
trailers: Headers
): Promise<void> {
const trailer = headers.get("trailer");
if (trailer === null) {
throw new Error('response headers must have "trailer" header field');
}
const transferEncoding = headers.get("transfer-encoding");
if (transferEncoding === null || !transferEncoding.match(/^chunked/)) {
throw new Error(
`trailer headers is only allowed for "transfer-encoding: chunked": got "${transferEncoding}"`
);
}
const writer = BufWriter.create(w);
const trailerHeaderFields = trailer
.split(",")
.map(s => s.trim().toLowerCase());
for (const f of trailerHeaderFields) {
assert(
!kProhibitedTrailerHeaders.includes(f),
`"${f}" is prohibited for trailer header`
);
}
for (const [key, value] of trailers) {
assert(
trailerHeaderFields.includes(key),
`Not trailer header field: ${key}`
);
await writer.write(encoder.encode(`${key}: ${value}\r\n`));
}
await writer.write(encoder.encode("\r\n"));
await writer.flush();
}
167 changes: 167 additions & 0 deletions http/io_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import {
AssertionError,
assertThrowsAsync,
assertEquals
} from "../testing/asserts.ts";
import { bodyReader, writeTrailers, readTrailers } from "./io.ts";
import { encode, decode } from "../strings/mod.ts";
import { BufReader } from "../io/bufio.ts";
import { chunkedBodyReader } from "./io.ts";
const { test, Buffer } = Deno;

test("bodyReader", async () => {
const text = "Hello, Deno";
const r = bodyReader(text.length, new BufReader(new Buffer(encode(text))));
assertEquals(decode(await Deno.readAll(r)), text);
});
function chunkify(n: number, char: string): string {
const v = Array.from({ length: n })
.map(() => `${char}`)
.join("");
return `${n.toString(16)}\r\n${v}\r\n`;
}
test("chunkedBodyReader", async () => {
const body = [
chunkify(3, "a"),
chunkify(5, "b"),
chunkify(11, "c"),
chunkify(22, "d"),
chunkify(0, "")
].join("");
const h = new Headers();
const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body))));
let result: number | Deno.EOF;
// Use small buffer as some chunks exceed buffer size
const buf = new Uint8Array(5);
const dest = new Buffer();
while ((result = await r.read(buf)) !== Deno.EOF) {
const len = Math.min(buf.byteLength, result);
await dest.write(buf.subarray(0, len));
}
const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd";
assertEquals(dest.toString(), exp);
});

test("chunkedBodyReader with trailers", async () => {
const body = [
chunkify(3, "a"),
chunkify(5, "b"),
chunkify(11, "c"),
chunkify(22, "d"),
chunkify(0, ""),
"deno: land\r\n",
"node: js\r\n",
"\r\n"
].join("");
const h = new Headers({
trailer: "deno,node"
});
const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body))));
assertEquals(h.has("trailer"), true);
assertEquals(h.has("deno"), false);
assertEquals(h.has("node"), false);
const act = decode(await Deno.readAll(r));
const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd";
assertEquals(act, exp);
assertEquals(h.has("trailer"), false);
assertEquals(h.get("deno"), "land");
assertEquals(h.get("node"), "js");
});

test("readTrailers", async () => {
const h = new Headers({
trailer: "deno,node"
});
const trailer = ["deno: land", "node: js", "", ""].join("\r\n");
await readTrailers(h, new BufReader(new Buffer(encode(trailer))));
assertEquals(h.has("trailer"), false);
assertEquals(h.get("deno"), "land");
assertEquals(h.get("node"), "js");
});

test("readTrailer should throw if undeclared headers found in trailer", async () => {
const patterns = [
["deno,node", "deno: land\r\nnode: js\r\ngo: lang\r\n\r\n"],
["deno", "node: js\r\n\r\n"],
["deno", "node:js\r\ngo: lang\r\n\r\n"]
];
for (const [header, trailer] of patterns) {
const h = new Headers({
trailer: header
});
await assertThrowsAsync(
async () => {
await readTrailers(h, new BufReader(new Buffer(encode(trailer))));
},
Error,
"Undeclared trailer field"
);
}
});

test("readTrailer should throw if trailer contains prohibited fields", async () => {
for (const f of ["content-length", "trailer", "transfer-encoding"]) {
const h = new Headers({
trailer: f
});
await assertThrowsAsync(
async () => {
await readTrailers(h, new BufReader(new Buffer()));
},
Error,
"Prohibited field for trailer"
);
}
});

test("writeTrailer", async () => {
const w = new Buffer();
await writeTrailers(
w,
new Headers({ "transfer-encoding": "chunked", trailer: "deno,node" }),
new Headers({ deno: "land", node: "js" })
);
assertEquals(w.toString(), "deno: land\r\nnode: js\r\n\r\n");
});

test("writeTrailer should throw", async () => {
const w = new Buffer();
await assertThrowsAsync(
() => {
return writeTrailers(w, new Headers(), new Headers());
},
Error,
'must have "trailer"'
);
await assertThrowsAsync(
() => {
return writeTrailers(w, new Headers({ trailer: "deno" }), new Headers());
},
Error,
"only allowed"
);
for (const f of ["content-length", "trailer", "transfer-encoding"]) {
await assertThrowsAsync(
() => {
return writeTrailers(
w,
new Headers({ "transfer-encoding": "chunked", trailer: f }),
new Headers({ [f]: "1" })
);
},
AssertionError,
"prohibited"
);
}
await assertThrowsAsync(
() => {
return writeTrailers(
w,
new Headers({ "transfer-encoding": "chunked", trailer: "deno" }),
new Headers({ node: "js" })
);
},
AssertionError,
"Not trailer"
);
});
Loading

0 comments on commit 761c2c1

Please sign in to comment.