diff --git a/.env b/.env new file mode 100644 index 0000000000..582460b0d8 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +NX_ISOLATE_PLUGINS=true \ No newline at end of file diff --git a/.gitignore b/.gitignore index ab215da9f1..926728d344 100644 --- a/.gitignore +++ b/.gitignore @@ -37,7 +37,7 @@ out .angular # Local dev files -.env +.env.local .bashrc .nx diff --git a/e2e/utils/get-env-info.ts b/e2e/utils/get-env-info.ts index 17e8f98a9a..0869cf9182 100644 --- a/e2e/utils/get-env-info.ts +++ b/e2e/utils/get-env-info.ts @@ -160,7 +160,7 @@ export function getStrippedEnvironmentVariables() { return true; } - const allowedKeys = ['NX_ADD_PLUGINS']; + const allowedKeys = ['NX_ADD_PLUGINS', 'NX_ISOLATE_PLUGINS']; if (key.startsWith('NX_') && !allowedKeys.includes(key)) { return false; diff --git a/nx.json b/nx.json index 3e1d6f5ff8..24750ba283 100644 --- a/nx.json +++ b/nx.json @@ -212,6 +212,6 @@ "nxCloudUrl": "https://staging.nx.app", "parallel": 1, "cacheDirectory": "/tmp/nx-cache", - "bust": 5, + "bust": 7, "defaultBase": "master" } diff --git a/packages/nx/src/command-line/run/command-object.ts b/packages/nx/src/command-line/run/command-object.ts index 6b27f0cd3b..120c2093b4 100644 --- a/packages/nx/src/command-line/run/command-object.ts +++ b/packages/nx/src/command-line/run/command-object.ts @@ -37,7 +37,7 @@ export const yargsNxInfixCommand: CommandModule = { command: '$0 [project] [_..]', describe: 'Run a target for a project', handler: async (args) => { - await handleErrors( + const exitCode = await handleErrors( (args.verbose as boolean) ?? process.env.NX_VERBOSE_LOGGING === 'true', async () => { return (await import('./run-one')).runOne( @@ -46,5 +46,6 @@ export const yargsNxInfixCommand: CommandModule = { ); } ); + process.exit(exitCode); }, }; diff --git a/packages/nx/src/daemon/socket-utils.ts b/packages/nx/src/daemon/socket-utils.ts index be83e1ae9e..3cf2640613 100644 --- a/packages/nx/src/daemon/socket-utils.ts +++ b/packages/nx/src/daemon/socket-utils.ts @@ -22,6 +22,11 @@ export const getForkedProcessOsSocketPath = (id: string) => { return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path); }; +export const getPluginOsSocketPath = (id: string) => { + let path = resolve(join(getSocketDir(), 'plugin' + id + '.sock')); + return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path); +}; + export function killSocketOrPath(): void { try { unlinkSync(getFullOsSocketPath()); diff --git a/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts b/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts index b5c7992a4e..a5ea8032ff 100644 --- a/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts +++ b/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts @@ -296,7 +296,7 @@ describe('getTouchedNpmPackages', () => { }); it('should handle and log workspace package.json changes when the changes are not in `npmPackages` (projectGraph.externalNodes)', () => { - jest.spyOn(logger, 'warn'); + jest.spyOn(logger, 'warn').mockImplementation(() => {}); expect(() => { getTouchedNpmPackages( [ diff --git a/packages/nx/src/project-graph/plugins/internal-api.ts b/packages/nx/src/project-graph/plugins/internal-api.ts index 230f27f381..68d4163ce0 100644 --- a/packages/nx/src/project-graph/plugins/internal-api.ts +++ b/packages/nx/src/project-graph/plugins/internal-api.ts @@ -159,7 +159,7 @@ export async function loadNxPlugins( const cleanupFunctions: Array<() => void> = []; for (const plugin of plugins) { - const [loadedPluginPromise, cleanup] = loadingMethod(plugin, root); + const [loadedPluginPromise, cleanup] = await loadingMethod(plugin, root); result.push(loadedPluginPromise); cleanupFunctions.push(cleanup); } diff --git a/packages/nx/src/project-graph/plugins/isolation/index.ts b/packages/nx/src/project-graph/plugins/isolation/index.ts index 19a5ba5aba..4bab409265 100644 --- a/packages/nx/src/project-graph/plugins/isolation/index.ts +++ b/packages/nx/src/project-graph/plugins/isolation/index.ts @@ -3,33 +3,15 @@ import { PluginConfiguration } from '../../../config/nx-json'; import { LoadedNxPlugin } from '../internal-api'; import { loadRemoteNxPlugin } from './plugin-pool'; -/** - * Used to ensure 1 plugin : 1 worker - */ -const remotePluginCache = new Map< - string, - readonly [Promise, () => void] ->(); - -export function loadNxPluginInIsolation( +export async function loadNxPluginInIsolation( plugin: PluginConfiguration, root = workspaceRoot -): readonly [Promise, () => void] { - const cacheKey = JSON.stringify(plugin); - - if (remotePluginCache.has(cacheKey)) { - return remotePluginCache.get(cacheKey); - } - - const [loadingPlugin, cleanup] = loadRemoteNxPlugin(plugin, root); - // We clean up plugin workers when Nx process completes. - const val = [ +): Promise, () => void]> { + const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root); + return [ loadingPlugin, () => { cleanup(); - remotePluginCache.delete(cacheKey); }, ] as const; - remotePluginCache.set(cacheKey, val); - return val; } diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index ce64fb43aa..bafc17fd32 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -7,9 +7,11 @@ import { CreateDependenciesContext, CreateMetadataContext, CreateNodesContext, + CreateNodesContextV2, } from '../public-api'; import { LoadedNxPlugin } from '../internal-api'; import { Serializable } from 'child_process'; +import { Socket } from 'net'; export interface PluginWorkerLoadMessage { type: 'load'; @@ -42,7 +44,7 @@ export interface PluginWorkerCreateNodesMessage { type: 'createNodes'; payload: { configFiles: string[]; - context: CreateNodesContext; + context: CreateNodesContextV2; tx: string; }; } @@ -159,6 +161,7 @@ export function isPluginWorkerMessage( 'createNodes', 'createDependencies', 'processProjectGraph', + 'createMetadata', ].includes(message.type) ); } @@ -175,6 +178,7 @@ export function isPluginWorkerResult( 'createNodesResult', 'createDependenciesResult', 'processProjectGraphResult', + 'createMetadataResult', ].includes(message.type) ); } @@ -192,6 +196,7 @@ type MessageHandlerReturn = export async function consumeMessage< T extends PluginWorkerMessage | PluginWorkerResult >( + socket: Socket, raw: T, handlers: { [K in T['type']]: ( @@ -205,7 +210,14 @@ export async function consumeMessage< if (handler) { const response = await handler(message.payload); if (response) { - process.send!(response); + sendMessageOverSocket(socket, response); } } } + +export function sendMessageOverSocket( + socket: Socket, + message: PluginWorkerMessage | PluginWorkerResult +) { + socket.write(JSON.stringify(message) + String.fromCodePoint(4)); +} diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index ecfc269de7..550b074643 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -7,53 +7,44 @@ import { PluginConfiguration } from '../../../config/nx-json'; // import { logger } from '../../utils/logger'; import { LoadedNxPlugin, nxPluginCache } from '../internal-api'; -import { consumeMessage, isPluginWorkerResult } from './messaging'; +import { getPluginOsSocketPath } from '../../../daemon/socket-utils'; +import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; + +import { + consumeMessage, + isPluginWorkerResult, + sendMessageOverSocket, +} from './messaging'; +import { Socket, connect } from 'net'; const cleanupFunctions = new Set<() => void>(); const pluginNames = new Map(); +const MAX_MESSAGE_WAIT = 1000 * 60 * 5; // 5 minutes + interface PendingPromise { promise: Promise; resolver: (result: any) => void; rejector: (err: any) => void; } -export function loadRemoteNxPlugin( +type NxPluginWorkerCache = Map>; + +const nxPluginWorkerCache: NxPluginWorkerCache = (global[ + 'nxPluginWorkerCache' +] ??= new Map()); + +export async function loadRemoteNxPlugin( plugin: PluginConfiguration, root: string -): [Promise, () => void] { - // this should only really be true when running unit tests within - // the Nx repo. We still need to start the worker in this case, - // but its typescript. - const isWorkerTypescript = path.extname(__filename) === '.ts'; - const workerPath = path.join(__dirname, 'plugin-worker'); +): Promise<[Promise, () => void]> { + const cacheKey = JSON.stringify({ plugin, root }); + if (nxPluginWorkerCache.has(cacheKey)) { + return [nxPluginWorkerCache.get(cacheKey), () => {}]; + } - const env: Record = { - ...process.env, - ...(isWorkerTypescript - ? { - // Ensures that the worker uses the same tsconfig as the main process - TS_NODE_PROJECT: path.join( - __dirname, - '../../../../tsconfig.lib.json' - ), - } - : {}), - }; - - const worker = fork(workerPath, [], { - stdio: ['ignore', 'inherit', 'inherit', 'ipc'], - env, - execArgv: [ - ...process.execArgv, - // If the worker is typescript, we need to register ts-node - ...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []), - ], - }); - worker.send({ type: 'load', payload: { plugin, root } }); - - // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + const { worker, socket } = await startPluginWorker(); const pendingPromises = new Map(); @@ -61,24 +52,45 @@ export function loadRemoteNxPlugin( const cleanupFunction = () => { worker.off('exit', exitHandler); + socket.destroy(); shutdownPluginWorker(worker); + nxPluginWorkerCache.delete(cacheKey); }; cleanupFunctions.add(cleanupFunction); - return [ - new Promise((res, rej) => { - worker.on( - 'message', - createWorkerHandler(worker, pendingPromises, res, rej) - ); - worker.on('exit', exitHandler); - }), - () => { - cleanupFunction(); - cleanupFunctions.delete(cleanupFunction); - }, - ]; + const pluginPromise = new Promise((res, rej) => { + sendMessageOverSocket(socket, { + type: 'load', + payload: { plugin, root }, + }); + // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + + const loadTimeout = setTimeout(() => { + rej(new Error('Plugin worker timed out when loading plugin:' + plugin)); + }, MAX_MESSAGE_WAIT); + + socket.on( + 'data', + consumeMessagesFromSocket( + createWorkerHandler( + worker, + pendingPromises, + (val) => { + clearTimeout(loadTimeout); + res(val); + }, + rej, + socket + ) + ) + ); + worker.on('exit', exitHandler); + }); + + nxPluginWorkerCache.set(cacheKey, pluginPromise); + + return [pluginPromise, cleanupFunction]; } function shutdownPluginWorker(worker: ChildProcess) { @@ -102,15 +114,20 @@ function createWorkerHandler( worker: ChildProcess, pending: Map, onload: (plugin: LoadedNxPlugin) => void, - onloadError: (err?: unknown) => void + onloadError: (err?: unknown) => void, + socket: Socket ) { let pluginName: string; - return function (message: Serializable) { + let txId = 0; + + return function (raw: string) { + const message = JSON.parse(raw); + if (!isPluginWorkerResult(message)) { return; } - return consumeMessage(message, { + return consumeMessage(socket, message, { 'load-result': (result) => { if (result.success) { const { name, createNodesPattern, include, exclude } = result; @@ -124,9 +141,10 @@ function createWorkerHandler( ? [ createNodesPattern, (configFiles, ctx) => { - const tx = pluginName + ':createNodes:' + performance.now(); + const tx = + pluginName + worker.pid + ':createNodes:' + txId++; return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createNodes', payload: { configFiles, context: ctx, tx }, }); @@ -137,9 +155,9 @@ function createWorkerHandler( createDependencies: result.hasCreateDependencies ? (ctx) => { const tx = - pluginName + ':createDependencies:' + performance.now(); + pluginName + worker.pid + ':createDependencies:' + txId++; return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createDependencies', payload: { context: ctx, tx }, }); @@ -149,9 +167,9 @@ function createWorkerHandler( processProjectGraph: result.hasProcessProjectGraph ? (graph, ctx) => { const tx = - pluginName + ':processProjectGraph:' + performance.now(); + pluginName + worker.pid + ':processProjectGraph:' + txId++; return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'processProjectGraph', payload: { graph, ctx, tx }, }); @@ -161,9 +179,9 @@ function createWorkerHandler( createMetadata: result.hasCreateMetadata ? (graph, ctx) => { const tx = - pluginName + ':createMetadata:' + performance.now(); + pluginName + worker.pid + ':createMetadata:' + txId++; return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createMetadata', payload: { graph, context: ctx, tx }, }); @@ -228,26 +246,38 @@ function createWorkerExitHandler( }; } -process.on('exit', () => { +let cleanedUp = false; +const exitHandler = () => { + if (cleanedUp) return; for (const fn of cleanupFunctions) { fn(); } -}); + cleanedUp = true; +}; + +process.on('exit', exitHandler); +process.on('SIGINT', exitHandler); +process.on('SIGTERM', exitHandler); function registerPendingPromise( tx: string, pending: Map, callback: () => void ): Promise { - let resolver, rejector; + let resolver, rejector, timeout; const promise = new Promise((res, rej) => { - resolver = res; rejector = rej; + resolver = res; + + timeout = setTimeout(() => { + rej(new Error(`Plugin worker timed out when processing message ${tx}`)); + }, MAX_MESSAGE_WAIT); callback(); }).finally(() => { pending.delete(tx); + clearTimeout(timeout); }); pending.set(tx, { @@ -258,3 +288,81 @@ function registerPendingPromise( return promise; } + +global.nxPluginWorkerCount ??= 0; +async function startPluginWorker() { + // this should only really be true when running unit tests within + // the Nx repo. We still need to start the worker in this case, + // but its typescript. + const isWorkerTypescript = path.extname(__filename) === '.ts'; + const workerPath = path.join(__dirname, 'plugin-worker'); + + const env: Record = { + ...process.env, + ...(isWorkerTypescript + ? { + // Ensures that the worker uses the same tsconfig as the main process + TS_NODE_PROJECT: path.join( + __dirname, + '../../../../tsconfig.lib.json' + ), + } + : {}), + }; + + const ipcPath = getPluginOsSocketPath( + [process.pid, global.nxPluginWorkerCount++].join('-') + ); + + const worker = fork(workerPath, [ipcPath], { + stdio: process.stdout.isTTY ? 'inherit' : 'ignore', + env, + execArgv: [ + ...process.execArgv, + // If the worker is typescript, we need to register ts-node + ...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []), + ], + detached: true, + }); + worker.disconnect(); + worker.unref(); + + let attempts = 0; + return new Promise<{ + worker: ChildProcess; + socket: Socket; + }>((resolve, reject) => { + const id = setInterval(async () => { + const socket = await isServerAvailable(ipcPath); + if (socket) { + socket.unref(); + clearInterval(id); + resolve({ + worker, + socket, + }); + } else if (attempts > 1000) { + // daemon fails to start, the process probably exited + // we print the logs and exit the client + reject('Failed to start plugin worker.'); + } else { + attempts++; + } + }, 10); + }); +} + +function isServerAvailable(ipcPath: string): Promise { + return new Promise((resolve) => { + try { + const socket = connect(ipcPath, () => { + resolve(socket); + }); + socket.once('error', () => { + resolve(false); + }); + } catch (err) { + resolve(false); + } + }); +} diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index 0f61ab620c..0303021a83 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -1,8 +1,10 @@ import { consumeMessage, isPluginWorkerMessage } from './messaging'; import { LoadedNxPlugin } from '../internal-api'; import { loadNxPlugin } from '../loader'; -import { Serializable } from 'child_process'; import { createSerializableError } from '../../../utils/serializable-error'; +import { createServer } from 'net'; +import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; +import { unlinkSync } from 'fs'; if (process.env.NX_PERF_LOGGING === 'true') { require('../../../utils/perf-logging'); @@ -12,109 +14,134 @@ global.NX_GRAPH_CREATION = true; let plugin: LoadedNxPlugin; -process.on('message', async (message: Serializable) => { - if (!isPluginWorkerMessage(message)) { - return; - } - return consumeMessage(message, { - load: async ({ plugin: pluginConfiguration, root }) => { - process.chdir(root); - try { - const [promise] = loadNxPlugin(pluginConfiguration, root); - plugin = await promise; - return { - type: 'load-result', - payload: { - name: plugin.name, - include: plugin.include, - exclude: plugin.exclude, - createNodesPattern: plugin.createNodes?.[0], - hasCreateDependencies: - 'createDependencies' in plugin && !!plugin.createDependencies, - hasProcessProjectGraph: - 'processProjectGraph' in plugin && !!plugin.processProjectGraph, - hasCreateMetadata: - 'createMetadata' in plugin && !!plugin.createMetadata, - success: true, - }, - }; - } catch (e) { - return { - type: 'load-result', - payload: { - success: false, - error: createSerializableError(e), - }, - }; +const socketPath = process.argv[2]; + +const server = createServer((socket) => { + socket.on( + 'data', + consumeMessagesFromSocket((raw) => { + const message = JSON.parse(raw.toString()); + if (!isPluginWorkerMessage(message)) { + return; } - }, - createNodes: async ({ configFiles, context, tx }) => { - try { - const result = await plugin.createNodes[1](configFiles, context); - return { - type: 'createNodesResult', - payload: { result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createNodesResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - createDependencies: async ({ context, tx }) => { - try { - const result = await plugin.createDependencies(context); - return { - type: 'createDependenciesResult', - payload: { dependencies: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createDependenciesResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - processProjectGraph: async ({ graph, ctx, tx }) => { - try { - const result = await plugin.processProjectGraph(graph, ctx); - return { - type: 'processProjectGraphResult', - payload: { graph: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'processProjectGraphResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - createMetadata: async ({ graph, context, tx }) => { - try { - const result = await plugin.createMetadata(graph, context); - return { - type: 'createMetadataResult', - payload: { metadata: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createMetadataResult', - payload: { success: false, error: e.stack, tx }, - }; - } - }, - }); + return consumeMessage(socket, message, { + load: async ({ plugin: pluginConfiguration, root }) => { + process.chdir(root); + try { + const [promise] = loadNxPlugin(pluginConfiguration, root); + plugin = await promise; + return { + type: 'load-result', + payload: { + name: plugin.name, + include: plugin.include, + exclude: plugin.exclude, + createNodesPattern: plugin.createNodes?.[0], + hasCreateDependencies: + 'createDependencies' in plugin && !!plugin.createDependencies, + hasProcessProjectGraph: + 'processProjectGraph' in plugin && + !!plugin.processProjectGraph, + hasCreateMetadata: + 'createMetadata' in plugin && !!plugin.createMetadata, + success: true, + }, + }; + } catch (e) { + return { + type: 'load-result', + payload: { + success: false, + error: createSerializableError(e), + }, + }; + } + }, + createNodes: async ({ configFiles, context, tx }) => { + try { + const result = await plugin.createNodes[1](configFiles, context); + return { + type: 'createNodesResult', + payload: { result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createNodesResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + createDependencies: async ({ context, tx }) => { + try { + const result = await plugin.createDependencies(context); + return { + type: 'createDependenciesResult', + payload: { dependencies: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createDependenciesResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + processProjectGraph: async ({ graph, ctx, tx }) => { + try { + const result = await plugin.processProjectGraph(graph, ctx); + return { + type: 'processProjectGraphResult', + payload: { graph: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'processProjectGraphResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + createMetadata: async ({ graph, context, tx }) => { + try { + const result = await plugin.createMetadata(graph, context); + return { + type: 'createMetadataResult', + payload: { metadata: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createMetadataResult', + payload: { success: false, error: e.stack, tx }, + }; + } + }, + }); + }) + ); }); + +server.listen(socketPath); + +const exitHandler = (exitCode: number) => () => { + server.close(); + try { + unlinkSync(socketPath); + } catch (e) {} + process.exit(exitCode); +}; + +const events = ['SIGINT', 'SIGTERM', 'SIGQUIT', 'exit']; + +events.forEach((event) => process.once(event, exitHandler(0))); +process.once('uncaughtException', exitHandler(1)); +process.once('unhandledRejection', exitHandler(1));