Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An optional values parameter in the exec method #290

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.4.0 (Node.js)

## New features

- (Node.js only) The `exec` method now accepts an optional `values` parameter, which allows you to pass the request body as a `Stream.Readable`. This can be useful in case of custom insert streaming with arbitrary ClickHouse data formats (which might not be explicitly supported and allowed by the client in the `insert` method yet). NB: in this case, you are expected to serialize the data in the stream in the required input format yourself.

# 1.3.0 (Common, Node.js, Web)

## New features
Expand Down
22 changes: 19 additions & 3 deletions packages/client-common/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,22 @@ export type QueryResult<Stream, Format extends DataFormat> =
? BaseResultSet<Stream, unknown>
: BaseResultSet<Stream, Format>

export interface ExecParams extends BaseQueryParams {
/** Statement to execute. */
export type ExecParams = BaseQueryParams & {
/** Statement to execute (including the FORMAT clause). By default, the query will be sent in the request body;
* If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter,
* and the values are sent in the request body instead. */
query: string
}
export type ExecParamsWithValues<Stream> = ExecParams & {
/** If you have a custom INSERT statement to run with `exec`,
* the data from this stream will be inserted.
*
* NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause
* used in {@link ExecParams.query} in this case.
*
* @see https://clickhouse.com/docs/en/interfaces/formats */
values: Stream
}

export type CommandParams = ExecParams
export type CommandResult = { query_id: string } & WithClickHouseSummary &
Expand Down Expand Up @@ -214,10 +226,14 @@ export class ClickHouseClient<Stream = unknown> {
* but format clause is not applicable. The caller of this method is expected to consume the stream,
* otherwise, the request will eventually be timed out.
*/
async exec(params: ExecParams): Promise<ExecResult<Stream>> {
async exec(
params: ExecParams | ExecParamsWithValues<Stream>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I avoided parametrizing ExecParams in this case, as it's a bit tricky to maintain full backward compatibility.

): Promise<ExecResult<Stream>> {
const query = removeTrailingSemi(params.query.trim())
const values = 'values' in params ? params.values : undefined
return await this.connection.exec({
query,
values,
...this.withClientQueryParams(params),
})
}
Expand Down
6 changes: 5 additions & 1 deletion packages/client-common/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export interface ConnInsertParams<Stream> extends ConnBaseQueryParams {
values: string | Stream
}

export interface ConnExecParams<Stream> extends ConnBaseQueryParams {
values?: Stream
}

export interface ConnBaseResult extends WithResponseHeaders {
query_id: string
}
Expand Down Expand Up @@ -66,6 +70,6 @@ export interface Connection<Stream> {
close(): Promise<void>
query(params: ConnBaseQueryParams): Promise<ConnQueryResult<Stream>>
insert(params: ConnInsertParams<Stream>): Promise<ConnInsertResult>
exec(params: ConnBaseQueryParams): Promise<ConnExecResult<Stream>>
exec(params: ConnExecParams<Stream>): Promise<ConnExecResult<Stream>>
command(params: ConnBaseQueryParams): Promise<ConnCommandResult>
}
1 change: 1 addition & 0 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export type {
Connection,
ConnectionParams,
ConnInsertResult,
ConnExecParams,
ConnExecResult,
ConnQueryResult,
ConnBaseQueryParams,
Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'
105 changes: 102 additions & 3 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import type Stream from 'stream'
import { guid } from '@test/utils'
import { createSimpleTable } from '@test/fixtures/simple_table'
import Stream from 'stream'
import { getAsText } from '../../src/utils'
import { ResultSet } from '../../src'
import { drainStream, ResultSet } from '../../src'

describe('[Node.js] exec result streaming', () => {
describe('[Node.js] exec', () => {
let client: ClickHouseClient<Stream.Readable>
beforeEach(() => {
client = createTestClient()
Expand Down Expand Up @@ -66,4 +68,101 @@ describe('[Node.js] exec result streaming', () => {
expect(await rs.json()).toEqual([{ number: '0' }])
})
})

describe('custom insert streaming with exec', () => {
slvrtrn marked this conversation as resolved.
Show resolved Hide resolved
let tableName: string
beforeEach(async () => {
tableName = `test_node_exec_insert_stream_${guid()}`
await createSimpleTable(client, tableName)
})

it('should send an insert stream', async () => {
const stream = Stream.Readable.from(['42,foobar,"[1,2]"'], {
objectMode: false,
})
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

it('should not fail with an empty stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
const execPromise = client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// close the empty stream after the request is sent
stream.push(null)
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
const execResult = await execPromise
await drainStream(execResult.stream)
await checkInsertedValues([])
})

it('should not fail with an already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
stream.push('42,foobar,"[1,2]"\n')
// close the stream with some values
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

it('should not fail with an empty and already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
// close the empty stream immediately
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([])
})

async function checkInsertedValues<T = unknown>(expected: Array<T>) {
const rs = await client.query({
query: `SELECT * FROM ${tableName}`,
format: 'JSONEachRow',
})
expect(await rs.json()).toEqual(expected)
}
})
})
13 changes: 9 additions & 4 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
ConnCommandResult,
Connection,
ConnectionParams,
ConnExecParams,
ConnExecResult,
ConnInsertParams,
ConnInsertResult,
Expand Down Expand Up @@ -225,7 +226,7 @@ export abstract class NodeBaseConnection
}

async exec(
params: ConnBaseQueryParams,
params: ConnExecParams<Stream.Readable>,
): Promise<ConnExecResult<Stream.Readable>> {
return this.runExec({
...params,
Expand Down Expand Up @@ -368,20 +369,23 @@ export abstract class NodeBaseConnection
params: RunExecParams,
): Promise<ConnExecResult<Stream.Readable>> {
const query_id = this.getQueryId(params.query_id)
const searchParams = toSearchParams({
const sendQueryInParams = params.values !== undefined
const toSearchParamsOptions = {
query: sendQueryInParams ? params.query : undefined,
database: this.params.database,
clickhouse_settings: params.clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
query_id,
})
}
const searchParams = toSearchParams(toSearchParamsOptions)
const { controller, controllerCleanup } = this.getAbortController(params)
try {
const { stream, summary, response_headers } = await this.request(
{
method: 'POST',
url: transformUrl({ url: this.params.url, searchParams }),
body: params.query,
body: sendQueryInParams ? params.values : params.query,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd prefer to keep a single logic path to send a request, but let's postpone until the next major release when we can always send query in URL and values as body (even if it's empty)

abort_signal: controller.signal,
parse_summary: true,
headers: this.buildRequestHeaders(params),
Expand Down Expand Up @@ -617,4 +621,5 @@ interface SocketInfo {

type RunExecParams = ConnBaseQueryParams & {
op: 'Exec' | 'Command'
values?: ConnExecParams<Stream.Readable>['values']
}
4 changes: 3 additions & 1 deletion packages/client-node/src/connection/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type Stream from 'stream'

/** See https://github.com/ClickHouse/clickhouse-js/pull/203 */
/** Drains the response stream, as calling `destroy` on a {@link Stream.Readable} response stream
* will result in closing the underlying socket, and negate the KeepAlive feature benefits.
* See https://github.com/ClickHouse/clickhouse-js/pull/203 */
export async function drainStream(stream: Stream.Readable): Promise<void> {
return new Promise((resolve, reject) => {
function dropData() {
Expand Down
1 change: 1 addition & 0 deletions packages/client-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type {
export { createClient } from './client'
export { type NodeClickHouseClientConfigOptions as ClickHouseClientConfigOptions } from './config'
export { ResultSet, type StreamReadable } from './result_set'
export { drainStream } from './connection/stream'

/** Re-export @clickhouse/client-common types */
export {
Expand Down
2 changes: 1 addition & 1 deletion packages/client-node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'
11 changes: 10 additions & 1 deletion packages/client-web/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type {
DataFormat,
ExecParams,
ExecResult,
InputJSON,
InputJSONObjectEachRow,
InsertParams,
Expand All @@ -20,7 +22,10 @@ export type QueryResult<Format extends DataFormat> =
? ResultSet<unknown>
: ResultSet<Format>

export type WebClickHouseClient = Omit<WebClickHouseClientImpl, 'insert'> & {
export type WebClickHouseClient = Omit<
WebClickHouseClientImpl,
'insert' | 'exec'
> & {
/** See {@link ClickHouseClient.insert}.
*
* ReadableStream is removed from possible insert values
Expand All @@ -30,6 +35,10 @@ export type WebClickHouseClient = Omit<WebClickHouseClientImpl, 'insert'> & {
values: ReadonlyArray<T> | InputJSON<T> | InputJSONObjectEachRow<T>
},
): Promise<InsertResult>
/** See {@link ClickHouseClient.exec}.
*
* Custom values are currently not supported in the web versions. */
exec(params: ExecParams): Promise<ExecResult<ReadableStream>>
}

class WebClickHouseClientImpl extends ClickHouseClient<ReadableStream> {
Expand Down
2 changes: 1 addition & 1 deletion packages/client-web/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'
Loading