Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gatsby-core-utils): create proper mutex #34761

Merged
merged 7 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/how-to/testing/unit-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module.exports = {
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
testPathIgnorePatterns: [`node_modules`, `\\.cache`, `<rootDir>.*/public`],
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],
Expand Down
1 change: 1 addition & 0 deletions examples/using-jest/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = {
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
testPathIgnorePatterns: [`node_modules`, `.cache`],
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],
Expand Down
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ module.exports = {
"^ordered-binary$": `<rootDir>/node_modules/ordered-binary/dist/index.cjs`,
"^msgpackr$": `<rootDir>/node_modules/msgpackr/dist/node.cjs`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
snapshotSerializers: [`jest-serializer-path`],
collectCoverageFrom: coverageDirs,
Expand Down
17 changes: 17 additions & 0 deletions packages/gatsby-core-utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,20 @@ const requireUtil = createRequireFromPath("../src/utils/")
requireUtil("./some-tool")
// ...
```

### Mutex

When working inside workers or async operations you want some kind of concurrency control that a specific work load can only concurrent one at a time. This is what a [Mutex](https://en.wikipedia.org/wiki/Mutual_exclusion) does.

By implementing the following code, the code is only executed one at a time and the other threads/async workloads are awaited until the current one is done. This is handy when writing to the same file to disk.

```js
const { createMutex } = require("gatsby-core-utils/mutex")

const mutex = createMutex("my-custom-mutex-key")
await mutex.acquire()

await fs.writeFile("pathToFile", "my custom content")

await mutex.release()
```
15 changes: 15 additions & 0 deletions packages/gatsby-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
"gatsby",
"gatsby-core-utils"
],
"exports": {
".": "./dist/index.js",
"./*": "./dist/*.js"
},
"typesVersions": {
"*": {
"*": [
"dist/*.d.ts",
"dist/index.d.ts"
]
}
},
"author": "Ward Peeters <[email protected]>",
"homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-core-utils#readme",
"license": "MIT",
Expand Down Expand Up @@ -36,9 +48,12 @@
"file-type": "^16.5.3",
"fs-extra": "^10.0.0",
"got": "^11.8.3",
"import-from": "^4.0.0",
"lock": "^1.1.0",
"lmdb": "^2.1.7",
wardpeet marked this conversation as resolved.
Show resolved Hide resolved
"node-object-hash": "^2.3.10",
"proper-lockfile": "^4.1.2",
"resolve-from": "^5.0.0",
"tmp": "^0.2.1",
"xdg-basedir": "^4.0.0"
},
Expand Down
99 changes: 99 additions & 0 deletions packages/gatsby-core-utils/src/__tests__/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import path from "path"
import { remove, mkdirp } from "fs-extra"
import { createMutex } from "../mutex"
import * as storage from "../utils/get-storage"

jest.spyOn(storage, `getDatabaseDir`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw this in a beforeEach and reset it within an afterEach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure what the point is here? Only thing spyOn does is make it possible to listen to it.


function sleep(timeout = 100): Promise<void> {
return new Promise(resolve => setTimeout(resolve, timeout))
}

async function doAsync(
mutex: ReturnType<typeof createMutex>,
result: Array<string> = [],
waitTime: number,
id: string
): Promise<Array<string>> {
await mutex.acquire()
result.push(`start ${id}`)
await sleep(waitTime)
result.push(`stop ${id}`)
await mutex.release()

return result
}

describe(`mutex`, () => {
const cachePath = path.join(__dirname, `.cache`)
beforeAll(async () => {
await mkdirp(cachePath)
storage.getDatabaseDir.mockReturnValue(cachePath)
})

afterAll(async () => {
await storage.closeDatabase()
await remove(cachePath)
})

it(`should only allow one action go through at the same time`, async () => {
wardpeet marked this conversation as resolved.
Show resolved Hide resolved
const mutex = createMutex(`test-key`, 300)

const result: Array<string> = []

doAsync(mutex, result, 50, `1`)
await sleep(0)
await doAsync(mutex, result, 10, `2`)

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"stop 1",
"start 2",
"stop 2",
]
`)
})

it(`should generate the same mutex if key are identical`, async () => {
const mutex1 = createMutex(`test-key`, 300)
const mutex2 = createMutex(`test-key`, 300)

const result: Array<string> = []

const mutexPromise = doAsync(mutex1, result, 50, `1`)
await sleep(0)
await doAsync(mutex2, result, 10, `2`)
await mutexPromise

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"stop 1",
"start 2",
"stop 2",
]
`)
})

it(`shouldn't wait if keys are different`, async () => {
const mutex1 = createMutex(`test-key`, 300)
const mutex2 = createMutex(`other-key`, 300)

const result: Array<string> = []

const mutexPromise = doAsync(mutex1, result, 50, `1`)
await sleep(0)
await doAsync(mutex2, result, 10, `2`)
await mutexPromise

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"start 2",
"stop 2",
"stop 1",
]
`)
})
})
57 changes: 57 additions & 0 deletions packages/gatsby-core-utils/src/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { getStorage, LockStatus, getDatabaseDir } from "./utils/get-storage"

interface IMutex {
acquire(): Promise<void>
release(): Promise<void>
}

// Random number to re-check if mutex got released
const DEFAULT_MUTEX_INTERVAL = 3000

async function waitUntilUnlocked(
storage: ReturnType<typeof getStorage>,
key: string,
timeout: number
): Promise<void> {
const isUnlocked = await storage.mutex.ifNoExists(key, () => {
storage.mutex.put(key, LockStatus.Locked)
})

if (isUnlocked) {
return
}

await new Promise<void>(resolve => {
setTimeout(() => {
resolve(waitUntilUnlocked(storage, key, timeout))
}, timeout)
})
}

/**
* Creates a mutex, make sure to call `release` when you're done with it.
*
* @param {string} key A unique key
*/
export function createMutex(
key: string,
timeout = DEFAULT_MUTEX_INTERVAL
): IMutex {
const storage = getStorage(getDatabaseDir())
const BUILD_ID = global.__GATSBY?.buildId ?? ``
const prefixedKey = `${BUILD_ID}-${key}`

return {
acquire: (): Promise<void> =>
waitUntilUnlocked(storage, prefixedKey, timeout),
release: async (): Promise<void> => {
await storage.mutex.remove(prefixedKey)
},
}
}

export async function releaseAllMutexes(): Promise<void> {
const storage = getStorage(getDatabaseDir())

await storage.mutex.clearAsync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you view this being used for? It feels like it could be a footgun for plugin creators, and feel iffy about exposing it.

Copy link
Contributor Author

@wardpeet wardpeet Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll use it in gatsby core, that's the only reason. I can't clear mutexes other wise from other builds, let's say a previous build crashed and mutexes are still alive, this list can grow until someone does gatsby clean.

Seems like I need to update this branch to latest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
16 changes: 16 additions & 0 deletions packages/gatsby-core-utils/src/utils/get-lmdb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import path from "path"
import importFrom from "import-from"
import resolveFrom from "resolve-from"

export function getLmdb(): typeof import("lmdb") {
const gatsbyPkgRoot = path.dirname(
resolveFrom(process.cwd(), `gatsby/package.json`)
)

// Try to use lmdb from gatsby if not we use our own version
try {
return importFrom(gatsbyPkgRoot, `lmdb`) as typeof import("lmdb")
} catch (err) {
return require(`lmdb`)
}
}
68 changes: 68 additions & 0 deletions packages/gatsby-core-utils/src/utils/get-storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import path from "path"
import { getLmdb } from "./get-lmdb"
import type { RootDatabase, Database } from "lmdb"

export enum LockStatus {
Locked = 0,
Unlocked = 1,
}

interface ICoreUtilsDatabase {
mutex: Database<LockStatus, string>
}

let databases: ICoreUtilsDatabase | undefined
let rootDb: RootDatabase

export function getDatabaseDir(): string {
const rootDir = global.__GATSBY?.root ?? process.cwd()
return path.join(rootDir, `.cache`, `data`, `gatsby-core-utils`)
}

export function getStorage(fullDbPath: string): ICoreUtilsDatabase {
if (!databases) {
if (!fullDbPath) {
throw new Error(`LMDB path is not set!`)
}

// __GATSBY_OPEN_LMDBS tracks if we already opened given db in this process
// In `gatsby serve` case we might try to open it twice - once for engines
// and second to get access to `SitePage` nodes (to power trailing slashes
// redirect middleware). This ensure there is single instance within a process.
// Using more instances seems to cause weird random errors.
if (!globalThis.__GATSBY_OPEN_LMDBS) {
globalThis.__GATSBY_OPEN_LMDBS = new Map()
}

databases = globalThis.__GATSBY_OPEN_LMDBS.get(fullDbPath)

if (databases) {
return databases
}

const open = getLmdb().open

rootDb = open({
name: `root`,
path: fullDbPath,
compression: true,
sharedStructuresKey: Symbol.for(`structures`),
})

databases = {
mutex: rootDb.openDB({
name: `mutex`,
}),
}

globalThis.__GATSBY_OPEN_LMDBS.set(fullDbPath, databases)
}

return databases as ICoreUtilsDatabase
}

export async function closeDatabase(): Promise<void> {
if (rootDb) {
await rootDb.close()
}
}
51 changes: 25 additions & 26 deletions packages/gatsby/src/services/initialize.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from "lodash"
import { slash, isCI } from "gatsby-core-utils"
import { releaseAllMutexes } from "gatsby-core-utils/mutex"
import fs from "fs-extra"
import md5File from "md5-file"
import crypto from "crypto"
Expand Down Expand Up @@ -407,34 +408,29 @@ export async function initialize({
// }
// }

if (
process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE ||
process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE
) {
const deleteGlobs = [
// By default delete all files & subdirectories
`${cacheDirectory}/**`,
`${cacheDirectory}/*/`,
]

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
// Stop the caches directory from being deleted, add all sub directories,
// but remove gatsby-source-filesystem
deleteGlobs.push(`!${cacheDirectory}/caches`)
deleteGlobs.push(`${cacheDirectory}/caches/*`)
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
}
const deleteGlobs = [
// By default delete all files & subdirectories
`${cacheDirectory}/**`,
`!${cacheDirectory}/data`,
`${cacheDirectory}/data/**`,
`!${cacheDirectory}/data/gatsby-core-utils/`,
`!${cacheDirectory}/data/gatsby-core-utils/**`,
]

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
// Stop the caches directory from being deleted, add all sub directories,
// but remove gatsby-source-filesystem
deleteGlobs.push(`!${cacheDirectory}/caches`)
deleteGlobs.push(`${cacheDirectory}/caches/*`)
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
}

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
// Add webpack
deleteGlobs.push(`!${cacheDirectory}/webpack`)
}
await del(deleteGlobs)
} else {
// Attempt to empty dir if remove fails,
// like when directory is mount point
await fs.remove(cacheDirectory).catch(() => fs.emptyDir(cacheDirectory))
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
wardpeet marked this conversation as resolved.
Show resolved Hide resolved
// Add webpack
deleteGlobs.push(`!${cacheDirectory}/webpack`)
}

await del(deleteGlobs)
} catch (e) {
reporter.error(`Failed to remove .cache files.`, e)
}
Expand All @@ -445,6 +441,9 @@ export async function initialize({
cacheIsCorrupt,
})

// make sure all previous mutexes are released
await releaseAllMutexes()

// in future this should show which plugin's caches are purged
// possibly should also have which plugins had caches
telemetry.decorateEvent(`BUILD_END`, {
Expand Down
Loading