diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ec6387..edc4d6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 1.4.0 (Node.js) + +## New features + +- (Node.js only) The `exec` method now accepts an optional `values` parameter, which allows you to pass the request body as a `Stream.Readable`. This can be useful in case of custom insert streaming with arbitrary ClickHouse data formats (which might not be explicitly supported and allowed by the client in the `insert` method yet). NB: in this case, you are expected to serialize the data in the stream in the required input format yourself. + # 1.3.0 (Common, Node.js, Web) ## New features diff --git a/packages/client-common/src/client.ts b/packages/client-common/src/client.ts index 1935488..60d3ca4 100644 --- a/packages/client-common/src/client.ts +++ b/packages/client-common/src/client.ts @@ -61,10 +61,22 @@ export type QueryResult = ? BaseResultSet : BaseResultSet -export interface ExecParams extends BaseQueryParams { - /** Statement to execute. */ +export type ExecParams = BaseQueryParams & { + /** Statement to execute (including the FORMAT clause). By default, the query will be sent in the request body; + * If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter, + * and the values are sent in the request body instead. */ query: string } +export type ExecParamsWithValues = ExecParams & { + /** If you have a custom INSERT statement to run with `exec`, + * the data from this stream will be inserted. + * + * NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause + * used in {@link ExecParams.query} in this case. + * + * @see https://clickhouse.com/docs/en/interfaces/formats */ + values: Stream +} export type CommandParams = ExecParams export type CommandResult = { query_id: string } & WithClickHouseSummary & @@ -214,10 +226,14 @@ export class ClickHouseClient { * but format clause is not applicable. The caller of this method is expected to consume the stream, * otherwise, the request will eventually be timed out. */ - async exec(params: ExecParams): Promise> { + async exec( + params: ExecParams | ExecParamsWithValues, + ): Promise> { const query = removeTrailingSemi(params.query.trim()) + const values = 'values' in params ? params.values : undefined return await this.connection.exec({ query, + values, ...this.withClientQueryParams(params), }) } diff --git a/packages/client-common/src/connection.ts b/packages/client-common/src/connection.ts index 6420e54..6d824d5 100644 --- a/packages/client-common/src/connection.ts +++ b/packages/client-common/src/connection.ts @@ -39,6 +39,10 @@ export interface ConnInsertParams extends ConnBaseQueryParams { values: string | Stream } +export interface ConnExecParams extends ConnBaseQueryParams { + values?: Stream +} + export interface ConnBaseResult extends WithResponseHeaders { query_id: string } @@ -66,6 +70,6 @@ export interface Connection { close(): Promise query(params: ConnBaseQueryParams): Promise> insert(params: ConnInsertParams): Promise - exec(params: ConnBaseQueryParams): Promise> + exec(params: ConnExecParams): Promise> command(params: ConnBaseQueryParams): Promise } diff --git a/packages/client-common/src/index.ts b/packages/client-common/src/index.ts index e069c75..233c091 100644 --- a/packages/client-common/src/index.ts +++ b/packages/client-common/src/index.ts @@ -93,6 +93,7 @@ export type { Connection, ConnectionParams, ConnInsertResult, + ConnExecParams, ConnExecResult, ConnQueryResult, ConnBaseQueryParams, diff --git a/packages/client-common/src/version.ts b/packages/client-common/src/version.ts index 5dd9b13..91f74a7 100644 --- a/packages/client-common/src/version.ts +++ b/packages/client-common/src/version.ts @@ -1 +1 @@ -export default '1.3.0' +export default '1.4.0' diff --git a/packages/client-node/__tests__/integration/node_exec.test.ts b/packages/client-node/__tests__/integration/node_exec.test.ts index 8afd27f..a343a9f 100644 --- a/packages/client-node/__tests__/integration/node_exec.test.ts +++ b/packages/client-node/__tests__/integration/node_exec.test.ts @@ -1,10 +1,12 @@ import type { ClickHouseClient } from '@clickhouse/client-common' import { createTestClient } from '@test/utils' -import type Stream from 'stream' +import { guid } from '@test/utils' +import { createSimpleTable } from '@test/fixtures/simple_table' +import Stream from 'stream' import { getAsText } from '../../src/utils' -import { ResultSet } from '../../src' +import { drainStream, ResultSet } from '../../src' -describe('[Node.js] exec result streaming', () => { +describe('[Node.js] exec', () => { let client: ClickHouseClient beforeEach(() => { client = createTestClient() @@ -66,4 +68,101 @@ describe('[Node.js] exec result streaming', () => { expect(await rs.json()).toEqual([{ number: '0' }]) }) }) + + describe('custom insert streaming with exec', () => { + let tableName: string + beforeEach(async () => { + tableName = `test_node_exec_insert_stream_${guid()}` + await createSimpleTable(client, tableName) + }) + + it('should send an insert stream', async () => { + const stream = Stream.Readable.from(['42,foobar,"[1,2]"'], { + objectMode: false, + }) + const execResult = await client.exec({ + query: `INSERT INTO ${tableName} FORMAT CSV`, + values: stream, + }) + // the result stream contains nothing useful for an insert and should be immediately drained to release the socket + await drainStream(execResult.stream) + await checkInsertedValues([ + { + id: '42', + name: 'foobar', + sku: [1, 2], + }, + ]) + }) + + it('should not fail with an empty stream', async () => { + const stream = new Stream.Readable({ + read() { + // required + }, + objectMode: false, + }) + const execPromise = client.exec({ + query: `INSERT INTO ${tableName} FORMAT CSV`, + values: stream, + }) + // close the empty stream after the request is sent + stream.push(null) + // the result stream contains nothing useful for an insert and should be immediately drained to release the socket + const execResult = await execPromise + await drainStream(execResult.stream) + await checkInsertedValues([]) + }) + + it('should not fail with an already closed stream', async () => { + const stream = new Stream.Readable({ + read() { + // required + }, + objectMode: false, + }) + stream.push('42,foobar,"[1,2]"\n') + // close the stream with some values + stream.push(null) + const execResult = await client.exec({ + query: `INSERT INTO ${tableName} FORMAT CSV`, + values: stream, + }) + // the result stream contains nothing useful for an insert and should be immediately drained to release the socket + await drainStream(execResult.stream) + await checkInsertedValues([ + { + id: '42', + name: 'foobar', + sku: [1, 2], + }, + ]) + }) + + it('should not fail with an empty and already closed stream', async () => { + const stream = new Stream.Readable({ + read() { + // required + }, + objectMode: false, + }) + // close the empty stream immediately + stream.push(null) + const execResult = await client.exec({ + query: `INSERT INTO ${tableName} FORMAT CSV`, + values: stream, + }) + // the result stream contains nothing useful for an insert and should be immediately drained to release the socket + await drainStream(execResult.stream) + await checkInsertedValues([]) + }) + + async function checkInsertedValues(expected: Array) { + const rs = await client.query({ + query: `SELECT * FROM ${tableName}`, + format: 'JSONEachRow', + }) + expect(await rs.json()).toEqual(expected) + } + }) }) diff --git a/packages/client-node/src/connection/node_base_connection.ts b/packages/client-node/src/connection/node_base_connection.ts index 8befb3d..0c5309b 100644 --- a/packages/client-node/src/connection/node_base_connection.ts +++ b/packages/client-node/src/connection/node_base_connection.ts @@ -5,6 +5,7 @@ import type { ConnCommandResult, Connection, ConnectionParams, + ConnExecParams, ConnExecResult, ConnInsertParams, ConnInsertResult, @@ -225,7 +226,7 @@ export abstract class NodeBaseConnection } async exec( - params: ConnBaseQueryParams, + params: ConnExecParams, ): Promise> { return this.runExec({ ...params, @@ -368,20 +369,23 @@ export abstract class NodeBaseConnection params: RunExecParams, ): Promise> { const query_id = this.getQueryId(params.query_id) - const searchParams = toSearchParams({ + const sendQueryInParams = params.values !== undefined + const toSearchParamsOptions = { + query: sendQueryInParams ? params.query : undefined, database: this.params.database, clickhouse_settings: params.clickhouse_settings, query_params: params.query_params, session_id: params.session_id, query_id, - }) + } + const searchParams = toSearchParams(toSearchParamsOptions) const { controller, controllerCleanup } = this.getAbortController(params) try { const { stream, summary, response_headers } = await this.request( { method: 'POST', url: transformUrl({ url: this.params.url, searchParams }), - body: params.query, + body: sendQueryInParams ? params.values : params.query, abort_signal: controller.signal, parse_summary: true, headers: this.buildRequestHeaders(params), @@ -617,4 +621,5 @@ interface SocketInfo { type RunExecParams = ConnBaseQueryParams & { op: 'Exec' | 'Command' + values?: ConnExecParams['values'] } diff --git a/packages/client-node/src/connection/stream.ts b/packages/client-node/src/connection/stream.ts index 6044926..5684157 100644 --- a/packages/client-node/src/connection/stream.ts +++ b/packages/client-node/src/connection/stream.ts @@ -1,6 +1,8 @@ import type Stream from 'stream' -/** See https://github.com/ClickHouse/clickhouse-js/pull/203 */ +/** Drains the response stream, as calling `destroy` on a {@link Stream.Readable} response stream + * will result in closing the underlying socket, and negate the KeepAlive feature benefits. + * See https://github.com/ClickHouse/clickhouse-js/pull/203 */ export async function drainStream(stream: Stream.Readable): Promise { return new Promise((resolve, reject) => { function dropData() { diff --git a/packages/client-node/src/index.ts b/packages/client-node/src/index.ts index a4c70a9..d577b6f 100644 --- a/packages/client-node/src/index.ts +++ b/packages/client-node/src/index.ts @@ -5,6 +5,7 @@ export type { export { createClient } from './client' export { type NodeClickHouseClientConfigOptions as ClickHouseClientConfigOptions } from './config' export { ResultSet, type StreamReadable } from './result_set' +export { drainStream } from './connection/stream' /** Re-export @clickhouse/client-common types */ export { diff --git a/packages/client-node/src/version.ts b/packages/client-node/src/version.ts index 5dd9b13..91f74a7 100644 --- a/packages/client-node/src/version.ts +++ b/packages/client-node/src/version.ts @@ -1 +1 @@ -export default '1.3.0' +export default '1.4.0' diff --git a/packages/client-web/src/client.ts b/packages/client-web/src/client.ts index 9bda2ae..fa3f63f 100644 --- a/packages/client-web/src/client.ts +++ b/packages/client-web/src/client.ts @@ -1,5 +1,7 @@ import type { DataFormat, + ExecParams, + ExecResult, InputJSON, InputJSONObjectEachRow, InsertParams, @@ -20,7 +22,10 @@ export type QueryResult = ? ResultSet : ResultSet -export type WebClickHouseClient = Omit & { +export type WebClickHouseClient = Omit< + WebClickHouseClientImpl, + 'insert' | 'exec' +> & { /** See {@link ClickHouseClient.insert}. * * ReadableStream is removed from possible insert values @@ -30,6 +35,10 @@ export type WebClickHouseClient = Omit & { values: ReadonlyArray | InputJSON | InputJSONObjectEachRow }, ): Promise + /** See {@link ClickHouseClient.exec}. + * + * Custom values are currently not supported in the web versions. */ + exec(params: ExecParams): Promise> } class WebClickHouseClientImpl extends ClickHouseClient { diff --git a/packages/client-web/src/version.ts b/packages/client-web/src/version.ts index 5dd9b13..91f74a7 100644 --- a/packages/client-web/src/version.ts +++ b/packages/client-web/src/version.ts @@ -1 +1 @@ -export default '1.3.0' +export default '1.4.0'