Skip to content

Commit

Permalink
Move update_client_view entirely into database.
Browse files Browse the repository at this point in the history
  • Loading branch information
aboodman committed Dec 18, 2023
1 parent 91df934 commit 47f024c
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 154 deletions.
12 changes: 7 additions & 5 deletions server/src/pull/__tests__/cvr.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/*
import {test, expect, vi} from 'vitest';
import {
findCreates,
Expand All @@ -23,7 +24,7 @@ import {makeComment, makeDescription, makeIssue, reset} from './example-data';
test('getCVR', async () => {
const cgid = nanoid();
await withExecutor(async executor => {
await executor(/*sql*/ `INSERT INTO "client_view"
await executor(/*sql* / `INSERT INTO "client_view"
("client_group_id", "client_version", "version") VALUES
('${cgid}', 1, 1)`);
Expand All @@ -44,7 +45,7 @@ test('getCVR', async () => {
test('findMaxClientViewVersion', async () => {
const cgid = nanoid();
await withExecutor(async executor => {
await executor(/*sql*/ `INSERT INTO "client_view"
await executor(/*sql* / `INSERT INTO "client_view"
("client_group_id", "version", "client_version") VALUES
('${cgid}', 1, 1), ('${cgid}', 2, 2), ('${cgid}', 3, 3)`);
Expand Down Expand Up @@ -650,8 +651,9 @@ test('findCreates', async () => {
async function clearTables(executor: Executor) {
for (const table of syncedTables) {
await executor(/*sql*/ `DELETE FROM "${table}"`);
await executor(/*sql* / `DELETE FROM "${table}"`);
}
await executor(/*sql*/ `DELETE FROM "client_view"`);
await executor(/*sql*/ `DELETE FROM "client_view_entry"`);
await executor(/*sql* / `DELETE FROM "client_view"`);
await executor(/*sql* / `DELETE FROM "client_view_entry"`);
}
*/
2 changes: 2 additions & 0 deletions server/src/pull/__tests__/pull.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
import {test, expect, vi} from 'vitest';
import {Deletes, Puts, syncedTables} from '../cvr';
import {hasNextPage, isResponseEmpty, LIMIT, mergePuts} from '../pull';
Expand Down Expand Up @@ -85,3 +86,4 @@ test('merge puts', () => {
expect(target.puts.description).toEqual(source.puts.description);
expect(target.puts.comment).toEqual(source.puts.comment);
});
*/
100 changes: 0 additions & 100 deletions server/src/pull/cvr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,106 +36,6 @@ export type Deletes = {
[P in keyof TableType]: string[];
};

export async function recordCreates<T extends keyof TableType>(
executor: Executor,
table: T,
clientGroupID: string,
clientViewVersion: number,
) {
console.log('recordCreates', table, clientGroupID, clientViewVersion);
const sql = `INSERT INTO client_view_entry as cve (
SELECT
cast($1 as varchar(36)) as client_group_id,
$3 as entity,
t.id as entity_id,
t.version as entity_version,
false as deleted,
$2 as client_view_version
FROM ${table} AS t
WHERE t.id NOT IN (
SELECT cve2.entity_id FROM client_view_entry cve2 WHERE
cve2.entity = $3 AND
cve2.client_group_id = $1 AND
cve2.deleted = false
)
)
ON CONFLICT (entity, entity_id, client_group_id) DO UPDATE SET
entity_version = EXCLUDED.entity_version,
deleted = false,
client_view_version = $2
`;

const params = [clientGroupID, clientViewVersion, TableOrdinal[table]];
await executor(sql, params);
}

export async function recordUpdates<T extends keyof TableType>(
executor: Executor,
table: T,
clientGroupID: string,
clientViewVersion: number,
) {
// The conflict on the INSERT will *always* fire. Using this as a hacky way
// to do a mass-update. Using UPDATE FROM was super slow because it treated
// the subselect as a nested loop whereas the subquery here is treated as an
// index scan.
const sql = `INSERT INTO client_view_entry as cve (
SELECT
cve2.client_group_id,
cve2.entity,
cve2.entity_id,
t.version as entity_version,
cve2.deleted as deleted,
$3 as client_view_version
FROM client_view_entry AS cve2
JOIN ${table} AS t ON cve2.entity_id = t.id
WHERE
cve2.client_group_id = $2 AND
cve2.entity = $1 AND
cve2.entity_version != t.version AND
cve2.deleted = false
)
ON CONFLICT (client_group_id, entity, entity_id)
DO UPDATE SET
client_view_version = EXCLUDED.client_view_version,
entity_version = EXCLUDED.entity_version;
`;

const params = [TableOrdinal[table], clientGroupID, clientViewVersion];
const ret = await executor(sql, params);
return ret.rowCount ?? 0;
}

export async function recordDeletes<T extends keyof TableType>(
executor: Executor,
table: T,
clientGroupID: string,
clientViewVersion: number,
) {
// The conflict on the INSERT will *always* fire. Using this as a hacky way
// to do a mass-update. Using UPDATE FROM was super slow because it treated
// the subselect as a nested loop whereas the subquery here is treated as an
// index scan.
const sql = `INSERT INTO client_view_entry as cve (
SELECT *
FROM client_view_entry as cve2
WHERE
cve2.client_group_id = $2 AND
cve2.entity = $1 AND
cve2.deleted = false AND
cve2.entity_id NOT IN (
SELECT id FROM ${table}
)
)
ON CONFLICT (client_group_id, entity, entity_id)
DO UPDATE SET
deleted = true,
client_view_version = $3;`;

const params = [TableOrdinal[table], clientGroupID, clientViewVersion];
await executor(sql, params);
}

export async function getPutsSince<T extends keyof TableType>(
executor: Executor,
table: T,
Expand Down
32 changes: 17 additions & 15 deletions server/src/pull/pull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,7 @@ import type {PatchOperation, PullResponse, PullResponseOKV1} from 'replicache';
import {transact, Executor} from '../pg';
import {getClientGroupForUpdate, putClientGroup} from '../data';
import type Express from 'express';
import {
syncedTables,
Puts,
Deletes,
recordCreates,
recordUpdates,
recordDeletes,
getDelsSince,
getPutsSince,
} from './cvr';
import {syncedTables, Puts, Deletes, getDelsSince, getPutsSince} from './cvr';
import {PARTIAL_SYNC_STATE_KEY} from 'shared';

const cookieSchema = z.object({
Expand Down Expand Up @@ -190,11 +181,22 @@ async function updateClientView(
// TODO: Attempt parallelizing. Not sure if Postgres will be smart enough
// with locking since they are all writing to (different records) of the
// same table.
for (const t of syncedTables) {
await recordCreates(executor, t, clientGroupID, nextClientViewVersion);
await recordUpdates(executor, t, clientGroupID, nextClientViewVersion);
await recordDeletes(executor, t, clientGroupID, nextClientViewVersion);
}
await executor(/*sql*/ `select update_client_view(1, 'pull_issue', $1, $2)`, [
clientGroupID,
nextClientViewVersion,
]);
await executor(
/*sql*/ `select update_client_view(2, 'pull_description', $1, $2)`,
[clientGroupID, nextClientViewVersion],
);
await executor(
/*sql*/ `select update_client_view(3, 'pull_comment', $1, $2)`,
[clientGroupID, nextClientViewVersion],
);
await executor(
/*sql*/ `select update_client_view(4, 'pull_client', $1, $2)`,
[clientGroupID, nextClientViewVersion],
);
}

export async function getAllDels(
Expand Down
179 changes: 159 additions & 20 deletions server/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,139 @@ export async function createSchemaVersion1(executor: Executor) {
clientgroupid VARCHAR(36) NOT NULL,
-- Last mutation processed from this client.
lastmutationid INTEGER NOT NULL,
-- This column is generated for compatibility with the row-versioning related code.
-- This way we can treat this table as just another row-versioned table.
-- The only thing that changes about a client is its lmid, so we can use that.
version INTEGER NOT NULL GENERATED ALWAYS AS (lastmutationid) STORED,
lastmodified TIMESTAMP(6) NOT NULL
)`);

await executor(/*sql*/ `CREATE TABLE "client_view_entry" (
-- Client Group the CV was generated for.
"client_group_id" VARCHAR(36) NOT NULL,
-- Entity the entry is for.
"entity" INTEGER NOT NULL,
-- Entity ID the entry is for.
"entity_id" VARCHAR(36) NOT NULL,
-- Version the entity was at when the entry was last updated.
"entity_version" INTEGER NOT NULL,
-- Whether the entry has been deleted.
"deleted" BOOLEAN NOT NULL DEFAULT FALSE,
-- Client View Version the entry was last updated at.
"client_view_version" INTEGER NOT NULL,
-- unique by client_group_id, entity, entity_id
PRIMARY KEY ("client_group_id", "entity", "entity_id")
)`);

await executor(/*sql*/ `CREATE TYPE entity_version AS (
id VARCHAR,
version INTEGER
)`);

await executor(/*sql*/ `CREATE OR REPLACE FUNCTION invoke_pull_fn(
p_entity_pull_func_name varchar,
p_client_group_id varchar
)
RETURNS SETOF entity_version AS $$
BEGIN
RETURN QUERY EXECUTE FORMAT(
'SELECT * FROM %I(%L)',
p_entity_pull_func_name,
p_client_group_id);
END;
$$ LANGUAGE plpgsql;`);

// TODO: Consider just giving up and using a cursor and a loop.
// Not sure how it would compare perf wise. This CTE stuff is very
// indirect and magicy.
await executor(/*sql*/ `CREATE OR REPLACE FUNCTION update_client_view(
p_entity INTEGER,
p_entity_pull_func_name VARCHAR,
p_replicache_client_group VARCHAR(36),
p_client_view_version INTEGER
)
RETURNS void AS $$
BEGIN
WITH changes AS (
SELECT * FROM
invoke_pull_fn(p_entity_pull_func_name, p_replicache_client_group)
AS t
FULL OUTER JOIN (
SELECT * FROM client_view_entry
WHERE
client_group_id = p_replicache_client_group AND
entity = p_entity
) AS cve
ON t.id = cve.entity_id
WHERE
cve.entity_id IS NULL OR
cve.deleted = true OR
t.id IS NULL OR (
cve.entity_id = t.id AND cve.entity_version != t.version
)
),
to_create AS (
SELECT * FROM changes
WHERE
entity_id IS NULL OR
deleted = true
),
to_update AS (
SELECT * FROM changes
WHERE
entity_id IS NOT NULL AND
deleted = false AND
id IS NOT NULL AND
entity_version != version
),
to_delete AS (
SELECT * FROM changes
WHERE
entity_id IS NOT NULL AND
deleted = false AND
id IS NULL
),
-- creates
created AS (
INSERT INTO client_view_entry AS cve (
SELECT
p_replicache_client_group as client_group_id,
p_entity as entity,
tc.id as entity_id,
tc.version as entity_version,
false as deleted,
p_client_view_version as client_view_version
FROM to_create tc
)
ON CONFLICT (entity, entity_id, client_group_id) DO UPDATE SET
entity_version = EXCLUDED.entity_version,
deleted = false,
client_view_version = EXCLUDED.client_view_version
),
-- updates
updates AS (
UPDATE client_view_entry AS cve SET
entity_version = tu.version,
deleted = false,
client_view_version = p_client_view_version
FROM to_update tu
WHERE
cve.client_group_id = p_replicache_client_group AND
cve.entity = p_entity AND
cve.entity_id = tu.id
)
-- deletes
UPDATE client_view_entry AS cve SET
deleted = true,
client_view_version = p_client_view_version
FROM to_delete td
WHERE
cve.client_group_id = p_replicache_client_group AND
cve.entity = p_entity AND
cve.entity_id = td.entity_id;
END;
$$ LANGUAGE plpgsql;
`);

await executor(
/*sql*/ `CREATE TYPE priority AS ENUM ('NONE', 'LOW', 'MEDIUM', 'HIGH', 'URGENT')`,
);
Expand Down Expand Up @@ -79,20 +205,33 @@ export async function createSchemaVersion1(executor: Executor) {
"version" INTEGER NOT NULL
)`);

await executor(/*sql*/ `CREATE TABLE "client_view_entry" (
-- Client Group the CV was generated for.
"client_group_id" VARCHAR(36) NOT NULL,
-- Entity the entry is for.
"entity" INTEGER NOT NULL,
-- Entity ID the entry is for.
"entity_id" VARCHAR(36) NOT NULL,
-- Version the entity was at when the entry was last updated.
"entity_version" INTEGER NOT NULL,
-- Whether the entry has been deleted.
"deleted" BOOLEAN NOT NULL DEFAULT FALSE,
-- Client View Version the entry was last updated at.
"client_view_version" INTEGER NOT NULL,
-- unique by client_group_id, entity, entity_id
PRIMARY KEY ("client_group_id", "entity", "entity_id")
)`);
await executor(/*sql*/ `CREATE FUNCTION pull_issue(client_group_id varchar)
RETURNS TABLE(varchar_column VARCHAR, number_column INTEGER) AS $$
BEGIN
RETURN QUERY SELECT id, version FROM issue;
END;
$$ LANGUAGE plpgsql`);

await executor(/*sql*/ `CREATE FUNCTION pull_description(client_group_id varchar)
RETURNS TABLE(varchar_column VARCHAR, number_column INTEGER) AS $$
BEGIN
RETURN QUERY SELECT id, version FROM description;
END;
$$ LANGUAGE plpgsql`);

await executor(/*sql*/ `CREATE FUNCTION pull_comment(client_group_id varchar)
RETURNS TABLE(varchar_column VARCHAR, number_column INTEGER) AS $$
BEGIN
RETURN QUERY SELECT id, version FROM comment;
END;
$$ LANGUAGE plpgsql`);

await executor(/*sql*/ `CREATE FUNCTION pull_client(client_group_id varchar)
RETURNS TABLE(varchar_column VARCHAR, number_column INTEGER) AS $$
BEGIN
RETURN QUERY SELECT id, lastmutationid as version
FROM replicache_client c
WHERE c.clientgroupid = client_group_id;
END;
$$ LANGUAGE plpgsql`);
}
Loading

0 comments on commit 47f024c

Please sign in to comment.