From 1c9b28b4f36d718cecb8a9a6811207c6723dceef Mon Sep 17 00:00:00 2001
From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com>
Date: Sat, 5 Oct 2024 16:21:22 +0200
Subject: [PATCH] Fix web impl streaming line breaks with large rows (#333)

---
 CHANGELOG.md                                  |  6 ++
 .../client-common/__tests__/utils/datasets.ts | 37 +++++++++
 .../integration/node_streaming_e2e.test.ts    | 42 ++--------
 .../integration/web_select_streaming.test.ts  | 79 ++++++++++++++++++-
 packages/client-web/src/result_set.ts         | 67 +++++++++++-----
 5 files changed, 174 insertions(+), 57 deletions(-)
 create mode 100644 packages/client-common/__tests__/utils/datasets.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b9024f9..0dd46f8f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # 1.7.0 (Common, Node.js, Web)
 
+## Bug fixes
+
+- (Web only) Fixed an issue where streaming large datasets could provide corrupted results. See [#333](https://github.com/ClickHouse/clickhouse-js/pull/333) (PR) for more details.
+
+## New features
+
 - (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example:
 
   ```ts
diff --git a/packages/client-common/__tests__/utils/datasets.ts b/packages/client-common/__tests__/utils/datasets.ts
new file mode 100644
index 00000000..1966ccb4
--- /dev/null
+++ b/packages/client-common/__tests__/utils/datasets.ts
@@ -0,0 +1,37 @@
+import type { ClickHouseClient } from '@clickhouse/client-common'
+import { fakerRU } from '@faker-js/faker'
+import { createTableWithFields } from '@test/fixtures/table_with_fields'
+
+export async function genLargeStringsDataset(
+  client: ClickHouseClient,
+  {
+    rows,
+    words,
+  }: {
+    rows: number
+    words: number
+  },
+): Promise<{
+  table: string
+  values: { id: number; sentence: string; timestamp: string }[]
+}> {
+  const table = await createTableWithFields(
+    client as ClickHouseClient,
+    `sentence String, timestamp String`,
+  )
+  const values = [...new Array(rows)].map((_, id) => ({
+    id,
+    // it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
+    sentence: fakerRU.lorem.sentence(words),
+    timestamp: new Date().toISOString(),
+  }))
+  await client.insert({
+    table,
+    values,
+    format: 'JSONEachRow',
+  })
+  return {
+    table,
+    values,
+  }
+}
diff --git a/packages/client-node/__tests__/integration/node_streaming_e2e.test.ts b/packages/client-node/__tests__/integration/node_streaming_e2e.test.ts
index 38539f6f..a9f7184f 100644
--- a/packages/client-node/__tests__/integration/node_streaming_e2e.test.ts
+++ b/packages/client-node/__tests__/integration/node_streaming_e2e.test.ts
@@ -3,10 +3,9 @@ import {
   type ClickHouseClient,
   type ClickHouseSettings,
 } from '@clickhouse/client-common'
-import { fakerRU } from '@faker-js/faker'
 import { createSimpleTable } from '@test/fixtures/simple_table'
-import { createTableWithFields } from '@test/fixtures/table_with_fields'
 import { createTestClient, guid } from '@test/utils'
+import { genLargeStringsDataset } from '@test/utils/datasets'
 import { tableFromIPC } from 'apache-arrow'
 import { Buffer } from 'buffer'
 import Fs from 'fs'
@@ -152,40 +151,9 @@ describe('[Node.js] streaming e2e', () => {
   // Here we generate a large enough dataset to break into multiple chunks while streaming,
   // effectively testing the implementation of incomplete rows handling
   describe('should correctly process multiple chunks', () => {
-    async function generateData({
-      rows,
-      words,
-    }: {
-      rows: number
-      words: number
-    }): Promise<{
-      table: string
-      values: { id: number; sentence: string; timestamp: string }[]
-    }> {
-      const table = await createTableWithFields(
-        client as ClickHouseClient,
-        `sentence String, timestamp String`,
-      )
-      const values = [...new Array(rows)].map((_, id) => ({
-        id,
-        // it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
-        sentence: fakerRU.lorem.sentence(words),
-        timestamp: new Date().toISOString(),
-      }))
-      await client.insert({
-        table,
-        values,
-        format: 'JSONEachRow',
-      })
-      return {
-        table,
-        values,
-      }
-    }
-
     describe('large amount of rows', () => {
       it('should work with .json()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 10000,
           words: 10,
         })
@@ -199,7 +167,7 @@ describe('[Node.js] streaming e2e', () => {
       })
 
       it('should work with .stream()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 10000,
           words: 10,
         })
@@ -222,7 +190,7 @@ describe('[Node.js] streaming e2e', () => {
 
     describe("rows that don't fit into a single chunk", () => {
       it('should work with .json()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 5,
           words: 10000,
         })
@@ -236,7 +204,7 @@ describe('[Node.js] streaming e2e', () => {
       })
 
       it('should work with .stream()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 5,
           words: 10000,
         })
diff --git a/packages/client-web/__tests__/integration/web_select_streaming.test.ts b/packages/client-web/__tests__/integration/web_select_streaming.test.ts
index b8184096..66cf2934 100644
--- a/packages/client-web/__tests__/integration/web_select_streaming.test.ts
+++ b/packages/client-web/__tests__/integration/web_select_streaming.test.ts
@@ -1,5 +1,6 @@
 import type { ClickHouseClient, Row } from '@clickhouse/client-common'
 import { createTestClient } from '@test/utils'
+import { genLargeStringsDataset } from '@test/utils/datasets'
 
 describe('[Web] SELECT streaming', () => {
   let client: ClickHouseClient<ReadableStream<Uint8Array>>
@@ -7,7 +8,14 @@
     await client.close()
   })
   beforeEach(async () => {
-    client = createTestClient()
+    client = createTestClient({
+      // It is required to disable keep-alive to allow for larger inserts
+      // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
+      // If contentLength is non-null and httpRequest’s keepalive is true, then:
+      // <...>
+      // If the sum of contentLength and inflightKeepaliveBytes is greater than 64 kibibytes, then return a network error.
+      keep_alive: { enabled: false },
+    })
   })
 
   describe('consume the response only once', () => {
@@ -199,6 +207,75 @@ describe('[Web] SELECT streaming', () => {
       ])
     })
   })
+
+  // See https://github.com/ClickHouse/clickhouse-js/issues/171 for more details
+  // Here we generate a large enough dataset to break into multiple chunks while streaming,
+  // effectively testing the implementation of incomplete rows handling
+  describe('should correctly process multiple chunks', () => {
+    describe('large amount of rows', () => {
+      it('should work with .json()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 10000,
+          words: 10,
+        })
+        const result = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.json())
+        expect(result).toEqual(values)
+      })
+
+      it('should work with .stream()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 10000,
+          words: 10,
+        })
+        const stream = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.stream())
+
+        const result = await rowsJsonValues(stream)
+        expect(result).toEqual(values)
+      })
+    })
+
+    describe("rows that don't fit into a single chunk", () => {
+      it('should work with .json()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 5,
+          words: 10000,
+        })
+        const result = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.json())
+        expect(result).toEqual(values)
+      })
+
+      it('should work with .stream()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 5,
+          words: 10000,
+        })
+        const stream = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.stream())
+
+        const result = await rowsJsonValues(stream)
+        expect(result).toEqual(values)
+      })
+    })
+  })
 })
 
 async function rowsJsonValues(
diff --git a/packages/client-web/src/result_set.ts b/packages/client-web/src/result_set.ts
index e1ccf894..fc744812 100644
--- a/packages/client-web/src/result_set.ts
+++ b/packages/client-web/src/result_set.ts
@@ -9,10 +9,12 @@ import type {
 import {
   isNotStreamableJSONFamily,
   isStreamableJSONFamily,
+  validateStreamFormat,
 } from '@clickhouse/client-common'
-import { validateStreamFormat } from '@clickhouse/client-common'
 import { getAsText } from './utils'
 
+const NEWLINE = 0x0a as const
+
 export class ResultSet<Format extends DataFormat | unknown>
   implements BaseResultSet<ReadableStream<Row[]>, Format>
 {
@@ -67,40 +69,67 @@
     this.markAsConsumed()
     validateStreamFormat(this.format)
 
-    let decodedChunk = ''
+    let incompleteChunks: Uint8Array[] = []
+    let totalIncompleteLength = 0
     const decoder = new TextDecoder('utf-8')
     const transform = new TransformStream({
       start() {
         //
       },
-      transform: (chunk, controller) => {
+      transform: (chunk: Uint8Array, controller) => {
        if (chunk === null) {
           controller.terminate()
         }
 
-        decodedChunk += decoder.decode(chunk)
         const rows: Row[] = []
-        // eslint-disable-next-line no-constant-condition
-        while (true) {
-          const idx = decodedChunk.indexOf('\n')
-          if (idx !== -1) {
-            const text = decodedChunk.slice(0, idx)
-            decodedChunk = decodedChunk.slice(idx + 1)
+        let idx: number
+        let lastIdx = 0
+        do {
+          // an unescaped newline character denotes the end of a row
+          idx = chunk.indexOf(NEWLINE, lastIdx)
+          // there is no complete row in the rest of the current chunk
+          if (idx === -1) {
+            // to be processed during the next transform iteration
+            const incompleteChunk = chunk.slice(lastIdx)
+            incompleteChunks.push(incompleteChunk)
+            totalIncompleteLength += incompleteChunk.length
+            // send the extracted rows to the consumer, if any
+            if (rows.length > 0) {
+              controller.enqueue(rows)
+            }
+          } else {
+            let text: string
+            if (incompleteChunks.length > 0) {
+              const completeRowBytes = new Uint8Array(
+                totalIncompleteLength + idx,
+              )
+
+              // using the incomplete chunks from the previous iterations
+              let offset = 0
+              incompleteChunks.forEach((incompleteChunk) => {
+                completeRowBytes.set(incompleteChunk, offset)
+                offset += incompleteChunk.length
+              })
+              // finalize the row with the current chunk slice that ends with a newline
+              const finalChunk = chunk.slice(0, idx)
+              completeRowBytes.set(finalChunk, offset)
+
+              // reset the incomplete chunks
+              incompleteChunks = []
+              totalIncompleteLength = 0
+
+              text = decoder.decode(completeRowBytes)
+            } else {
+              text = decoder.decode(chunk.slice(lastIdx, idx))
+            }
             rows.push({
               text,
               json<T>(): T {
                 return JSON.parse(text)
               },
             })
-          } else {
-            if (rows.length) {
-              controller.enqueue(rows)
-            }
-            break
+            lastIdx = idx + 1 // skipping newline character
           }
-        }
-      },
-      flush() {
-        decodedChunk = ''
+        } while (idx !== -1)
       },
     })
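Note: the essence of the fix above is that the web `ResultSet` no longer decodes every incoming chunk eagerly; it scans the raw bytes for `0x0a`, buffers any incomplete tail until the next chunk arrives, and decodes only complete rows. Below is a minimal, standalone sketch of that idea for readers who want to experiment with it outside the client. It assumes a plain `ReadableStream<Uint8Array>` of `JSONEachRow`-style output; the name `splitJsonEachRowStream` and its signature are illustrative only and are not part of the `@clickhouse/client-web` API.

```ts
// Illustrative sketch only; not the client's implementation.
const NEWLINE = 0x0a

// Yields one parsed row per newline-terminated line in a byte stream,
// carrying incomplete tails over to the next chunk.
export async function* splitJsonEachRowStream<T = unknown>(
  stream: ReadableStream<Uint8Array>,
): AsyncGenerator<T> {
  const decoder = new TextDecoder('utf-8')
  // incomplete tail bytes carried over from the previous chunk(s)
  let pending: Uint8Array[] = []
  let pendingLength = 0

  const reader = stream.getReader()
  try {
    while (true) {
      const { done, value: chunk } = await reader.read()
      if (done || chunk === undefined) break
      let lastIdx = 0
      let idx = chunk.indexOf(NEWLINE, lastIdx)
      while (idx !== -1) {
        // assemble the complete row: buffered tail + current chunk slice
        const row = new Uint8Array(pendingLength + (idx - lastIdx))
        let offset = 0
        for (const part of pending) {
          row.set(part, offset)
          offset += part.length
        }
        row.set(chunk.subarray(lastIdx, idx), offset)
        pending = []
        pendingLength = 0
        // decode only complete rows, so multi-byte UTF-8 characters
        // split across chunk boundaries are never corrupted
        yield JSON.parse(decoder.decode(row)) as T
        lastIdx = idx + 1 // skip the newline byte
        idx = chunk.indexOf(NEWLINE, lastIdx)
      }
      if (lastIdx < chunk.length) {
        // keep a copy of the unterminated tail for the next chunk
        const tail = chunk.slice(lastIdx)
        pending.push(tail)
        pendingLength += tail.length
      }
    }
  } finally {
    reader.releaseLock()
  }
}
```

Usage would then look roughly like `for await (const row of splitJsonEachRowStream(response.body!)) { ... }` (again, illustrative). Working on bytes rather than on per-chunk decoded strings is what makes the non-ASCII test data (the `fakerRU` sentences above) survive arbitrary chunk boundaries.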