Skip to content

Commit

Permalink
Merge pull request #2624 from cloudflare/lambros/d1-replication-sessi…
Browse files Browse the repository at this point in the history
…ons-API-01

Refactor the D1 binding to use D1DatabaseSession for all actions and add experimental compatibility flag enable_d1_with_sessions_api for withSession API
  • Loading branch information
lambrospetrou committed Sep 4, 2024
2 parents 07f0e29 + 4b1c31e commit 485a7ba
Show file tree
Hide file tree
Showing 10 changed files with 1,062 additions and 528 deletions.
1 change: 1 addition & 0 deletions src/cloudflare/internal/compatibility-flags.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ export const specCompliantResponseRedirect: boolean;
export const workerdExperimental: boolean;
export const durableObjectGetExisting: boolean;
export const vectorizeQueryMetadataOptional: boolean;
export const enableD1WithSessionsAPI: boolean;
287 changes: 232 additions & 55 deletions src/cloudflare/internal/d1-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import flags from 'workerd:compatibility-flags';

interface Fetcher {
fetch: typeof fetch;
}
Expand Down Expand Up @@ -52,38 +54,113 @@ type SQLError = {

type ResultsFormat = 'ARRAY_OF_OBJECTS' | 'ROWS_AND_COLUMNS' | 'NONE';

type D1SessionCommitTokenOrConstraint = string;
type D1SessionCommitToken = string;
// Indicates that the first query should go to the primary, and the rest queries
// using the same D1DatabaseSession will go to any replica that is consistent with
// the commit token maintained by the session (returned by the first query).
const D1_SESSION_CONSTRAINT_FIRST_PRIMARY = 'first-primary';
// Indicates that the first query can go anywhere (primary or replica), and the rest queries
// using the same D1DatabaseSession will go to any replica that is consistent with
// the commit token maintained by the session (returned by the first query).
const D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED = 'first-unconstrained';

// Parsed by the D1 eyeball worker.
const D1_SESSION_COMMIT_TOKEN_HTTP_HEADER = 'x-cf-d1-session-commit-token';

class D1Database {
private readonly fetcher: Fetcher;
private readonly alwaysPrimarySession: D1DatabaseSessionAlwaysPrimary;
protected readonly fetcher: Fetcher;

public constructor(fetcher: Fetcher) {
this.fetcher = fetcher;
this.alwaysPrimarySession = new D1DatabaseSessionAlwaysPrimary(
this.fetcher
);
}

public prepare(query: string): D1PreparedStatement {
return new D1PreparedStatement(this, query);
return new D1PreparedStatement(this.alwaysPrimarySession, query);
}

public async batch<T = unknown>(
statements: D1PreparedStatement[]
): Promise<D1Result<T>[]> {
return this.alwaysPrimarySession.batch(statements);
}

public async exec(query: string): Promise<D1ExecResult> {
return this.alwaysPrimarySession.exec(query);
}

// DEPRECATED, TO BE REMOVED WITH NEXT BREAKING CHANGE
public async dump(): Promise<ArrayBuffer> {
const response = await this.fetcher.fetch('http://d1/dump', {
method: 'POST',
headers: {
'content-type': 'application/json',
},
});
if (response.status !== 200) {
try {
const err = (await response.json()) as SQLError;
throw new Error(`D1_DUMP_ERROR: ${err.error}`, {
cause: new Error(err.error),
});
} catch {
throw new Error(`D1_DUMP_ERROR: Status + ${response.status}`, {
cause: new Error(`Status ${response.status}`),
});
}
return this.alwaysPrimarySession.dump();
}
}

class D1DatabaseWithSessionAPI extends D1Database {
public constructor(fetcher: Fetcher) {
super(fetcher);
}

public withSession(
constraintOrToken: D1SessionCommitTokenOrConstraint | null | undefined
): D1DatabaseSession {
constraintOrToken = constraintOrToken?.trim();
if (
constraintOrToken === null ||
constraintOrToken === undefined ||
constraintOrToken === ''
) {
constraintOrToken = D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED;
}
return await response.arrayBuffer();
return new D1DatabaseSession(this.fetcher, constraintOrToken);
}
}

class D1DatabaseSession {
protected fetcher: Fetcher;
protected commitTokenOrConstraint: D1SessionCommitTokenOrConstraint;

public constructor(
fetcher: Fetcher,
commitTokenOrConstraint: D1SessionCommitTokenOrConstraint
) {
this.fetcher = fetcher;
this.commitTokenOrConstraint = commitTokenOrConstraint;

if (!this.commitTokenOrConstraint) {
throw new Error('D1_SESSION_ERROR: invalid commit token or constraint');
}
}

// Update the commit token IFF the given newCommitToken is more recent.
// The commit token held in the session should always be the latest value we
// have observed in the responses to our API. There can be cases where we have concurrent
// queries running within the same session, and therefore here we ensure we only
// retain the latest commit token received.
// @returns the final commit token after the update.
protected _updateCommitToken(
newCommitToken: D1SessionCommitToken
): D1SessionCommitToken | null {
newCommitToken = newCommitToken.trim();
if (!newCommitToken) {
// We should not be receiving invalid commit tokens, but just be defensive.
return this.getCommitToken();
}
const currentCommitToken = this.getCommitToken();
if (
currentCommitToken === null ||
currentCommitToken.localeCompare(newCommitToken) < 0
) {
this.commitTokenOrConstraint = newCommitToken;
}
return this.getCommitToken();
}

public prepare(sql: string): D1PreparedStatement {
return new D1PreparedStatement(this, sql);
}

public async batch<T = unknown>(
Expand All @@ -98,34 +175,52 @@ class D1Database {
return exec.map(toArrayOfObjects);
}

public async exec(query: string): Promise<D1ExecResult> {
const lines = query.trim().split('\n');
const _exec = await this._send('/execute', lines, [], 'NONE');
const exec = Array.isArray(_exec) ? _exec : [_exec];
const error = exec
.map((r) => {
return r.error ? 1 : 0;
})
.indexOf(1);
if (error !== -1) {
throw new Error(
`D1_EXEC_ERROR: Error in line ${error + 1}: ${lines[error]}: ${
exec[error]?.error
}`,
{
cause: new Error(
`Error in line ${error + 1}: ${lines[error]}: ${exec[error]?.error}`
),
}
);
// Returns the latest commit token we received from all responses processed so far.
// It does not return constraints that might have be passed during the session creation.
public getCommitToken(): D1SessionCommitToken | null {
switch (this.commitTokenOrConstraint) {
// First to any replica, and then anywhere that satisfies the commit token.
case D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED:
// First to primary, and then anywhere that satisfies the commit token.
case D1_SESSION_CONSTRAINT_FIRST_PRIMARY:
return null;
default:
return this.commitTokenOrConstraint;
}
}

// fetch will append the commit token header to all outgoing fetch calls.
// The response headers are parsed automatically, extracting the commit token
// from the response headers and updating it through `_updateCommitToken(token)`.
protected async _wrappedFetch(
input: RequestInfo | URL,
init?: RequestInit
): Promise<Response> {
// We append the commit token to all fetch queries.
const h = new Headers(init?.headers);

// We send either a constraint, or a commit token, and the eyeball worker will figure out
// what to do based on the value. This simulates the same flow as the REST API would behave too.
if (this.commitTokenOrConstraint) {
h.set(D1_SESSION_COMMIT_TOKEN_HTTP_HEADER, this.commitTokenOrConstraint);
}

if (!init) {
init = { headers: h };
} else {
return {
count: exec.length,
duration: exec.reduce((p, c) => {
return p + (c.meta['duration'] as number);
}, 0),
};
init.headers = h;
}
return this.fetcher.fetch(input, init).then((resp) => {
const newCommitToken = resp.headers.get(
D1_SESSION_COMMIT_TOKEN_HTTP_HEADER
);
// TODO(soon): Add validation of the received commit token, in case we sent a commit token,
// otherwise sessions functionality could be inconsistent.
if (newCommitToken) {
this._updateCommitToken(newCommitToken);
}
return resp;
});
}

public async _sendOrThrow<T = unknown>(
Expand Down Expand Up @@ -165,7 +260,7 @@ class D1Database {

const url = new URL(endpoint, 'http://d1');
url.searchParams.set('resultsFormat', resultsFormat);
const response = await this.fetcher.fetch(url.href, {
const response = await this._wrappedFetch(url.href, {
method: 'POST',
headers: {
'content-type': 'application/json',
Expand Down Expand Up @@ -196,17 +291,96 @@ class D1Database {
}
}

class D1DatabaseSessionAlwaysPrimary extends D1DatabaseSession {
public constructor(fetcher: Fetcher) {
// Will always go to primary, since we won't be ever updating this constraint.
super(fetcher, D1_SESSION_CONSTRAINT_FIRST_PRIMARY);
}

// We ignore commit tokens for this special type of session,
// since all queries are sent to the primary.
public override _updateCommitToken(
_newCommitToken: D1SessionCommitToken
): D1SessionCommitToken | null {
return null;
}

// There is not commit token returned every by this special type of session,
// since all queries are sent to the primary.
public override getCommitToken(): D1SessionCommitToken | null {
return null;
}

//////////////////////////////////////////////////////////////////////////////////////////////
// These are only used by the D1Database which is our existing API pre-Sessions API.
// For backwards compatibility they always go to the primary database.
//

public async exec(query: string): Promise<D1ExecResult> {
const lines = query.trim().split('\n');
const _exec = await this._send('/execute', lines, [], 'NONE');
const exec = Array.isArray(_exec) ? _exec : [_exec];
const error = exec
.map((r) => {
return r.error ? 1 : 0;
})
.indexOf(1);
if (error !== -1) {
throw new Error(
`D1_EXEC_ERROR: Error in line ${error + 1}: ${lines[error]}: ${
exec[error]?.error
}`,
{
cause: new Error(
`Error in line ${error + 1}: ${lines[error]}: ${exec[error]?.error}`
),
}
);
} else {
return {
count: exec.length,
duration: exec.reduce((p, c) => {
return p + (c.meta['duration'] as number);
}, 0),
};
}
}

// DEPRECATED, TO BE REMOVED WITH NEXT BREAKING CHANGE
public async dump(): Promise<ArrayBuffer> {
const response = await this._wrappedFetch('http://d1/dump', {
method: 'POST',
headers: {
'content-type': 'application/json',
},
});
if (response.status !== 200) {
try {
const err = (await response.json()) as SQLError;
throw new Error(`D1_DUMP_ERROR: ${err.error}`, {
cause: new Error(err.error),
});
} catch {
throw new Error(`D1_DUMP_ERROR: Status + ${response.status}`, {
cause: new Error(`Status ${response.status}`),
});
}
}
return await response.arrayBuffer();
}
}

class D1PreparedStatement {
private readonly database: D1Database;
private readonly dbSession: D1DatabaseSession;
public readonly statement: string;
public readonly params: unknown[];

public constructor(
database: D1Database,
dbSession: D1DatabaseSession,
statement: string,
values?: unknown[]
) {
this.database = database;
this.dbSession = dbSession;
this.statement = statement;
this.params = values || [];
}
Expand Down Expand Up @@ -249,7 +423,7 @@ class D1PreparedStatement {
);
});
return new D1PreparedStatement(
this.database,
this.dbSession,
this.statement,
transformedValues
);
Expand All @@ -261,7 +435,7 @@ class D1PreparedStatement {
colName?: string
): Promise<Record<string, T> | T | null> {
const info = firstIfArray(
await this.database._sendOrThrow<Record<string, T>>(
await this.dbSession._sendOrThrow<Record<string, T>>(
'/query',
this.statement,
this.params,
Expand All @@ -288,7 +462,7 @@ class D1PreparedStatement {

public async run<T = Record<string, unknown>>(): Promise<D1Response> {
return firstIfArray(
await this.database._sendOrThrow<T>(
await this.dbSession._sendOrThrow<T>(
'/execute',
this.statement,
this.params,
Expand All @@ -300,7 +474,7 @@ class D1PreparedStatement {
public async all<T = Record<string, unknown>>(): Promise<D1Result<T[]>> {
return toArrayOfObjects(
firstIfArray(
await this.database._sendOrThrow<T[]>(
await this.dbSession._sendOrThrow<T[]>(
'/query',
this.statement,
this.params,
Expand All @@ -312,7 +486,7 @@ class D1PreparedStatement {

public async raw<T = unknown[]>(options?: D1RawOptions): Promise<T[]> {
const s = firstIfArray(
await this.database._sendOrThrow<Record<string, unknown>>(
await this.dbSession._sendOrThrow<Record<string, unknown>>(
'/query',
this.statement,
this.params,
Expand Down Expand Up @@ -400,5 +574,8 @@ async function toJson<T = unknown>(response: Response): Promise<T> {
}

export default function makeBinding(env: { fetcher: Fetcher }): D1Database {
if (flags.enableD1WithSessionsAPI) {
return new D1DatabaseWithSessionAPI(env.fetcher);
}
return new D1Database(env.fetcher);
}
Loading

0 comments on commit 485a7ba

Please sign in to comment.