Skip to content

Commit

Permalink
rebase after #1662
Browse files Browse the repository at this point in the history
  • Loading branch information
Fil committed Sep 17, 2024
1 parent e467a6b commit bf450c0
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 43 deletions.
74 changes: 46 additions & 28 deletions src/fileWatchers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type {FSWatcher} from "node:fs";
import {watch} from "node:fs";
import {readFileSync, watch} from "node:fs";
import {join} from "node:path/posix";
import {isEnoent} from "./error.js";
import {maybeStat} from "./files.js";
import type {LoaderResolver} from "./loader.js";
Expand All @@ -11,38 +12,55 @@ export class FileWatchers {
static async of(loaders: LoaderResolver, path: string, names: Iterable<string>, callback: (name: string) => void) {
const that = new FileWatchers();
const {watchers} = that;
const {root} = loaders;
for (const name of names) {
const watchPath = loaders.getWatchPath(resolvePath(path, name));
if (!watchPath) continue;
let currentStat = await maybeStat(watchPath);
let watcher: FSWatcher;
const index = watchers.length;
const path0 = resolvePath(path, name);
const paths = new Set([path0]);
try {
watcher = watch(watchPath, async function watched(type) {
// Re-initialize the watcher on the original path on rename.
if (type === "rename") {
watcher.close();
try {
watcher = watchers[index] = watch(watchPath, watched);
} catch (error) {
if (!isEnoent(error)) throw error;
console.error(`file no longer exists: ${watchPath}`);
for (const path of JSON.parse(
readFileSync(join(root, ".observablehq", "cache", `${path0}__dependencies`), "utf-8")
))
paths.add(path);
} catch (error) {
if (!isEnoent(error)) {
throw error;
}
}

for (const path of paths) {
const watchPath = loaders.getWatchPath(path);
if (!watchPath) continue;
console.warn(watchPath, name);
let currentStat = await maybeStat(watchPath);
let watcher: FSWatcher;
const index = watchers.length;
try {
watcher = watch(watchPath, async function watched(type) {
// Re-initialize the watcher on the original path on rename.
if (type === "rename") {
watcher.close();
try {
watcher = watchers[index] = watch(watchPath, watched);
} catch (error) {
if (!isEnoent(error)) throw error;
console.error(`file no longer exists: ${watchPath}`);
return;
}
setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
return;
}
setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
return;
}
const newStat = await maybeStat(watchPath);
// Ignore if the file was truncated or not modified.
if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
currentStat = newStat;
callback(name);
});
} catch (error) {
if (!isEnoent(error)) throw error;
continue;
const newStat = await maybeStat(watchPath);
// Ignore if the file was truncated or not modified.
if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
currentStat = newStat;
callback(name);
});
} catch (error) {
if (!isEnoent(error)) throw error;
continue;
}
watchers[index] = watcher;
}
watchers[index] = watcher;
}
return that;
}
Expand Down
97 changes: 83 additions & 14 deletions src/loader.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {createHash} from "node:crypto";
import type {FSWatcher, WatchListener, WriteStream} from "node:fs";
import {createReadStream, existsSync, statSync, watch} from "node:fs";
import {open, readFile, rename, unlink} from "node:fs/promises";
import {createReadStream, existsSync, readFileSync, statSync, watch} from "node:fs";
import {open, readFile, rename, rm, unlink, writeFile} from "node:fs/promises";
import {dirname, extname, join} from "node:path/posix";
import {createGunzip} from "node:zlib";
import {spawn} from "cross-spawn";
import JSZip from "jszip";
import {extract} from "tar-stream";
import {enoent} from "./error.js";
import {enoent, isEnoent} from "./error.js";
import {maybeStat, prepareOutput, visitFiles} from "./files.js";
import {FileWatchers} from "./fileWatchers.js";
import {formatByteSize} from "./format.js";
Expand All @@ -16,6 +16,7 @@ import {findModule, getFileInfo, getLocalModuleHash, getModuleHash} from "./java
import type {Logger, Writer} from "./logger.js";
import type {MarkdownPage, ParseOptions} from "./markdown.js";
import {parseMarkdown} from "./markdown.js";
import {preview} from "./preview.js";
import {getModuleResolver, resolveImportPath} from "./resolvers.js";
import type {Params} from "./route.js";
import {isParameterized, requote, route} from "./route.js";
Expand Down Expand Up @@ -51,6 +52,9 @@ const defaultEffects: LoadEffects = {
/** Per-load options for data loaders. */
export interface LoadOptions {
/** Whether to use a stale cache; true when building. */
useStale?: boolean;

/**
 * An asset server for chained data loaders: the base URL of the `/_file/`
 * endpoint of a local preview server. The command loader passes this value
 * to the spawned process as the `FILE_SERVER` environment variable.
 */
FILE_SERVER?: string;
}

export interface LoaderOptions {
Expand All @@ -61,7 +65,7 @@ export interface LoaderOptions {
}

export class LoaderResolver {
private readonly root: string;
readonly root: string;
private readonly interpreters: Map<string, string[]>;

constructor({root, interpreters}: {root: string; interpreters?: Record<string, string[] | null>}) {
Expand Down Expand Up @@ -304,7 +308,21 @@ export class LoaderResolver {
const info = getFileInfo(this.root, path);
if (!info) return createHash("sha256").digest("hex");
const {hash} = info;
return path === name ? hash : createHash("sha256").update(hash).update(String(info.mtimeMs)).digest("hex");
if (path === name) return hash;
const hash2 = createHash("sha256").update(hash).update(String(info.mtimeMs));
try {
for (const path of JSON.parse(
readFileSync(join(this.root, ".observablehq", "cache", `${name}__dependencies`), "utf-8")
)) {
const info = getFileInfo(this.root, this.getSourceFilePath(path));
if (info) hash2.update(info.hash).update(String(info.mtimeMs));
}
} catch (error) {
if (!isEnoent(error)) {
throw error;
}
}
return hash2.digest("hex");
}

getOutputFileHash(name: string): string {
Expand Down Expand Up @@ -417,12 +435,37 @@ abstract class AbstractLoader implements Loader {
const outputPath = join(".observablehq", "cache", this.targetPath);
const cachePath = join(this.root, outputPath);
const loaderStat = await maybeStat(loaderPath);
const cacheStat = await maybeStat(cachePath);
if (!cacheStat) effects.output.write(faint("[missing] "));
else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) {
if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
else effects.output.write(faint("[stale] "));
} else return effects.output.write(faint("[fresh] ")), outputPath;
const paths = new Set([cachePath]);
try {
for (const path of JSON.parse(await readFile(`${cachePath}__dependencies`, "utf-8"))) paths.add(path);
} catch (error) {
if (!isEnoent(error)) {
throw error;
}
}

const FRESH = 0;
const STALE = 1;
const MISSING = 2;
let status = FRESH;
for (const path of paths) {
const cacheStat = await maybeStat(path);
if (!cacheStat) {
status = MISSING;
break;
} else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) status = Math.max(status, STALE);
}
switch (status) {
case FRESH:
return effects.output.write(faint("[fresh] ")), outputPath;
case STALE:
if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
effects.output.write(faint("[stale] "));
break;
case MISSING:
effects.output.write(faint("[missing] "));
break;
}
const tempPath = join(this.root, ".observablehq", "cache", `${this.targetPath}.${process.pid}`);
const errorPath = tempPath + ".err";
const errorStat = await maybeStat(errorPath);
Expand All @@ -434,15 +477,37 @@ abstract class AbstractLoader implements Loader {
await prepareOutput(tempPath);
await prepareOutput(cachePath);
const tempFd = await open(tempPath, "w");

// Launch a server for chained data loaders. TODO configure host?
const dependencies = new Set<string>();
const {server} = await preview({root: this.root, verbose: false, hostname: "127.0.0.1", dependencies});
const address = server.address();
if (!address || typeof address !== "object")
throw new Error("Couldn't launch server for chained data loaders!");
const FILE_SERVER = `http://${address.address}:${address.port}/_file/`;

try {
await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale}, effects);
await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale, FILE_SERVER}, effects);
await rename(tempPath, cachePath);
} catch (error) {
await rename(tempPath, errorPath);
throw error;
} finally {
await tempFd.close();
}

const cachedeps = `${cachePath}__dependencies`;
if (dependencies.size) await writeFile(cachedeps, JSON.stringify([...dependencies]), "utf-8");
else
try {
await rm(cachedeps);
} catch (error) {
if (!isEnoent(error)) throw error;
}

// TODO: server.close() might be enough?
await new Promise((closed) => server.close(closed));

return outputPath;
})();
command.finally(() => runningCommands.delete(key)).catch(() => {});
Expand Down Expand Up @@ -495,8 +560,12 @@ class CommandLoader extends AbstractLoader {
this.args = args;
}

async exec(output: WriteStream): Promise<void> {
const subprocess = spawn(this.command, this.args, {windowsHide: true, stdio: ["ignore", output, "inherit"]});
async exec(output: WriteStream, {FILE_SERVER}): Promise<void> {
const subprocess = spawn(this.command, this.args, {
windowsHide: true,
stdio: ["ignore", output, "inherit"],
env: {...process.env, FILE_SERVER}
});
const code = await new Promise((resolve, reject) => {
subprocess.on("error", reject);
subprocess.on("close", resolve);
Expand Down
8 changes: 7 additions & 1 deletion src/preview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface PreviewOptions {
port?: number;
origins?: string[];
verbose?: boolean;
dependencies?: Set<string>;
}

export async function preview(options: PreviewOptions): Promise<PreviewServer> {
Expand All @@ -58,19 +59,22 @@ export class PreviewServer {
private readonly _server: ReturnType<typeof createServer>;
private readonly _socketServer: WebSocketServer;
private readonly _verbose: boolean;
private readonly dependencies: Set<string> | undefined;

private constructor({
config,
root,
origins = [],
server,
verbose
verbose,
dependencies
}: {
config?: string;
root?: string;
origins?: string[];
server: Server;
verbose: boolean;
dependencies?: Set<string>;
}) {
this._config = config;
this._root = root;
Expand All @@ -80,6 +84,7 @@ export class PreviewServer {
this._server.on("request", this._handleRequest);
this._socketServer = new WebSocketServer({server: this._server});
this._socketServer.on("connection", this._handleConnection);
this.dependencies = dependencies;
}

static async start({verbose = true, hostname, port, open, ...options}: PreviewOptions) {
Expand Down Expand Up @@ -172,6 +177,7 @@ export class PreviewServer {
}
throw enoent(path);
} else if (pathname.startsWith("/_file/")) {
if (this.dependencies) this.dependencies.add(pathname.slice("/_file".length));
send(req, await loaders.loadFile(pathname.slice("/_file".length)), {root}).pipe(res);
} else {
if ((pathname = normalize(pathname)).startsWith("..")) throw new Error("Invalid path: " + pathname);
Expand Down
1 change: 1 addition & 0 deletions test/input/build/chain/chain-source.json.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Source data loader for the chained-loader fixtures: emits one small JSON object.
printf '%s\n' '{"x": 3}'
1 change: 1 addition & 0 deletions test/input/build/chain/chain.json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
// Test fixture: prints the `address` environment variable serialized as JSON.
// NOTE(review): nothing visible in this change sets `address` (loaders are given
// FILE_SERVER instead), so this presumably prints `undefined` — confirm whether
// this fixture is still needed.
const {address} = process.env;
const serialized = JSON.stringify(address, null, 2);
console.log(serialized);
9 changes: 9 additions & 0 deletions test/input/build/chain/chain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Chained data loaders

```js
FileAttachment("chain1.json").json()
```

```js
FileAttachment("chain2.csv").csv({typed: true})
```
3 changes: 3 additions & 0 deletions test/input/build/chain/chain1.json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Chained data loader fixture: fetches chain-source.json from the file server
// whose base URL is given by the FILE_SERVER environment variable, then prints
// x and its square as pretty-printed JSON.
const fileServer = process.env.FILE_SERVER;
const response = await fetch(`${fileServer}chain-source.json`);
const source = await response.json();
const x = source.x;
console.log(JSON.stringify({x, "x^2": x * x}, null, 2));
3 changes: 3 additions & 0 deletions test/input/build/chain/chain2.csv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Chained data loader fixture: fetches chain-source.json from the file server
// whose base URL is given by the FILE_SERVER environment variable, then prints
// x and its square as a two-column CSV (name,value).
const fileServer = process.env.FILE_SERVER;
const response = await fetch(`${fileServer}chain-source.json`);
const source = await response.json();
const x = source.x;
console.log(`name,value\nx,${x}\nx^2,${x * x}`);
4 changes: 4 additions & 0 deletions test/output/build/chain/_file/chain1.550fb08c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"x": 3,
"x^2": 9
}
3 changes: 3 additions & 0 deletions test/output/build/chain/_file/chain2.b1220d22.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name,value
x,3
x^2,9
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
61 changes: 61 additions & 0 deletions test/output/build/chain/chain.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<!DOCTYPE html>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
<meta name="generator" content="Observable Framework v1.0.0-test">
<title>Chained data loaders</title>
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link rel="preload" as="style" href="https://fonts.googleapis.com/css2?family=Source+Serif+4:ital,opsz,wght@0,8..60,200..900;1,8..60,200..900&amp;display=swap" crossorigin>
<link rel="preload" as="style" href="./_observablehq/theme-air,near-midnight.00000004.css">
<link rel="stylesheet" type="text/css" href="https://fonts.googleapis.com/css2?family=Source+Serif+4:ital,opsz,wght@0,8..60,200..900;1,8..60,200..900&amp;display=swap" crossorigin>
<link rel="stylesheet" type="text/css" href="./_observablehq/theme-air,near-midnight.00000004.css">
<link rel="modulepreload" href="./_observablehq/client.00000001.js">
<link rel="modulepreload" href="./_observablehq/runtime.00000002.js">
<link rel="modulepreload" href="./_observablehq/stdlib.00000003.js">
<link rel="modulepreload" href="./_npm/[email protected]/cd372fb8.js">
<script type="module">

import {define} from "./_observablehq/client.00000001.js";
import {registerFile} from "./_observablehq/stdlib.00000003.js";

registerFile("./chain1.json", {"name":"./chain1.json","mimeType":"application/json","path":"./_file/chain1.550fb08c.json","lastModified":/* ts */1706742000000,"size":25});
registerFile("./chain2.csv", {"name":"./chain2.csv","mimeType":"text/csv","path":"./_file/chain2.b1220d22.csv","lastModified":/* ts */1706742000000,"size":21});

define({id: "7ecb71dd", inputs: ["FileAttachment","display"], body: async (FileAttachment,display) => {
display(await(
FileAttachment("./chain1.json").json()
))
}});

define({id: "f6c957f2", inputs: ["FileAttachment","display"], body: async (FileAttachment,display) => {
display(await(
FileAttachment("./chain2.csv").csv({typed: true})
))
}});

</script>
<input id="observablehq-sidebar-toggle" type="checkbox" title="Toggle sidebar">
<label id="observablehq-sidebar-backdrop" for="observablehq-sidebar-toggle"></label>
<nav id="observablehq-sidebar">
<ol>
<label id="observablehq-sidebar-close" for="observablehq-sidebar-toggle"></label>
<li class="observablehq-link"><a href="./">Home</a></li>
</ol>
<ol>
<li class="observablehq-link observablehq-link-active"><a href="./chain">Chained data loaders</a></li>
</ol>
</nav>
<script>{/* redacted init script */}</script>
<aside id="observablehq-toc" data-selector="h1:not(:first-of-type)[id], h2:first-child[id], :not(h1) + h2[id]">
<nav>
</nav>
</aside>
<div id="observablehq-center">
<main id="observablehq-main" class="observablehq">
<h1 id="chained-data-loaders" tabindex="-1"><a class="observablehq-header-anchor" href="#chained-data-loaders">Chained data loaders</a></h1>
<div class="observablehq observablehq--block"><observablehq-loading></observablehq-loading><!--:7ecb71dd:--></div>
<div class="observablehq observablehq--block"><observablehq-loading></observablehq-loading><!--:f6c957f2:--></div>
</main>
<footer id="observablehq-footer">
<div>Built with <a href="https://observablehq.com/" target="_blank" rel="noopener noreferrer">Observable</a> on <a title="2024-01-10T16:00:00">Jan 10, 2024</a>.</div>
</footer>
</div>

0 comments on commit bf450c0

Please sign in to comment.