fix(core): move plugin worker to socket (#26558)

<!-- Please make sure you have read the submission guidelines before
posting a PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
Plugin isolation communicates with workers via built-in node IPC with
forked processes. When doing this, the parent process will not exit
until the child process has exited, in case more messages would be sent.
This requires an explicit call to shut down the plugin workers.

We set this up as a `process.on('exit')` listener, to shut down the
workers whenever the main Nx process dies. This is "fine", but requires
explicit calls to `process.exit`, as Node won't exit on its own
otherwise.

## Expected Behavior
To allow plugin workers to clean themselves up on exit, but not require
explicit `process.exit` calls, we need to detach them from the main
process and call `unref`. This only works when IPC is not being used. As
such, we need a different way to communicate with the worker.

This PR updates the communication method to mirror the daemon, and
communicate over a socket. Additionally, this PR enables isolation
during the Nx repo's E2E tests.

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
Craigory Coppola 2024-06-26 17:31:10 -04:00 committed by GitHub
parent 24cc86b96f
commit a0e8f83672
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 332 additions and 196 deletions

1
.env Normal file
View File

@ -0,0 +1 @@
NX_ISOLATE_PLUGINS=true

2
.gitignore vendored
View File

@ -37,7 +37,7 @@ out
.angular
# Local dev files
.env
.env.local
.bashrc
.nx

View File

@ -160,7 +160,7 @@ export function getStrippedEnvironmentVariables() {
return true;
}
const allowedKeys = ['NX_ADD_PLUGINS'];
const allowedKeys = ['NX_ADD_PLUGINS', 'NX_ISOLATE_PLUGINS'];
if (key.startsWith('NX_') && !allowedKeys.includes(key)) {
return false;

View File

@ -212,6 +212,6 @@
"nxCloudUrl": "https://staging.nx.app",
"parallel": 1,
"cacheDirectory": "/tmp/nx-cache",
"bust": 5,
"bust": 7,
"defaultBase": "master"
}

View File

@ -37,7 +37,7 @@ export const yargsNxInfixCommand: CommandModule = {
command: '$0 <target> [project] [_..]',
describe: 'Run a target for a project',
handler: async (args) => {
await handleErrors(
const exitCode = await handleErrors(
(args.verbose as boolean) ?? process.env.NX_VERBOSE_LOGGING === 'true',
async () => {
return (await import('./run-one')).runOne(
@ -46,5 +46,6 @@ export const yargsNxInfixCommand: CommandModule = {
);
}
);
process.exit(exitCode);
},
};

View File

@ -22,6 +22,11 @@ export const getForkedProcessOsSocketPath = (id: string) => {
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};
/**
 * Builds the socket path used to talk to the plugin worker with the given id.
 *
 * The socket file is named `plugin<id>.sock` inside the directory returned by
 * `getSocketDir()`. On Windows the absolute path is prefixed with the
 * `\\.\pipe\nx\` named-pipe namespace.
 */
export const getPluginOsSocketPath = (id: string) => {
  // `resolve` already yields an absolute, normalized path, so it only needs
  // to be applied once.
  const socketFile = resolve(join(getSocketDir(), 'plugin' + id + '.sock'));
  if (isWindows) {
    return '\\\\.\\pipe\\nx\\' + socketFile;
  }
  return socketFile;
};
export function killSocketOrPath(): void {
try {
unlinkSync(getFullOsSocketPath());

View File

@ -296,7 +296,7 @@ describe('getTouchedNpmPackages', () => {
});
it('should handle and log workspace package.json changes when the changes are not in `npmPackages` (projectGraph.externalNodes)', () => {
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'warn').mockImplementation(() => {});
expect(() => {
getTouchedNpmPackages(
[

View File

@ -159,7 +159,7 @@ export async function loadNxPlugins(
const cleanupFunctions: Array<() => void> = [];
for (const plugin of plugins) {
const [loadedPluginPromise, cleanup] = loadingMethod(plugin, root);
const [loadedPluginPromise, cleanup] = await loadingMethod(plugin, root);
result.push(loadedPluginPromise);
cleanupFunctions.push(cleanup);
}

View File

@ -3,33 +3,15 @@ import { PluginConfiguration } from '../../../config/nx-json';
import { LoadedNxPlugin } from '../internal-api';
import { loadRemoteNxPlugin } from './plugin-pool';
/**
* Used to ensure 1 plugin : 1 worker
*/
const remotePluginCache = new Map<
string,
readonly [Promise<LoadedNxPlugin>, () => void]
>();
export function loadNxPluginInIsolation(
export async function loadNxPluginInIsolation(
plugin: PluginConfiguration,
root = workspaceRoot
): readonly [Promise<LoadedNxPlugin>, () => void] {
const cacheKey = JSON.stringify(plugin);
if (remotePluginCache.has(cacheKey)) {
return remotePluginCache.get(cacheKey);
}
const [loadingPlugin, cleanup] = loadRemoteNxPlugin(plugin, root);
// We clean up plugin workers when Nx process completes.
const val = [
): Promise<readonly [Promise<LoadedNxPlugin>, () => void]> {
const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root);
return [
loadingPlugin,
() => {
cleanup();
remotePluginCache.delete(cacheKey);
},
] as const;
remotePluginCache.set(cacheKey, val);
return val;
}

View File

@ -7,9 +7,11 @@ import {
CreateDependenciesContext,
CreateMetadataContext,
CreateNodesContext,
CreateNodesContextV2,
} from '../public-api';
import { LoadedNxPlugin } from '../internal-api';
import { Serializable } from 'child_process';
import { Socket } from 'net';
export interface PluginWorkerLoadMessage {
type: 'load';
@ -42,7 +44,7 @@ export interface PluginWorkerCreateNodesMessage {
type: 'createNodes';
payload: {
configFiles: string[];
context: CreateNodesContext;
context: CreateNodesContextV2;
tx: string;
};
}
@ -159,6 +161,7 @@ export function isPluginWorkerMessage(
'createNodes',
'createDependencies',
'processProjectGraph',
'createMetadata',
].includes(message.type)
);
}
@ -175,6 +178,7 @@ export function isPluginWorkerResult(
'createNodesResult',
'createDependenciesResult',
'processProjectGraphResult',
'createMetadataResult',
].includes(message.type)
);
}
@ -192,6 +196,7 @@ type MessageHandlerReturn<T extends PluginWorkerMessage | PluginWorkerResult> =
export async function consumeMessage<
T extends PluginWorkerMessage | PluginWorkerResult
>(
socket: Socket,
raw: T,
handlers: {
[K in T['type']]: (
@ -205,7 +210,14 @@ export async function consumeMessage<
if (handler) {
const response = await handler(message.payload);
if (response) {
process.send!(response);
sendMessageOverSocket(socket, response);
}
}
}
/**
 * Writes a single plugin worker message to the socket.
 *
 * The message is serialized as JSON and terminated with the
 * end-of-transmission character (code point 4), which marks the end of the
 * message on the wire.
 *
 * @param socket the socket to write to
 * @param message the message or result to serialize and send
 */
export function sendMessageOverSocket(
  socket: Socket,
  message: PluginWorkerMessage | PluginWorkerResult
) {
  const endOfMessage = '\u0004';
  const serialized = JSON.stringify(message);
  socket.write(`${serialized}${endOfMessage}`);
}

View File

@ -7,53 +7,44 @@ import { PluginConfiguration } from '../../../config/nx-json';
// import { logger } from '../../utils/logger';
import { LoadedNxPlugin, nxPluginCache } from '../internal-api';
import { consumeMessage, isPluginWorkerResult } from './messaging';
import { getPluginOsSocketPath } from '../../../daemon/socket-utils';
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';
import {
consumeMessage,
isPluginWorkerResult,
sendMessageOverSocket,
} from './messaging';
import { Socket, connect } from 'net';
const cleanupFunctions = new Set<() => void>();
const pluginNames = new Map<ChildProcess, string>();
const MAX_MESSAGE_WAIT = 1000 * 60 * 5; // 5 minutes
interface PendingPromise {
promise: Promise<unknown>;
resolver: (result: any) => void;
rejector: (err: any) => void;
}
export function loadRemoteNxPlugin(
type NxPluginWorkerCache = Map<string, Promise<LoadedNxPlugin>>;
const nxPluginWorkerCache: NxPluginWorkerCache = (global[
'nxPluginWorkerCache'
] ??= new Map());
export async function loadRemoteNxPlugin(
plugin: PluginConfiguration,
root: string
): [Promise<LoadedNxPlugin>, () => void] {
// this should only really be true when running unit tests within
// the Nx repo. We still need to start the worker in this case,
// but its typescript.
const isWorkerTypescript = path.extname(__filename) === '.ts';
const workerPath = path.join(__dirname, 'plugin-worker');
const env: Record<string, string> = {
...process.env,
...(isWorkerTypescript
? {
// Ensures that the worker uses the same tsconfig as the main process
TS_NODE_PROJECT: path.join(
__dirname,
'../../../../tsconfig.lib.json'
),
): Promise<[Promise<LoadedNxPlugin>, () => void]> {
const cacheKey = JSON.stringify({ plugin, root });
if (nxPluginWorkerCache.has(cacheKey)) {
return [nxPluginWorkerCache.get(cacheKey), () => {}];
}
: {}),
};
const worker = fork(workerPath, [], {
stdio: ['ignore', 'inherit', 'inherit', 'ipc'],
env,
execArgv: [
...process.execArgv,
// If the worker is typescript, we need to register ts-node
...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []),
],
});
worker.send({ type: 'load', payload: { plugin, root } });
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);
const { worker, socket } = await startPluginWorker();
const pendingPromises = new Map<string, PendingPromise>();
@ -61,24 +52,45 @@ export function loadRemoteNxPlugin(
const cleanupFunction = () => {
worker.off('exit', exitHandler);
socket.destroy();
shutdownPluginWorker(worker);
nxPluginWorkerCache.delete(cacheKey);
};
cleanupFunctions.add(cleanupFunction);
return [
new Promise<LoadedNxPlugin>((res, rej) => {
worker.on(
'message',
createWorkerHandler(worker, pendingPromises, res, rej)
const pluginPromise = new Promise<LoadedNxPlugin>((res, rej) => {
sendMessageOverSocket(socket, {
type: 'load',
payload: { plugin, root },
});
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);
const loadTimeout = setTimeout(() => {
rej(new Error('Plugin worker timed out when loading plugin:' + plugin));
}, MAX_MESSAGE_WAIT);
socket.on(
'data',
consumeMessagesFromSocket(
createWorkerHandler(
worker,
pendingPromises,
(val) => {
clearTimeout(loadTimeout);
res(val);
},
rej,
socket
)
)
);
worker.on('exit', exitHandler);
}),
() => {
cleanupFunction();
cleanupFunctions.delete(cleanupFunction);
},
];
});
nxPluginWorkerCache.set(cacheKey, pluginPromise);
return [pluginPromise, cleanupFunction];
}
function shutdownPluginWorker(worker: ChildProcess) {
@ -102,15 +114,20 @@ function createWorkerHandler(
worker: ChildProcess,
pending: Map<string, PendingPromise>,
onload: (plugin: LoadedNxPlugin) => void,
onloadError: (err?: unknown) => void
onloadError: (err?: unknown) => void,
socket: Socket
) {
let pluginName: string;
return function (message: Serializable) {
let txId = 0;
return function (raw: string) {
const message = JSON.parse(raw);
if (!isPluginWorkerResult(message)) {
return;
}
return consumeMessage(message, {
return consumeMessage(socket, message, {
'load-result': (result) => {
if (result.success) {
const { name, createNodesPattern, include, exclude } = result;
@ -124,9 +141,10 @@ function createWorkerHandler(
? [
createNodesPattern,
(configFiles, ctx) => {
const tx = pluginName + ':createNodes:' + performance.now();
const tx =
pluginName + worker.pid + ':createNodes:' + txId++;
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createNodes',
payload: { configFiles, context: ctx, tx },
});
@ -137,9 +155,9 @@ function createWorkerHandler(
createDependencies: result.hasCreateDependencies
? (ctx) => {
const tx =
pluginName + ':createDependencies:' + performance.now();
pluginName + worker.pid + ':createDependencies:' + txId++;
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createDependencies',
payload: { context: ctx, tx },
});
@ -149,9 +167,9 @@ function createWorkerHandler(
processProjectGraph: result.hasProcessProjectGraph
? (graph, ctx) => {
const tx =
pluginName + ':processProjectGraph:' + performance.now();
pluginName + worker.pid + ':processProjectGraph:' + txId++;
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'processProjectGraph',
payload: { graph, ctx, tx },
});
@ -161,9 +179,9 @@ function createWorkerHandler(
createMetadata: result.hasCreateMetadata
? (graph, ctx) => {
const tx =
pluginName + ':createMetadata:' + performance.now();
pluginName + worker.pid + ':createMetadata:' + txId++;
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createMetadata',
payload: { graph, context: ctx, tx },
});
@ -228,26 +246,38 @@ function createWorkerExitHandler(
};
}
process.on('exit', () => {
let cleanedUp = false;
const exitHandler = () => {
if (cleanedUp) return;
for (const fn of cleanupFunctions) {
fn();
}
});
cleanedUp = true;
};
process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);
function registerPendingPromise(
tx: string,
pending: Map<string, PendingPromise>,
callback: () => void
): Promise<any> {
let resolver, rejector;
let resolver, rejector, timeout;
const promise = new Promise((res, rej) => {
resolver = res;
rejector = rej;
resolver = res;
timeout = setTimeout(() => {
rej(new Error(`Plugin worker timed out when processing message ${tx}`));
}, MAX_MESSAGE_WAIT);
callback();
}).finally(() => {
pending.delete(tx);
clearTimeout(timeout);
});
pending.set(tx, {
@ -258,3 +288,81 @@ function registerPendingPromise(
return promise;
}
global.nxPluginWorkerCount ??= 0;
/**
 * Forks a detached plugin worker and connects to it over a dedicated socket.
 *
 * The worker receives the socket path as its only CLI argument. Right after
 * forking, its Node IPC channel is disconnected and the child is unref'd, so
 * all further communication happens over the returned socket.
 *
 * The socket is polled sequentially: one connection attempt at a time, with
 * a short pause before each attempt. This avoids overlapping attempts (which
 * could open more than one connection to the worker) and guarantees that
 * polling stops as soon as the function settles.
 *
 * @returns the forked worker process and a connected, unref'd socket
 * @throws Error when the worker's socket never becomes available
 */
async function startPluginWorker(): Promise<{
  worker: ChildProcess;
  socket: Socket;
}> {
  // this should only really be true when running unit tests within
  // the Nx repo. We still need to start the worker in this case,
  // but its typescript.
  const isWorkerTypescript = path.extname(__filename) === '.ts';
  const workerPath = path.join(__dirname, 'plugin-worker');
  const env: Record<string, string> = {
    ...process.env,
    ...(isWorkerTypescript
      ? {
          // Ensures that the worker uses the same tsconfig as the main process
          TS_NODE_PROJECT: path.join(
            __dirname,
            '../../../../tsconfig.lib.json'
          ),
        }
      : {}),
  };

  // Socket path is unique per parent process and per worker started by it.
  const ipcPath = getPluginOsSocketPath(
    [process.pid, global.nxPluginWorkerCount++].join('-')
  );

  const worker = fork(workerPath, [ipcPath], {
    stdio: process.stdout.isTTY ? 'inherit' : 'ignore',
    env,
    execArgv: [
      ...process.execArgv,
      // If the worker is typescript, we need to register ts-node
      ...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []),
    ],
    detached: true,
  });
  worker.disconnect();
  worker.unref();

  const retryDelayMs = 10;
  const maxFailedAttempts = 1000;
  for (let attempts = 0; ; attempts++) {
    await new Promise<void>((res) => setTimeout(res, retryDelayMs));
    const socket = await isServerAvailable(ipcPath);
    if (socket) {
      socket.unref();
      return { worker, socket };
    }
    if (attempts > maxFailedAttempts) {
      // The worker never started listening; it most likely exited early.
      throw new Error('Failed to start plugin worker.');
    }
  }
}
/**
 * Attempts a single connection to the socket at `ipcPath`.
 *
 * @param ipcPath path of the socket (or named pipe) to connect to
 * @returns a promise that resolves with the connected socket, or with `false`
 *   when the connection attempt errors or `connect` throws. It never rejects.
 */
function isServerAvailable(ipcPath: string): Promise<Socket | false> {
  return new Promise((settle) => {
    try {
      const candidate = connect(ipcPath);
      candidate.once('connect', () => settle(candidate));
      candidate.once('error', () => settle(false));
    } catch {
      settle(false);
    }
  });
}

View File

@ -1,8 +1,10 @@
import { consumeMessage, isPluginWorkerMessage } from './messaging';
import { LoadedNxPlugin } from '../internal-api';
import { loadNxPlugin } from '../loader';
import { Serializable } from 'child_process';
import { createSerializableError } from '../../../utils/serializable-error';
import { createServer } from 'net';
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';
import { unlinkSync } from 'fs';
if (process.env.NX_PERF_LOGGING === 'true') {
require('../../../utils/perf-logging');
@ -12,11 +14,17 @@ global.NX_GRAPH_CREATION = true;
let plugin: LoadedNxPlugin;
process.on('message', async (message: Serializable) => {
const socketPath = process.argv[2];
const server = createServer((socket) => {
socket.on(
'data',
consumeMessagesFromSocket((raw) => {
const message = JSON.parse(raw.toString());
if (!isPluginWorkerMessage(message)) {
return;
}
return consumeMessage(message, {
return consumeMessage(socket, message, {
load: async ({ plugin: pluginConfiguration, root }) => {
process.chdir(root);
try {
@ -32,7 +40,8 @@ process.on('message', async (message: Serializable) => {
hasCreateDependencies:
'createDependencies' in plugin && !!plugin.createDependencies,
hasProcessProjectGraph:
'processProjectGraph' in plugin && !!plugin.processProjectGraph,
'processProjectGraph' in plugin &&
!!plugin.processProjectGraph,
hasCreateMetadata:
'createMetadata' in plugin && !!plugin.createMetadata,
success: true,
@ -117,4 +126,22 @@ process.on('message', async (message: Serializable) => {
}
},
});
})
);
});
server.listen(socketPath);
/**
 * Stops accepting connections and removes the socket file.
 * Best-effort: the socket file may already be gone, so unlink errors are
 * ignored.
 */
const cleanupSocket = () => {
  server.close();
  try {
    unlinkSync(socketPath);
  } catch (e) {}
};

/**
 * Creates a handler that cleans up the socket and then terminates the
 * worker with the given exit code.
 */
const exitHandler = (exitCode: number) => () => {
  cleanupSocket();
  process.exit(exitCode);
};

const events = ['SIGINT', 'SIGTERM', 'SIGQUIT'];

events.forEach((event) => process.once(event, exitHandler(0)));
// The 'exit' listener must only clean up. Calling `process.exit(0)` from
// inside it would overwrite the exit code the process is already exiting
// with (e.g. the 1 set by the handlers below).
process.once('exit', cleanupSocket);
process.once('uncaughtException', exitHandler(1));
process.once('unhandledRejection', exitHandler(1));