Skip to content

Commit

Permalink
feat: add support for streaming (adelsz#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
EloB authored and zth committed Apr 27, 2024
1 parent fbe031e commit f9f51f5
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions packages/runtime/src/tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ import { SQLQueryIR, parseTSQuery, TSQueryAST } from '@pgtyped/parser';
import { processSQLQueryIR } from './preprocessor-sql.js';
import { processTSQueryAST } from './preprocessor-ts.js';

export interface ICursor<T> {
read(rowCount: number): Promise<T>;
close(): Promise<void>;
}

export interface IDatabaseConnection {
query: (query: string, bindings: any[]) => Promise<{ rows: any[] }>;
stream?: (query: string, bindings: any[]) => ICursor<any[]>;
}

/** Check for column modifier suffixes (exclamation and question marks). */
Expand Down Expand Up @@ -32,6 +38,11 @@ export class TaggedQuery<TTypePair extends { params: any; result: any }> {
dbConnection: IDatabaseConnection,
) => Promise<Array<TTypePair['result']>>;

public stream: (
params: TTypePair['params'],
dbConnection: IDatabaseConnection,
) => ICursor<Array<TTypePair['result']>>;

private readonly query: TSQueryAST;

constructor(query: TSQueryAST) {
Expand All @@ -44,6 +55,24 @@ export class TaggedQuery<TTypePair extends { params: any; result: any }> {
const result = await connection.query(processedQuery, bindings);
return mapQueryResultRows(result.rows);
};
this.stream = (params, connection) => {
const { query: processedQuery, bindings } = processTSQueryAST(
this.query,
params as any,
);
if (connection.stream == null)
throw new Error("Connection doesn't support streaming.");
const cursor = connection.stream(processedQuery, bindings);
return {
async read(rowCount: number) {
const rows = await cursor.read(rowCount);
return mapQueryResultRows(rows);
},
async close() {
await cursor.close();
},
};
};
}
}

Expand All @@ -66,6 +95,11 @@ export class PreparedQuery<TParamType, TResultType> {
dbConnection: IDatabaseConnection,
) => Promise<Array<TResultType>>;

public stream: (
params: TParamType,
dbConnection: IDatabaseConnection,
) => ICursor<Array<TResultType>>;

private readonly queryIR: SQLQueryIR;

constructor(queryIR: SQLQueryIR) {
Expand All @@ -78,6 +112,24 @@ export class PreparedQuery<TParamType, TResultType> {
const result = await connection.query(processedQuery, bindings);
return mapQueryResultRows(result.rows);
};
this.stream = (params, connection) => {
const { query: processedQuery, bindings } = processSQLQueryIR(
this.queryIR,
params as any,
);
if (connection.stream == null)
throw new Error("Connection doesn't support streaming.");
const cursor = connection.stream(processedQuery, bindings);
return {
async read(rowCount: number) {
const rows = await cursor.read(rowCount);
return mapQueryResultRows(rows);
},
async close() {
await cursor.close();
},
};
};
}
}

Expand Down

0 comments on commit f9f51f5

Please sign in to comment.