From f326bfe52ec7aead9d4967a927682f67f3de7fc1 Mon Sep 17 00:00:00 2001 From: Jason Jean Date: Tue, 4 Feb 2025 18:02:24 -0500 Subject: [PATCH] feat(core): introduce continuous tasks (#29750) An RFC about this feature is happening here: #29025. This has the most information about this feature. Nx currently does not explicitly handle tasks which run continuously until they are terminated. This PR adds the initial support for continuous tasks which run continuously until they are terminated. This adds the ability to depend on continuous tasks. There is some more work to be done but this will be enough as an MVP. Fixes # --- docs/generated/devkit/TargetConfiguration.md | 9 + docs/generated/devkit/Task.md | 9 + docs/generated/devkit/TaskGraph.md | 7 + .../js/src/utils/buildable-libs-utils.spec.ts | 16 + packages/nx/package.json | 1 + packages/nx/src/command-line/graph/graph.ts | 2 + .../commands-runner/get-command-projects.ts | 33 +- packages/nx/src/config/task-graph.ts | 7 + .../src/config/workspace-json-project-json.ts | 5 + .../run-commands/run-commands.impl.ts | 438 ++------------- .../executors/run-commands/running-tasks.ts | 519 ++++++++++++++++++ packages/nx/src/native/index.d.ts | 1 + .../native/pseudo_terminal/child_process.rs | 55 +- .../native/pseudo_terminal/pseudo_terminal.rs | 2 +- .../tasks-runner/create-task-graph.spec.ts | 295 ++++++++++ .../nx/src/tasks-runner/create-task-graph.ts | 52 +- .../forked-process-task-runner.ts | 470 +++++----------- .../nx/src/tasks-runner/init-tasks-runner.ts | 4 + .../life-cycles/formatting-utils.spec.ts | 21 + .../src/tasks-runner/pseudo-terminal.spec.ts | 37 +- .../nx/src/tasks-runner/pseudo-terminal.ts | 39 +- .../running-tasks/batch-process.ts | 84 +++ .../running-tasks/node-child-process.ts | 215 ++++++++ .../running-tasks/noop-child-process.ts | 20 + .../running-tasks/running-task.ts | 7 + .../nx/src/tasks-runner/task-orchestrator.ts | 323 +++++++---- .../src/tasks-runner/tasks-schedule.spec.ts | 23 + .../nx/src/tasks-runner/tasks-schedule.ts | 9 +- packages/nx/src/tasks-runner/utils.ts | 23 +- 29 files changed, 1856 insertions(+), 870 deletions(-) create mode 100644 packages/nx/src/executors/run-commands/running-tasks.ts create mode 100644 packages/nx/src/tasks-runner/running-tasks/batch-process.ts create mode 100644 packages/nx/src/tasks-runner/running-tasks/node-child-process.ts create mode 100644 packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts create mode 100644 packages/nx/src/tasks-runner/running-tasks/running-task.ts diff --git a/docs/generated/devkit/TargetConfiguration.md b/docs/generated/devkit/TargetConfiguration.md index 05bb7fdac4..1760db6290 100644 --- a/docs/generated/devkit/TargetConfiguration.md +++ b/docs/generated/devkit/TargetConfiguration.md @@ -15,6 +15,7 @@ Target's configuration - [cache](../../devkit/documents/TargetConfiguration#cache): boolean - [command](../../devkit/documents/TargetConfiguration#command): string - [configurations](../../devkit/documents/TargetConfiguration#configurations): Object +- [continuous](../../devkit/documents/TargetConfiguration#continuous): boolean - [defaultConfiguration](../../devkit/documents/TargetConfiguration#defaultconfiguration): string - [dependsOn](../../devkit/documents/TargetConfiguration#dependson): (string | TargetDependencyConfig)[] - [executor](../../devkit/documents/TargetConfiguration#executor): string @@ -55,6 +56,14 @@ Sets of options --- +### continuous + +• `Optional` **continuous**: `boolean` + +Whether this target runs continuously + +--- + ### defaultConfiguration • `Optional` **defaultConfiguration**: `string` diff --git a/docs/generated/devkit/Task.md b/docs/generated/devkit/Task.md index 10f6d600f8..78fa965f41 100644 --- a/docs/generated/devkit/Task.md +++ b/docs/generated/devkit/Task.md @@ -7,6 +7,7 @@ A representation of the invocation of an Executor ### Properties - [cache](../../devkit/documents/Task#cache): boolean +- [continuous](../../devkit/documents/Task#continuous): boolean - [endTime](../../devkit/documents/Task#endtime): number - [hash](../../devkit/documents/Task#hash): string - [hashDetails](../../devkit/documents/Task#hashdetails): Object @@ -28,6 +29,14 @@ Determines if a given task should be cacheable. --- +### continuous + +• `Optional` **continuous**: `boolean` + +This denotes if the task runs continuously + +--- + ### endTime • `Optional` **endTime**: `number` diff --git a/docs/generated/devkit/TaskGraph.md b/docs/generated/devkit/TaskGraph.md index 35def5738b..3a40554e16 100644 --- a/docs/generated/devkit/TaskGraph.md +++ b/docs/generated/devkit/TaskGraph.md @@ -6,12 +6,19 @@ Graph of Tasks to be executed ### Properties +- [continuousDependencies](../../devkit/documents/TaskGraph#continuousdependencies): Record - [dependencies](../../devkit/documents/TaskGraph#dependencies): Record - [roots](../../devkit/documents/TaskGraph#roots): string[] - [tasks](../../devkit/documents/TaskGraph#tasks): Record ## Properties +### continuousDependencies + +• **continuousDependencies**: `Record`\<`string`, `string`[]\> + +--- + ### dependencies • **dependencies**: `Record`\<`string`, `string`[]\> diff --git a/packages/js/src/utils/buildable-libs-utils.spec.ts b/packages/js/src/utils/buildable-libs-utils.spec.ts index e3eb80c343..fb2de4f805 100644 --- a/packages/js/src/utils/buildable-libs-utils.spec.ts +++ b/packages/js/src/utils/buildable-libs-utils.spec.ts @@ -429,6 +429,7 @@ describe('calculateDependenciesFromTaskGraph', () => { 'lib3:build': [], 'lib4:build': [], }, + continuousDependencies: {}, roots: [], tasks: { 'lib1:build': { @@ -437,6 +438,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build': { id: 'lib2:build', @@ -444,6 +446,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build-base': { id: 'lib2:build-base', @@ -451,6 +454,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build': { id: 'lib3:build', @@ -458,6 +462,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build': { id: 'lib4:build', @@ -465,6 +470,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, }, }; @@ -604,6 +610,7 @@ describe('calculateDependenciesFromTaskGraph', () => { 'lib4:build': ['lib4:build-base'], 'lib4:build-base': [], }, + continuousDependencies: {}, roots: [], tasks: { 'lib1:build': { @@ -612,6 +619,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib1:build-base': { id: 'lib1:build-base', @@ -619,6 +627,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build': { id: 'lib2:build', @@ -626,6 +635,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build-base': { id: 'lib2:build-base', @@ -633,6 +643,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build': { id: 'lib3:build', @@ -640,6 +651,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build-base': { id: 'lib3:build-base', @@ -647,6 +659,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build': { id: 'lib4:build', @@ -654,6 +667,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build-base': { id: 'lib4:build-base', @@ -661,6 +675,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, }, }; @@ -752,6 +767,7 @@ describe('calculateDependenciesFromTaskGraph', () => { // not relevant for this test case const taskGraph: TaskGraph = { dependencies: {}, + continuousDependencies: {}, roots: [], tasks: {}, }; diff --git a/packages/nx/package.json b/packages/nx/package.json index 81c6d43f23..eae072dfcd 100644 --- a/packages/nx/package.json +++ b/packages/nx/package.json @@ -66,6 +66,7 @@ "string-width": "^4.2.3", "tar-stream": "~2.2.0", "tmp": "~0.2.1", + "tree-kill": "^1.2.2", "tsconfig-paths": "^4.1.2", "tslib": "^2.3.0", "yaml": "^2.6.0", diff --git a/packages/nx/src/command-line/graph/graph.ts b/packages/nx/src/command-line/graph/graph.ts index e355d7d137..5e172ab617 100644 --- a/packages/nx/src/command-line/graph/graph.ts +++ b/packages/nx/src/command-line/graph/graph.ts @@ -980,6 +980,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): { taskGraphs[taskId] = { tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], }; @@ -1006,6 +1007,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): { taskGraphs[taskId] = { tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], }; diff --git a/packages/nx/src/commands-runner/get-command-projects.ts b/packages/nx/src/commands-runner/get-command-projects.ts index 3d9c669e85..aaacb863f5 100644 --- a/packages/nx/src/commands-runner/get-command-projects.ts +++ b/packages/nx/src/commands-runner/get-command-projects.ts @@ -1,5 +1,4 @@ import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph'; -import { removeIdsFromGraph } from '../tasks-runner/utils'; import { NxArgs } from '../utils/command-line-utils'; import { CommandGraph } from './command-graph'; import { createCommandGraph } from './create-command-graph'; @@ -34,3 +33,35 @@ function getSortedProjects( return getSortedProjects(newGraph, sortedProjects); } + +function removeIdsFromGraph( + graph: { + roots: string[]; + dependencies: Record; + }, + ids: string[], + mapWithIds: Record +): { + mapWithIds: Record; + roots: string[]; + dependencies: Record; +} { + const filteredMapWithIds = {}; + const dependencies = {}; + const removedSet = new Set(ids); + for (let id of Object.keys(mapWithIds)) { + if (!removedSet.has(id)) { + filteredMapWithIds[id] = mapWithIds[id]; + dependencies[id] = graph.dependencies[id].filter( + (depId) => !removedSet.has(depId) + ); + } + } + return { + mapWithIds: filteredMapWithIds, + dependencies: dependencies, + roots: Object.keys(dependencies).filter( + (k) => dependencies[k].length === 0 + ), + }; +} diff --git a/packages/nx/src/config/task-graph.ts b/packages/nx/src/config/task-graph.ts index 9829af409e..e65f7d3f74 100644 --- a/packages/nx/src/config/task-graph.ts +++ b/packages/nx/src/config/task-graph.ts @@ -81,6 +81,11 @@ export interface Task { * Determines if a given task should be parallelizable. */ parallelism: boolean; + + /** + * This denotes if the task runs continuously + */ + continuous?: boolean; } /** @@ -99,4 +104,6 @@ export interface TaskGraph { * Map of Task IDs to IDs of tasks which the task depends on */ dependencies: Record; + + continuousDependencies: Record; } diff --git a/packages/nx/src/config/workspace-json-project-json.ts b/packages/nx/src/config/workspace-json-project-json.ts index 951852e782..0b2692c76e 100644 --- a/packages/nx/src/config/workspace-json-project-json.ts +++ b/packages/nx/src/config/workspace-json-project-json.ts @@ -269,6 +269,11 @@ export interface TargetConfiguration { */ parallelism?: boolean; + /** + * Whether this target runs continuously + */ + continuous?: boolean; + /** * List of generators to run before the target to ensure the workspace * is up to date. diff --git a/packages/nx/src/executors/run-commands/run-commands.impl.ts b/packages/nx/src/executors/run-commands/run-commands.impl.ts index 4df466e46f..98b22fa040 100644 --- a/packages/nx/src/executors/run-commands/run-commands.impl.ts +++ b/packages/nx/src/executors/run-commands/run-commands.impl.ts @@ -1,54 +1,35 @@ -import { ChildProcess, exec, Serializable } from 'child_process'; -import * as path from 'path'; +import { Serializable } from 'child_process'; import * as yargsParser from 'yargs-parser'; -import { env as appendLocalEnv } from 'npm-run-path'; import { ExecutorContext } from '../../config/misc-interfaces'; -import * as chalk from 'chalk'; import { getPseudoTerminal, PseudoTerminal, - PseudoTtyProcess, } from '../../tasks-runner/pseudo-terminal'; import { signalToCode } from '../../utils/exit-codes'; -import { - loadAndExpandDotEnvFile, - unloadDotEnvFile, -} from '../../tasks-runner/task-env'; +import { ParallelRunningTasks, SeriallyRunningTasks } from './running-tasks'; export const LARGE_BUFFER = 1024 * 1000000; -let pseudoTerminal: PseudoTerminal | null; -const childProcesses = new Set(); - -function loadEnvVarsFile(path: string, env: Record = {}) { - unloadDotEnvFile(path, env); - const result = loadAndExpandDotEnvFile(path, env); - if (result.error) { - throw result.error; - } -} - export type Json = { [k: string]: any; }; +export interface RunCommandsCommandOptions { + command: string; + forwardAllArgs?: boolean; + /** + * description was added to allow users to document their commands inline, + * it is not intended to be used as part of the execution of the command. + */ + description?: string; + prefix?: string; + prefixColor?: string; + color?: string; + bgColor?: string; +} + export interface RunCommandsOptions extends Json { command?: string | string[]; - commands?: ( - | { - command: string; - forwardAllArgs?: boolean; - /** - * description was added to allow users to document their commands inline, - * it is not intended to be used as part of the execution of the command. - */ - description?: string; - prefix?: string; - prefixColor?: string; - color?: string; - bgColor?: string; - } - | string - )[]; + commands?: Array; color?: boolean; parallel?: boolean; readyWhen?: string | string[]; @@ -108,7 +89,18 @@ export default async function ( success: boolean; terminalOutput: string; }> { - registerProcessListener(); + const task = await runCommands(options, context); + const results = await task.getResults(); + return { + ...results, + success: results.code === 0, + }; +} + +export async function runCommands( + options: RunCommandsOptions, + context: ExecutorContext +) { const normalized = normalizeOptions(options); if (normalized.readyWhenStatus.length && !normalized.parallel) { @@ -128,11 +120,18 @@ export default async function ( ); } + const pseudoTerminal = + !options.parallel && PseudoTerminal.isSupported() + ? getPseudoTerminal() + : null; + try { - const result = options.parallel - ? await runInParallel(normalized, context) - : await runSerially(normalized, context); - return result; + const runningTask = options.parallel + ? new ParallelRunningTasks(normalized, context) + : new SeriallyRunningTasks(normalized, context, pseudoTerminal); + + registerProcessListener(runningTask, pseudoTerminal); + return runningTask; } catch (e) { if (process.env.NX_VERBOSE_LOGGING === 'true') { console.error(e); @@ -143,77 +142,6 @@ export default async function ( } } -async function runInParallel( - options: NormalizedRunCommandsOptions, - context: ExecutorContext -): Promise<{ success: boolean; terminalOutput: string }> { - const procs = options.commands.map((c) => - createProcess( - null, - c, - options.readyWhenStatus, - options.color, - calculateCwd(options.cwd, context), - options.env ?? {}, - true, - options.usePty, - options.streamOutput, - options.tty, - options.envFile - ).then((result: { success: boolean; terminalOutput: string }) => ({ - result, - command: c.command, - })) - ); - - let terminalOutput = ''; - if (options.readyWhenStatus.length) { - const r: { - result: { success: boolean; terminalOutput: string }; - command: string; - } = await Promise.race(procs); - terminalOutput += r.result.terminalOutput; - if (!r.result.success) { - const output = `Warning: command "${r.command}" exited with non-zero status code`; - terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { success: false, terminalOutput }; - } else { - return { success: true, terminalOutput }; - } - } else { - const r: { - result: { success: boolean; terminalOutput: string }; - command: string; - }[] = await Promise.all(procs); - terminalOutput += r.map((f) => f.result.terminalOutput).join(''); - const failed = r.filter((v) => !v.result.success); - if (failed.length > 0) { - const output = failed - .map( - (f) => - `Warning: command "${f.command}" exited with non-zero status code` - ) - .join('\r\n'); - terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { - success: false, - terminalOutput, - }; - } else { - return { - success: true, - terminalOutput, - }; - } - } -} - function normalizeOptions( options: RunCommandsOptions ): NormalizedRunCommandsOptions { @@ -279,241 +207,6 @@ function normalizeOptions( return options as NormalizedRunCommandsOptions; } -async function runSerially( - options: NormalizedRunCommandsOptions, - context: ExecutorContext -): Promise<{ success: boolean; terminalOutput: string }> { - pseudoTerminal ??= PseudoTerminal.isSupported() ? getPseudoTerminal() : null; - let terminalOutput = ''; - for (const c of options.commands) { - const result: { success: boolean; terminalOutput: string } = - await createProcess( - pseudoTerminal, - c, - [], - options.color, - calculateCwd(options.cwd, context), - options.processEnv ?? options.env ?? {}, - false, - options.usePty, - options.streamOutput, - options.tty, - options.envFile - ); - terminalOutput += result.terminalOutput; - if (!result.success) { - const output = `Warning: command "${c.command}" exited with non-zero status code`; - result.terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { success: false, terminalOutput }; - } - } - return { success: true, terminalOutput }; -} - -async function createProcess( - pseudoTerminal: PseudoTerminal | null, - commandConfig: { - command: string; - color?: string; - bgColor?: string; - prefix?: string; - prefixColor?: string; - }, - readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], - color: boolean, - cwd: string, - env: Record, - isParallel: boolean, - usePty: boolean = true, - streamOutput: boolean = true, - tty: boolean, - envFile?: string -): Promise<{ success: boolean; terminalOutput: string }> { - env = processEnv(color, cwd, env, envFile); - // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes - // currently does not work properly in windows - if ( - pseudoTerminal && - process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && - !commandConfig.prefix && - readyWhenStatus.length === 0 && - !isParallel && - usePty - ) { - let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; - if (streamOutput) { - process.stdout.write(terminalOutput); - } - - const cp = pseudoTerminal.runCommand(commandConfig.command, { - cwd, - jsEnv: env, - quiet: !streamOutput, - tty, - }); - - childProcesses.add(cp); - - return new Promise((res) => { - cp.onOutput((output) => { - terminalOutput += output; - }); - - cp.onExit((code) => { - if (code >= 128) { - process.exit(code); - } else { - res({ success: code === 0, terminalOutput }); - } - }); - }); - } - - return nodeProcess(commandConfig, cwd, env, readyWhenStatus, streamOutput); -} - -function nodeProcess( - commandConfig: { - command: string; - color?: string; - bgColor?: string; - prefix?: string; - prefixColor?: string; - }, - cwd: string, - env: Record, - readyWhenStatus: { stringToMatch: string; found: boolean }[], - streamOutput = true -): Promise<{ success: boolean; terminalOutput: string }> { - let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; - if (streamOutput) { - process.stdout.write(terminalOutput); - } - return new Promise((res) => { - const childProcess = exec(commandConfig.command, { - maxBuffer: LARGE_BUFFER, - env, - cwd, - windowsHide: false, - }); - - childProcesses.add(childProcess); - - childProcess.stdout.on('data', (data) => { - const output = addColorAndPrefix(data, commandConfig); - terminalOutput += output; - if (streamOutput) { - process.stdout.write(output); - } - if (readyWhenStatus.length && isReady(readyWhenStatus, data.toString())) { - res({ success: true, terminalOutput }); - } - }); - childProcess.stderr.on('data', (err) => { - const output = addColorAndPrefix(err, commandConfig); - terminalOutput += output; - if (streamOutput) { - process.stderr.write(output); - } - if (readyWhenStatus.length && isReady(readyWhenStatus, err.toString())) { - res({ success: true, terminalOutput }); - } - }); - childProcess.on('error', (err) => { - const ouptput = addColorAndPrefix(err.toString(), commandConfig); - terminalOutput += ouptput; - if (streamOutput) { - process.stderr.write(ouptput); - } - res({ success: false, terminalOutput }); - }); - childProcess.on('exit', (code) => { - childProcesses.delete(childProcess); - if (!readyWhenStatus.length || isReady(readyWhenStatus)) { - res({ success: code === 0, terminalOutput }); - } - }); - }); -} - -function addColorAndPrefix( - out: string, - config: { - prefix?: string; - prefixColor?: string; - color?: string; - bgColor?: string; - } -) { - if (config.prefix) { - out = out - .split('\n') - .map((l) => { - let prefixText = config.prefix; - if (config.prefixColor && chalk[config.prefixColor]) { - prefixText = chalk[config.prefixColor](prefixText); - } - prefixText = chalk.bold(prefixText); - return l.trim().length > 0 ? `${prefixText} ${l}` : l; - }) - .join('\n'); - } - if (config.color && chalk[config.color]) { - out = chalk[config.color](out); - } - if (config.bgColor && chalk[config.bgColor]) { - out = chalk[config.bgColor](out); - } - return out; -} - -function calculateCwd( - cwd: string | undefined, - context: ExecutorContext -): string { - if (!cwd) return context.root; - if (path.isAbsolute(cwd)) return cwd; - return path.join(context.root, cwd); -} - -/** - * Env variables are processed in the following order: - * - env option from executor options - * - env file from envFile option if provided - * - local env variables - */ -function processEnv( - color: boolean, - cwd: string, - envOptionFromExecutor: Record, - envFile?: string -) { - let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() }); - localEnv = { - ...process.env, - ...localEnv, - }; - - if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) { - loadEnvVarsFile(envFile, localEnv); - } - let res: Record = { - ...localEnv, - ...envOptionFromExecutor, - }; - // need to override PATH to make sure we are using the local node_modules - if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like - if (localEnv.Path) res.Path = localEnv.Path; // Windows - - if (color) { - res.FORCE_COLOR = `${color}`; - } - return res; -} - export function interpolateArgsIntoCommand( command: string, opts: Pick< @@ -631,7 +324,10 @@ function filterPropKeysFromUnParsedOptions( let registered = false; -function registerProcessListener() { +function registerProcessListener( + runningTask: ParallelRunningTasks | SeriallyRunningTasks, + pseudoTerminal?: PseudoTerminal +) { if (registered) { return; } @@ -644,45 +340,25 @@ function registerProcessListener() { pseudoTerminal.sendMessageToChildren(message); } - childProcesses.forEach((p) => { - if ('connected' in p && p.connected) { - p.send(message); - } - }); + runningTask.send(message); }); // Terminate any task processes on exit process.on('exit', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill(); - } - }); + runningTask.kill(); }); process.on('SIGINT', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // we exit here because we don't need to write anything to cache. process.exit(signalToCode('SIGINT')); }); process.on('SIGTERM', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); process.on('SIGHUP', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); @@ -705,19 +381,3 @@ function wrapArgIntoQuotesIfNeeded(arg: string): string { return arg; } } - -function isReady( - readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], - data?: string -): boolean { - if (data) { - for (const readyWhenElement of readyWhenStatus) { - if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) { - readyWhenElement.found = true; - break; - } - } - } - - return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found); -} diff --git a/packages/nx/src/executors/run-commands/running-tasks.ts b/packages/nx/src/executors/run-commands/running-tasks.ts new file mode 100644 index 0000000000..216d943867 --- /dev/null +++ b/packages/nx/src/executors/run-commands/running-tasks.ts @@ -0,0 +1,519 @@ +import { ChildProcess, exec, Serializable } from 'child_process'; +import { RunningTask } from '../../tasks-runner/running-tasks/running-task'; +import { ExecutorContext } from '../../config/misc-interfaces'; +import { + LARGE_BUFFER, + NormalizedRunCommandsOptions, + RunCommandsCommandOptions, + RunCommandsOptions, +} from './run-commands.impl'; +import { + PseudoTerminal, + PseudoTtyProcess, +} from '../../tasks-runner/pseudo-terminal'; +import { isAbsolute, join } from 'path'; +import * as chalk from 'chalk'; +import { env as appendLocalEnv } from 'npm-run-path'; +import { + loadAndExpandDotEnvFile, + unloadDotEnvFile, +} from '../../tasks-runner/task-env'; +import * as treeKill from 'tree-kill'; + +export class ParallelRunningTasks implements RunningTask { + private readonly childProcesses: RunningNodeProcess[]; + private readyWhenStatus: { stringToMatch: string; found: boolean }[]; + private readonly streamOutput: boolean; + + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + + constructor(options: NormalizedRunCommandsOptions, context: ExecutorContext) { + this.childProcesses = options.commands.map( + (commandConfig) => + new RunningNodeProcess( + commandConfig, + options.color, + calculateCwd(options.cwd, context), + options.env ?? {}, + options.readyWhenStatus, + options.streamOutput, + options.envFile + ) + ); + this.readyWhenStatus = options.readyWhenStatus; + this.streamOutput = options.streamOutput; + + this.run(); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + for (const childProcess of this.childProcesses) { + childProcess.send(message); + } + } + + async kill(signal?: NodeJS.Signals | number) { + await Promise.all( + this.childProcesses.map(async (p) => { + try { + return p.kill(); + } catch (e) { + console.error(`Unable to terminate "${p.command}"\nError:`, e); + } + }) + ); + } + + private async run() { + if (this.readyWhenStatus.length) { + let { + childProcess, + result: { code, terminalOutput }, + } = await Promise.race( + this.childProcesses.map( + (childProcess) => + new Promise<{ + childProcess: RunningNodeProcess; + result: { code: number; terminalOutput: string }; + }>((res) => { + childProcess.onExit((code, terminalOutput) => { + res({ + childProcess, + result: { code, terminalOutput }, + }); + }); + }) + ) + ); + + if (code !== 0) { + const output = `Warning: command "${childProcess.command}" exited with non-zero status code`; + terminalOutput += output; + if (this.streamOutput) { + process.stderr.write(output); + } + } + + for (const cb of this.exitCallbacks) { + cb(code, terminalOutput); + } + } else { + const results = await Promise.all( + this.childProcesses.map((childProcess) => + childProcess.getResults().then((result) => ({ + childProcess, + result, + })) + ) + ); + + let terminalOutput = results + .map((r) => r.result.terminalOutput) + .join('\r\n'); + + const failed = results.filter((result) => result.result.code !== 0); + if (failed.length > 0) { + const output = failed + .map( + (failedResult) => + `Warning: command "${failedResult.childProcess.command}" exited with non-zero status code` + ) + .join('\r\n'); + terminalOutput += output; + if (this.streamOutput) { + process.stderr.write(output); + } + + for (const cb of this.exitCallbacks) { + cb(1, terminalOutput); + } + } else { + for (const cb of this.exitCallbacks) { + cb(0, terminalOutput); + } + } + } + } +} + +export class SeriallyRunningTasks implements RunningTask { + private terminalOutput = ''; + private currentProcess: RunningTask | PseudoTtyProcess | null = null; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + private code: number | null = 0; + private error: any; + + constructor( + options: NormalizedRunCommandsOptions, + context: ExecutorContext, + private pseudoTerminal?: PseudoTerminal + ) { + this.run(options, context) + .catch((e) => { + this.error = e; + }) + .finally(() => { + for (const cb of this.exitCallbacks) { + cb(this.code, this.terminalOutput); + } + }); + } + + getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res, rej) => { + this.onExit((code) => { + if (this.error) { + rej(this.error); + } else { + res({ code, terminalOutput: this.terminalOutput }); + } + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + throw new Error('Not implemented'); + } + + kill(signal?: NodeJS.Signals | number) { + return this.currentProcess.kill(signal); + } + + private async run( + options: NormalizedRunCommandsOptions, + context: ExecutorContext + ) { + for (const c of options.commands) { + const childProcess = await this.createProcess( + c, + [], + options.color, + calculateCwd(options.cwd, context), + options.processEnv ?? options.env ?? {}, + false, + options.usePty, + options.streamOutput, + options.tty, + options.envFile + ); + this.currentProcess = childProcess; + + let { code, terminalOutput } = await childProcess.getResults(); + this.terminalOutput += terminalOutput; + this.code = code; + if (code !== 0) { + const output = `Warning: command "${c.command}" exited with non-zero status code`; + terminalOutput += output; + if (options.streamOutput) { + process.stderr.write(output); + } + this.terminalOutput += terminalOutput; + + // Stop running commands + break; + } + } + } + + private async createProcess( + commandConfig: RunCommandsCommandOptions, + readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], + color: boolean, + cwd: string, + env: Record, + isParallel: boolean, + usePty: boolean = true, + streamOutput: boolean = true, + tty: boolean, + envFile?: string + ): Promise { + // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes + // currently does not work properly in windows + if ( + this.pseudoTerminal && + process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && + !commandConfig.prefix && + readyWhenStatus.length === 0 && + !isParallel && + usePty + ) { + return createProcessWithPseudoTty( + this.pseudoTerminal, + commandConfig, + color, + cwd, + env, + streamOutput, + tty, + envFile + ); + } + + return new RunningNodeProcess( + commandConfig, + color, + cwd, + env, + readyWhenStatus, + streamOutput, + envFile + ); + } +} + +class RunningNodeProcess implements RunningTask { + private terminalOutput = ''; + private childProcess: ChildProcess; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + public command: string; + + constructor( + commandConfig: RunCommandsCommandOptions, + color: boolean, + cwd: string, + env: Record, + private readyWhenStatus: { stringToMatch: string; found: boolean }[], + streamOutput = true, + envFile: string + ) { + env = processEnv(color, cwd, env, envFile); + this.command = commandConfig.command; + this.terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; + if (streamOutput) { + process.stdout.write(this.terminalOutput); + } + this.childProcess = exec(commandConfig.command, { + maxBuffer: LARGE_BUFFER, + env, + cwd, + windowsHide: false, + }); + + this.addListeners(commandConfig, streamOutput); + } + + getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + this.childProcess.send(message); + } + + kill(signal?: NodeJS.Signals | number): Promise { + return new Promise((res, rej) => { + treeKill(this.childProcess.pid, signal, (err) => { + if (err) { + rej(err); + } else { + res(); + } + }); + }); + } + + private addListeners( + commandConfig: RunCommandsCommandOptions, + streamOutput: boolean + ) { + this.childProcess.stdout.on('data', (data) => { + const output = addColorAndPrefix(data, commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stdout.write(output); + } + if ( + this.readyWhenStatus.length && + isReady(this.readyWhenStatus, data.toString()) + ) { + for (const cb of this.exitCallbacks) { + cb(0, this.terminalOutput); + } + } + }); + this.childProcess.stderr.on('data', (err) => { + const output = addColorAndPrefix(err, commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stderr.write(output); + } + if ( + this.readyWhenStatus.length && + isReady(this.readyWhenStatus, err.toString()) + ) { + for (const cb of this.exitCallbacks) { + cb(1, this.terminalOutput); + } + } + }); + this.childProcess.on('error', (err) => { + const output = addColorAndPrefix(err.toString(), commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stderr.write(output); + } + for (const cb of this.exitCallbacks) { + cb(1, this.terminalOutput); + } + }); + this.childProcess.on('exit', (code) => { + if (!this.readyWhenStatus.length || isReady(this.readyWhenStatus)) { + for (const cb of this.exitCallbacks) { + cb(code, this.terminalOutput); + } + } + }); + } +} + +async function createProcessWithPseudoTty( + pseudoTerminal: PseudoTerminal, + commandConfig: { + command: string; + color?: string; + bgColor?: string; + prefix?: string; + }, + color: boolean, + cwd: string, + env: Record, + streamOutput: boolean = true, + tty: boolean, + envFile?: string +) { + let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; + if (streamOutput) { + process.stdout.write(terminalOutput); + } + env = processEnv(color, cwd, env, envFile); + const childProcess = pseudoTerminal.runCommand(commandConfig.command, { + cwd, + jsEnv: env, + quiet: !streamOutput, + tty, + }); + + childProcess.onOutput((output) => { + terminalOutput += output; + }); + + return childProcess; +} + +function addColorAndPrefix(out: string, config: RunCommandsCommandOptions) { + if (config.prefix) { + out = out + .split('\n') + .map((l) => { + let prefixText = config.prefix; + if (config.prefixColor && chalk[config.prefixColor]) { + prefixText = chalk[config.prefixColor](prefixText); + } + prefixText = chalk.bold(prefixText); + return l.trim().length > 0 ? `${prefixText} ${l}` : l; + }) + .join('\n'); + } + if (config.color && chalk[config.color]) { + out = chalk[config.color](out); + } + if (config.bgColor && chalk[config.bgColor]) { + out = chalk[config.bgColor](out); + } + return out; +} + +function calculateCwd( + cwd: string | undefined, + context: ExecutorContext +): string { + if (!cwd) return context.root; + if (isAbsolute(cwd)) return cwd; + return join(context.root, cwd); +} + +/** + * Env variables are processed in the following order: + * - env option from executor options + * - env file from envFile option if provided + * - local env variables + */ +function processEnv( + color: boolean, + cwd: string, + envOptionFromExecutor: Record, + envFile?: string +) { + let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() }); + localEnv = { + ...process.env, + ...localEnv, + }; + + if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) { + loadEnvVarsFile(envFile, localEnv); + } + let res: Record = { + ...localEnv, + ...envOptionFromExecutor, + }; + // need to override PATH to make sure we are using the local node_modules + if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like + if (localEnv.Path) res.Path = localEnv.Path; // Windows + + if (color) { + res.FORCE_COLOR = `${color}`; + } + return res; +} + +function isReady( + readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], + data?: string +): boolean { + if (data) { + for (const readyWhenElement of readyWhenStatus) { + if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) { + readyWhenElement.found = true; + break; + } + } + } + + return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found); +} + +function loadEnvVarsFile(path: string, env: Record = {}) { + unloadDotEnvFile(path, env); + const result = loadAndExpandDotEnvFile(path, env); + if (result.error) { + throw result.error; + } +} diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index db85885b6e..660732c5c8 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -11,6 +11,7 @@ export declare class ChildProcess { kill(): void onExit(callback: (message: string) => void): void onOutput(callback: (message: string) => void): void + cleanup(): void } export declare class FileLock { diff --git a/packages/nx/src/native/pseudo_terminal/child_process.rs b/packages/nx/src/native/pseudo_terminal/child_process.rs index be4deb401e..3b37f2c3fa 100644 --- a/packages/nx/src/native/pseudo_terminal/child_process.rs +++ b/packages/nx/src/native/pseudo_terminal/child_process.rs @@ -1,11 +1,13 @@ -use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use crossbeam_channel::{bounded, Receiver}; use napi::{ - threadsafe_function::{ - ErrorStrategy::Fatal, ThreadsafeFunction, ThreadsafeFunctionCallMode::NonBlocking, - }, - Env, JsFunction, + threadsafe_function::{ + ErrorStrategy::Fatal, ThreadsafeFunction, ThreadsafeFunctionCallMode::NonBlocking, + }, + Env, JsFunction, }; use portable_pty::ChildKiller; +use tracing::warn; pub enum ChildProcessMessage { Kill, @@ -16,6 +18,7 @@ pub struct ChildProcess { process_killer: Box, message_receiver: Receiver, pub(crate) wait_receiver: Receiver, + thread_handles: Vec>, } #[napi] impl ChildProcess { @@ -28,6 +31,7 @@ impl ChildProcess { process_killer, message_receiver, wait_receiver: exit_receiver, + thread_handles: vec![], } } @@ -68,18 +72,43 @@ impl ChildProcess { callback_tsfn.unref(&env)?; - std::thread::spawn(move || { - while let Ok(content) = rx.recv() { - // windows will add `ESC[6n` to the beginning of the output, - // we dont want to store this ANSI code in cache, because replays will cause issues - // remove it before sending it to js - #[cfg(windows)] - let content = content.replace("\x1B[6n", ""); + let (kill_tx, kill_rx) = bounded::<()>(1); - callback_tsfn.call(content, NonBlocking); + std::thread::spawn(move || { + loop { + if kill_rx.try_recv().is_ok() { + break; + } + + if let Ok(content) = rx.try_recv() { + // windows will add `ESC[6n` to the beginning of the output, + // we dont want to store this ANSI code in cache, because replays will cause issues + // remove it before sending it to js + #[cfg(windows)] + let content = content.replace("\x1B[6n", ""); + callback_tsfn.call(content, NonBlocking); + } } }); + self.thread_handles.push(kill_tx); + Ok(()) } + + #[napi] + pub fn cleanup(&mut self) { + let handles = std::mem::take(&mut self.thread_handles); + for handle in handles { + if let Err(e) = handle.send(()) { + warn!(error = ?e, "Failed to send kill signal to thread"); + } + } + } +} + +impl Drop for ChildProcess { + fn drop(&mut self) { + self.cleanup(); + } } diff --git a/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs b/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs index 001e7357c2..61cd28143f 100644 --- a/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs +++ b/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs @@ -159,9 +159,9 @@ pub fn run_command( } let (exit_to_process_tx, exit_to_process_rx) = bounded(1); + trace!("Running {}", command); let mut child = pair.slave.spawn_command(cmd)?; pseudo_terminal.running.store(true, Ordering::SeqCst); - trace!("Running {}", command); let is_tty = tty.unwrap_or_else(|| std::io::stdout().is_tty()); if is_tty { trace!("Enabling raw mode"); diff --git a/packages/nx/src/tasks-runner/create-task-graph.spec.ts b/packages/nx/src/tasks-runner/create-task-graph.spec.ts index 4fa5768a29..059f2aa960 100644 --- a/packages/nx/src/tasks-runner/create-task-graph.spec.ts +++ b/packages/nx/src/tasks-runner/create-task-graph.spec.ts @@ -48,6 +48,7 @@ describe('createTaskGraph', () => { executor: 'nx:run-commands', }, serve: { + continuous: true, executor: 'nx:run-commands', }, }, @@ -90,6 +91,7 @@ describe('createTaskGraph', () => { roots: [], tasks: {}, dependencies: {}, + continuousDependencies: {}, }); }); @@ -117,11 +119,15 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); const twoTasks = createTaskGraph( @@ -148,6 +154,7 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:test': { id: 'lib1:test', @@ -159,12 +166,17 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'lib1:test': [], + }, }); }); @@ -299,6 +311,7 @@ describe('createTaskGraph', () => { overrides: {}, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -312,12 +325,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { 'lib1:compile:libDefault': ['lib2:compile'], 'lib2:compile': [], }, + continuousDependencies: { + 'lib1:compile:libDefault': [], + 'lib2:compile': [], + }, }); const compileApp = createTaskGraph( @@ -343,6 +361,7 @@ describe('createTaskGraph', () => { overrides: {}, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile:libDefault': { id: 'lib1:compile:libDefault', @@ -357,6 +376,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile:ci': { id: 'lib2:compile:ci', @@ -371,6 +391,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -378,6 +399,11 @@ describe('createTaskGraph', () => { 'lib1:compile:libDefault': ['lib2:compile:ci'], 'lib2:compile:ci': [], }, + continuousDependencies: { + 'app1:compile:ci': [], + 'lib1:compile:libDefault': [], + 'lib2:compile:ci': [], + }, }); }); @@ -460,6 +486,10 @@ describe('createTaskGraph', () => { 'app1:compile': ['lib3:compile'], 'lib3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'lib3:compile': [], + }, roots: ['lib3:compile'], tasks: { 'app1:compile': { @@ -472,6 +502,7 @@ describe('createTaskGraph', () => { target: 'compile', }, parallelism: true, + continuous: false, }, 'lib3:compile': { id: 'lib3:compile', @@ -485,6 +516,7 @@ describe('createTaskGraph', () => { target: 'compile', }, parallelism: true, + continuous: false, }, }, }); @@ -514,11 +546,15 @@ describe('createTaskGraph', () => { overrides: { a: '--value=app1-root' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); }); @@ -546,11 +582,15 @@ describe('createTaskGraph', () => { overrides: { a: '--base-href=/app1-root${deploymentId}' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); }); @@ -661,6 +701,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -672,6 +713,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -683,6 +725,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -694,6 +737,7 @@ describe('createTaskGraph', () => { overrides: { __overrides_unparsed__: [] }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -702,6 +746,12 @@ describe('createTaskGraph', () => { 'lib1:compile': ['lib2:compile'], 'lib2:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'lib1:compile': [], + 'lib2:compile': [], + }, }); }); @@ -732,6 +782,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -745,6 +796,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile2': { id: 'app1:precompile2', @@ -758,6 +810,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -771,6 +824,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -779,6 +833,127 @@ describe('createTaskGraph', () => { 'app1:precompile2': [], 'lib1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'app1:precompile2': [], + 'lib1:compile': [], + }, + }); + }); + + it('should create graphs with continuous dependencies', () => { + projectGraph.nodes['app1'].data.targets['serve'].dependsOn = [ + { + dependencies: true, + target: 'serve', + }, + { + target: 'compile', + }, + ]; + projectGraph.nodes['app1'].data.targets['compile'].dependsOn = [ + { + dependencies: true, + target: 'compile', + }, + ]; + projectGraph.nodes['lib1'].data.targets['serve'] = { + executor: 'nx:run-command', + continuous: true, + dependsOn: [ + { + dependencies: true, + target: 'serve', + }, + { + target: 'compile', + }, + ], + }; + const taskGraph = createTaskGraph( + projectGraph, + {}, + ['app1'], + ['serve'], + undefined, + { + __overrides_unparsed__: [], + } + ); + // precompile should also be in here + expect(taskGraph).toEqual({ + roots: ['lib1:compile'], + tasks: { + 'app1:serve': { + id: 'app1:serve', + target: { + project: 'app1', + target: 'serve', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'app1-root', + parallelism: true, + continuous: true, + }, + 'app1:compile': { + id: 'app1:compile', + target: { + project: 'app1', + target: 'compile', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'app1-root', + parallelism: true, + continuous: false, + }, + 'lib1:serve': { + id: 'lib1:serve', + target: { + project: 'lib1', + target: 'serve', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'lib1-root', + parallelism: true, + continuous: true, + }, + 'lib1:compile': { + id: 'lib1:compile', + target: { + project: 'lib1', + target: 'compile', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'lib1-root', + parallelism: true, + continuous: false, + }, + }, + dependencies: { + 'app1:serve': ['app1:compile'], + 'app1:compile': ['lib1:compile'], + 'lib1:serve': ['lib1:compile'], + 'lib1:compile': [], + }, + continuousDependencies: { + 'app1:serve': ['lib1:serve'], + 'app1:compile': [], + 'lib1:serve': [], + 'lib1:compile': [], + }, }); }); @@ -809,6 +984,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -822,6 +998,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile2': { id: 'app1:precompile2', @@ -835,6 +1012,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -848,6 +1026,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -856,6 +1035,12 @@ describe('createTaskGraph', () => { 'app1:precompile2': [], 'lib1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'app1:precompile2': [], + 'lib1:compile': [], + }, }); }); @@ -955,6 +1140,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -968,6 +1154,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -981,6 +1168,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, 'lib3:compile': { id: 'lib3:compile', @@ -994,6 +1182,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib3-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -1002,6 +1191,12 @@ describe('createTaskGraph', () => { 'lib2:compile': ['lib3:compile'], 'lib3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'lib1:compile': [], + 'lib2:compile': [], + 'lib3:compile': [], + }, }); }); @@ -1118,6 +1313,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, 'app2:compile': { id: 'app2:compile', @@ -1126,6 +1322,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { __overrides_unparsed__: [] }, parallelism: true, + continuous: false, }, 'coreInfra:apply': { id: 'coreInfra:apply', @@ -1134,6 +1331,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, 'app1:compile': { id: 'app1:compile', @@ -1142,6 +1340,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { __overrides_unparsed__: [] }, parallelism: true, + continuous: false, }, 'infra2:apply': { id: 'infra2:apply', @@ -1150,6 +1349,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, }, dependencies: { @@ -1164,6 +1364,13 @@ describe('createTaskGraph', () => { 'app1:compile': [], 'infra2:apply': ['app2:compile', 'coreInfra:apply'], }, + continuousDependencies: { + 'infra1:apply': [], + 'app2:compile': [], + 'coreInfra:apply': [], + 'app1:compile': [], + 'infra2:apply': [], + }, }); }); @@ -1217,6 +1424,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:test': { id: 'app1:test', @@ -1230,12 +1438,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app1:test'], 'app1:test': ['app1:compile'], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:test': [], + }, }); }); @@ -1326,6 +1539,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1339,6 +1553,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib3:build': expect.objectContaining({ id: 'lib3:build', @@ -1352,6 +1567,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib3-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1365,6 +1581,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1373,6 +1590,12 @@ describe('createTaskGraph', () => { 'lib3:build': ['lib4:build'], 'lib4:build': ['lib1:build'], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib3:build': [], + 'lib4:build': [], + }, }); }); @@ -1458,6 +1681,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1471,6 +1695,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1484,6 +1709,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1491,6 +1717,11 @@ describe('createTaskGraph', () => { 'lib2:build': ['lib4:build'], 'lib4:build': [], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib4:build': [], + }, }); }); @@ -1551,11 +1782,15 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), }, dependencies: { 'lib1:build': [], }, + continuousDependencies: { + 'lib1:build': [], + }, }); }); @@ -1642,6 +1877,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1655,6 +1891,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1668,6 +1905,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1675,6 +1913,11 @@ describe('createTaskGraph', () => { 'lib2:build': [], 'lib4:build': ['lib1:build'], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib4:build': [], + }, }); }); @@ -1757,6 +2000,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1770,12 +2014,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), }, dependencies: { 'lib1:build': ['lib2:build'], 'lib2:build': [], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + }, }); }); @@ -1852,6 +2101,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -1865,12 +2115,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], 'app3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app3:compile': [], + }, }); }); @@ -1944,6 +2199,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -1957,12 +2213,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], 'app3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app3:compile': [], + }, }); }); @@ -2042,6 +2303,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:test': { id: 'app1:test', @@ -2055,6 +2317,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib2:dep': { id: 'lib2:dep', @@ -2068,6 +2331,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, 'lib2:dep2': { id: 'lib2:dep2', @@ -2081,6 +2345,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -2089,6 +2354,12 @@ describe('createTaskGraph', () => { 'lib2:dep': [], 'lib2:dep2': [], }, + continuousDependencies: { + 'app1:lint': [], + 'app1:test': [], + 'lib2:dep': [], + 'lib2:dep2': [], + }, }); }); @@ -2160,11 +2431,15 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + }, }); const taskGraph2 = createTaskGraph( @@ -2194,6 +2469,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app2:compile': { id: 'app2:compile', @@ -2207,12 +2483,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app2-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app2:compile'], 'app2:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app2:compile': [], + }, }); }); @@ -2706,6 +2987,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app4:precompile': { id: 'app4:precompile', @@ -2719,12 +3001,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app4-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app4:precompile'], 'app4:precompile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app4:precompile': [], + }, }); taskGraph = createTaskGraph( @@ -2752,6 +3039,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app2-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -2765,6 +3053,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, 'app4:precompile': { id: 'app4:precompile', @@ -2778,6 +3067,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app4-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -2785,6 +3075,11 @@ describe('createTaskGraph', () => { 'app3:compile': ['app4:precompile'], 'app4:precompile': [], }, + continuousDependencies: { + 'app2:compile': [], + 'app3:compile': [], + 'app4:precompile': [], + }, }); }); diff --git a/packages/nx/src/tasks-runner/create-task-graph.ts b/packages/nx/src/tasks-runner/create-task-graph.ts index bb97f6e06e..30a8c2b335 100644 --- a/packages/nx/src/tasks-runner/create-task-graph.ts +++ b/packages/nx/src/tasks-runner/create-task-graph.ts @@ -16,6 +16,7 @@ export class ProcessTasks { private readonly seen = new Set(); readonly tasks: { [id: string]: Task } = {}; readonly dependencies: { [k: string]: string[] } = {}; + readonly continuousDependencies: { [k: string]: string[] } = {}; private readonly allTargetNames: string[]; constructor( @@ -58,6 +59,7 @@ export class ProcessTasks { ); this.tasks[task.id] = task; this.dependencies[task.id] = []; + this.continuousDependencies[task.id] = []; } } } @@ -75,6 +77,7 @@ export class ProcessTasks { if (!initialTasks[t]) { delete this.tasks[t]; delete this.dependencies[t]; + delete this.continuousDependencies[t]; } } for (let d of Object.keys(this.dependencies)) { @@ -82,6 +85,11 @@ export class ProcessTasks { (dd) => !!initialTasks[dd] ); } + for (let d of Object.keys(this.continuousDependencies)) { + this.continuousDependencies[d] = this.continuousDependencies[d].filter( + (dd) => !!initialTasks[dd] + ); + } } filterDummyTasks(this.dependencies); @@ -96,8 +104,22 @@ export class ProcessTasks { } } - return Object.keys(this.dependencies).filter( - (d) => this.dependencies[d].length === 0 + filterDummyTasks(this.continuousDependencies); + + for (const taskId of Object.keys(this.continuousDependencies)) { + if (this.continuousDependencies[taskId].length > 0) { + this.continuousDependencies[taskId] = [ + ...new Set( + this.continuousDependencies[taskId].filter((d) => d !== taskId) + ).values(), + ]; + } + } + + return Object.keys(this.tasks).filter( + (d) => + this.dependencies[d].length === 0 && + this.continuousDependencies[d].length === 0 ); } @@ -204,9 +226,6 @@ export class ProcessTasks { dependencyConfig.target, resolvedConfiguration ); - if (task.id !== selfTaskId) { - this.dependencies[task.id].push(selfTaskId); - } if (!this.tasks[selfTaskId]) { const newTask = this.createTask( selfTaskId, @@ -217,6 +236,7 @@ export class ProcessTasks { ); this.tasks[selfTaskId] = newTask; this.dependencies[selfTaskId] = []; + this.continuousDependencies[selfTaskId] = []; this.processTask( newTask, newTask.target.project, @@ -224,6 +244,13 @@ export class ProcessTasks { overrides ); } + if (task.id !== selfTaskId) { + if (this.tasks[selfTaskId].continuous) { + this.continuousDependencies[task.id].push(selfTaskId); + } else { + this.dependencies[task.id].push(selfTaskId); + } + } } } @@ -265,8 +292,17 @@ export class ProcessTasks { resolvedConfiguration ); + const depTargetConfiguration = + this.projectGraph.nodes[depProject.name].data.targets[ + dependencyConfig.target + ]; + if (task.id !== depTargetId) { - this.dependencies[task.id].push(depTargetId); + if (depTargetConfiguration.continuous) { + this.continuousDependencies[task.id].push(depTargetId); + } else { + this.dependencies[task.id].push(depTargetId); + } } if (!this.tasks[depTargetId]) { const newTask = this.createTask( @@ -278,6 +314,7 @@ export class ProcessTasks { ); this.tasks[depTargetId] = newTask; this.dependencies[depTargetId] = []; + this.continuousDependencies[depTargetId] = []; this.processTask( newTask, @@ -298,6 +335,7 @@ export class ProcessTasks { ); this.dependencies[task.id].push(dummyId); this.dependencies[dummyId] ??= []; + this.continuousDependencies[dummyId] ??= []; const noopTask = this.createDummyTask(dummyId, task); this.processTask(noopTask, depProject.name, configuration, overrides); } @@ -354,6 +392,7 @@ export class ProcessTasks { ), cache: project.data.targets[target].cache, parallelism: project.data.targets[target].parallelism ?? true, + continuous: project.data.targets[target].continuous ?? false, }; } @@ -405,6 +444,7 @@ export function createTaskGraph( roots, tasks: p.tasks, dependencies: p.dependencies, + continuousDependencies: p.continuousDependencies, }; } diff --git a/packages/nx/src/tasks-runner/forked-process-task-runner.ts b/packages/nx/src/tasks-runner/forked-process-task-runner.ts index ab6750d7d4..e14e773298 100644 --- a/packages/nx/src/tasks-runner/forked-process-task-runner.ts +++ b/packages/nx/src/tasks-runner/forked-process-task-runner.ts @@ -1,26 +1,26 @@ -import { readFileSync, writeFileSync } from 'fs'; -import { ChildProcess, fork, Serializable } from 'child_process'; -import * as chalk from 'chalk'; +import { writeFileSync } from 'fs'; +import { fork, Serializable } from 'child_process'; import { DefaultTasksRunnerOptions } from './default-tasks-runner'; import { output } from '../utils/output'; import { getCliPath, getPrintableCommandArgsForTask } from './utils'; import { Batch } from './tasks-schedule'; import { join } from 'path'; -import { - BatchMessage, - BatchMessageType, - BatchResults, -} from './batch/batch-messages'; +import { BatchMessageType } from './batch/batch-messages'; import { stripIndents } from '../utils/strip-indents'; import { Task, TaskGraph } from '../config/task-graph'; -import { Transform } from 'stream'; import { - PseudoTtyProcess, getPseudoTerminal, PseudoTerminal, + PseudoTtyProcess, } from './pseudo-terminal'; import { signalToCode } from '../utils/exit-codes'; import { ProjectGraph } from '../config/project-graph'; +import { + NodeChildProcessWithDirectOutput, + NodeChildProcessWithNonDirectOutput, +} from './running-tasks/node-child-process'; +import { BatchProcess } from './running-tasks/batch-process'; +import { RunningTask } from './running-tasks/running-task'; const forkScript = join(__dirname, './fork.js'); @@ -30,8 +30,8 @@ export class ForkedProcessTaskRunner { cliPath = getCliPath(); private readonly verbose = process.env.NX_VERBOSE_LOGGING === 'true'; - private processes = new Set(); - private finishedProcesses = new Set(); + private processes = new Set(); + private finishedProcesses = new Set(); private pseudoTerminal: PseudoTerminal | null = PseudoTerminal.isSupported() ? getPseudoTerminal() @@ -47,95 +47,53 @@ export class ForkedProcessTaskRunner { } // TODO: vsavkin delegate terminal output printing - public forkProcessForBatch( + public async forkProcessForBatch( { executorName, taskGraph: batchTaskGraph }: Batch, projectGraph: ProjectGraph, fullTaskGraph: TaskGraph, env: NodeJS.ProcessEnv - ): Promise { - return new Promise((res, rej) => { - let p: ChildProcess; - try { - const count = Object.keys(batchTaskGraph.tasks).length; - if (count > 1) { - output.logSingleLine( - `Running ${output.bold(count)} ${output.bold( - 'tasks' - )} with ${output.bold(executorName)}` - ); - } else { - const args = getPrintableCommandArgsForTask( - Object.values(batchTaskGraph.tasks)[0] - ); - output.logCommand(args.join(' ')); - } + ): Promise { + const count = Object.keys(batchTaskGraph.tasks).length; + if (count > 1) { + output.logSingleLine( + `Running ${output.bold(count)} ${output.bold( + 'tasks' + )} with ${output.bold(executorName)}` + ); + } else { + const args = getPrintableCommandArgsForTask( + Object.values(batchTaskGraph.tasks)[0] + ); + output.logCommand(args.join(' ')); + } - p = fork(workerPath, { - stdio: ['inherit', 'inherit', 'inherit', 'ipc'], - env, - }); - this.processes.add(p); - - p.once('exit', (code, signal) => { - this.processes.delete(p); - if (code === null) code = signalToCode(signal); - if (code !== 0) { - rej( - new Error( - `"${executorName}" exited unexpectedly with code: ${code}` - ) - ); - } - }); - - p.on('error', (err) => { - this.processes.delete(p); - rej(err || new Error(`"${executorName}" exited unexpectedly`)); - }); - - p.on('message', (message: BatchMessage) => { - switch (message.type) { - case BatchMessageType.CompleteBatchExecution: { - res(message.results); - this.finishedProcesses.add(p); - break; - } - case BatchMessageType.RunTasks: { - break; - } - default: { - // Re-emit any non-batch messages from the task process - if (process.send) { - process.send(message); - } - } - } - }); - - // Start the tasks - p.send({ - type: BatchMessageType.RunTasks, - executorName, - projectGraph, - batchTaskGraph, - fullTaskGraph, - }); - } catch (e) { - rej(e); - if (p) { - this.processes.delete(p); - p.kill(); - } - } + const p = fork(workerPath, { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, }); + const cp = new BatchProcess(p, executorName); + this.processes.add(cp); + + cp.onExit(() => { + this.processes.delete(cp); + }); + + // Start the tasks + cp.send({ + type: BatchMessageType.RunTasks, + executorName, + projectGraph, + batchTaskGraph, + fullTaskGraph, + }); + + return cp; } public cleanUpBatchProcesses() { if (this.finishedProcesses.size > 0) { this.finishedProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill(); - } + p.kill(); }); this.finishedProcesses.clear(); } @@ -156,15 +114,15 @@ export class ForkedProcessTaskRunner { taskGraph: TaskGraph; env: NodeJS.ProcessEnv; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { return pipeOutput - ? await this.forkProcessPipeOutputCapture(task, { + ? this.forkProcessWithPrefixAndNotTTY(task, { temporaryOutputPath, streamOutput, taskGraph, env, }) - : await this.forkProcessDirectOutputCapture(task, { + : this.forkProcessDirectOutputCapture(task, { temporaryOutputPath, streamOutput, taskGraph, @@ -188,7 +146,7 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; disablePseudoTerminal: boolean; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { const shouldPrefix = streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; @@ -229,7 +187,7 @@ export class ForkedProcessTaskRunner { taskGraph: TaskGraph; env: NodeJS.ProcessEnv; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { const args = getPrintableCommandArgsForTask(task); if (streamOutput) { output.logCommand(args.join(' ')); @@ -256,42 +214,15 @@ export class ForkedProcessTaskRunner { terminalOutput += msg; }); - return new Promise((res) => { - p.onExit((code) => { - this.processes.delete(p); - // If the exit code is greater than 128, it's a special exit code for a signal - if (code >= 128) { - process.exit(code); - } - this.writeTerminalOutput(temporaryOutputPath, terminalOutput); - res({ - code, - terminalOutput, - }); - }); + p.onExit((code) => { + if (code > 128) { + process.exit(code); + } + this.processes.delete(p); + this.writeTerminalOutput(temporaryOutputPath, terminalOutput); }); - } - private forkProcessPipeOutputCapture( - task: Task, - { - streamOutput, - temporaryOutputPath, - taskGraph, - env, - }: { - streamOutput: boolean; - temporaryOutputPath: string; - taskGraph: TaskGraph; - env: NodeJS.ProcessEnv; - } - ) { - return this.forkProcessWithPrefixAndNotTTY(task, { - streamOutput, - temporaryOutputPath, - taskGraph, - env, - }); + return p; } private forkProcessWithPrefixAndNotTTY( @@ -308,85 +239,49 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; } ) { - return new Promise<{ code: number; terminalOutput: string }>((res, rej) => { - try { - const args = getPrintableCommandArgsForTask(task); - if (streamOutput) { - output.logCommand(args.join(' ')); - } - - const p = fork(this.cliPath, { - stdio: ['inherit', 'pipe', 'pipe', 'ipc'], - env, - }); - this.processes.add(p); - - // Re-emit any messages from the task process - p.on('message', (message) => { - if (process.send) { - process.send(message); - } - }); - - // Send message to run the executor - p.send({ - targetDescription: task.target, - overrides: task.overrides, - taskGraph, - isVerbose: this.verbose, - }); - - if (streamOutput) { - if (process.env.NX_PREFIX_OUTPUT === 'true') { - const color = getColor(task.target.project); - const prefixText = `${task.target.project}:`; - - p.stdout - .pipe( - logClearLineToPrefixTransformer(color.bold(prefixText) + ' ') - ) - .pipe(addPrefixTransformer(color.bold(prefixText))) - .pipe(process.stdout); - p.stderr - .pipe(logClearLineToPrefixTransformer(color(prefixText) + ' ')) - .pipe(addPrefixTransformer(color(prefixText))) - .pipe(process.stderr); - } else { - p.stdout.pipe(addPrefixTransformer()).pipe(process.stdout); - p.stderr.pipe(addPrefixTransformer()).pipe(process.stderr); - } - } - - let outWithErr = []; - p.stdout.on('data', (chunk) => { - outWithErr.push(chunk.toString()); - }); - p.stderr.on('data', (chunk) => { - outWithErr.push(chunk.toString()); - }); - - p.once('exit', (code, signal) => { - this.processes.delete(p); - if (code === null) code = signalToCode(signal); - // we didn't print any output as we were running the command - // print all the collected output| - const terminalOutput = outWithErr.join(''); - - if (!streamOutput) { - this.options.lifeCycle.printTaskTerminalOutput( - task, - code === 0 ? 'success' : 'failure', - terminalOutput - ); - } - this.writeTerminalOutput(temporaryOutputPath, terminalOutput); - res({ code, terminalOutput }); - }); - } catch (e) { - console.error(e); - rej(e); + try { + const args = getPrintableCommandArgsForTask(task); + if (streamOutput) { + output.logCommand(args.join(' ')); } - }); + + const p = fork(this.cliPath, { + stdio: ['inherit', 'pipe', 'pipe', 'ipc'], + env, + }); + + // Send message to run the executor + p.send({ + targetDescription: task.target, + overrides: task.overrides, + taskGraph, + isVerbose: this.verbose, + }); + + const cp = new NodeChildProcessWithNonDirectOutput(p, { + streamOutput, + prefix: task.target.project, + }); + this.processes.add(cp); + + cp.onExit((code, terminalOutput) => { + this.processes.delete(cp); + + if (!streamOutput) { + this.options.lifeCycle.printTaskTerminalOutput( + task, + code === 0 ? 'success' : 'failure', + terminalOutput + ); + } + this.writeTerminalOutput(temporaryOutputPath, terminalOutput); + }); + + return cp; + } catch (e) { + console.error(e); + throw e; + } } private forkProcessDirectOutputCapture( @@ -403,71 +298,56 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; } ) { - return new Promise<{ code: number; terminalOutput: string }>((res, rej) => { - try { - const args = getPrintableCommandArgsForTask(task); - if (streamOutput) { - output.logCommand(args.join(' ')); - } - const p = fork(this.cliPath, { - stdio: ['inherit', 'inherit', 'inherit', 'ipc'], - env, - }); - this.processes.add(p); + try { + const args = getPrintableCommandArgsForTask(task); + if (streamOutput) { + output.logCommand(args.join(' ')); + } + const p = fork(this.cliPath, { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }); + const cp = new NodeChildProcessWithDirectOutput(p, temporaryOutputPath); - // Re-emit any messages from the task process - p.on('message', (message) => { - if (process.send) { - process.send(message); + this.processes.add(cp); + + // Send message to run the executor + p.send({ + targetDescription: task.target, + overrides: task.overrides, + taskGraph, + isVerbose: this.verbose, + }); + + cp.onExit((code, signal) => { + this.processes.delete(cp); + // we didn't print any output as we were running the command + // print all the collected output + try { + const terminalOutput = cp.getTerminalOutput(); + if (!streamOutput) { + this.options.lifeCycle.printTaskTerminalOutput( + task, + code === 0 ? 'success' : 'failure', + terminalOutput + ); } - }); - - // Send message to run the executor - p.send({ - targetDescription: task.target, - overrides: task.overrides, - taskGraph, - isVerbose: this.verbose, - }); - - p.once('exit', (code, signal) => { - this.processes.delete(p); - if (code === null) code = signalToCode(signal); - // we didn't print any output as we were running the command - // print all the collected output - let terminalOutput = ''; - try { - terminalOutput = this.readTerminalOutput(temporaryOutputPath); - if (!streamOutput) { - this.options.lifeCycle.printTaskTerminalOutput( - task, - code === 0 ? 'success' : 'failure', - terminalOutput - ); - } - } catch (e) { - console.log(stripIndents` + } catch (e) { + console.log(stripIndents` Unable to print terminal output for Task "${task.id}". Task failed with Exit Code ${code} and Signal "${signal}". Received error message: ${e.message} `); - } - res({ - code, - terminalOutput, - }); - }); - } catch (e) { - console.error(e); - rej(e); - } - }); - } + } + }); - private readTerminalOutput(outputPath: string) { - return readFileSync(outputPath).toString(); + return cp; + } catch (e) { + console.error(e); + throw e; + } } private writeTerminalOutput(outputPath: string, content: string) { @@ -482,13 +362,12 @@ export class ForkedProcessTaskRunner { } const messageHandler = (message: Serializable) => { - // this.publisher.publish(message.toString()); if (this.pseudoTerminal) { this.pseudoTerminal.sendMessageToChildren(message); } this.processes.forEach((p) => { - if ('connected' in p && p.connected) { + if ('connected' in p && p.connected && 'send' in p) { p.send(message); } }); @@ -499,9 +378,7 @@ export class ForkedProcessTaskRunner { const cleanUp = (signal?: NodeJS.Signals) => { this.processes.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill(signal); - } + p.kill(signal); }); process.off('message', messageHandler); this.cleanUpBatchProcesses(); @@ -528,60 +405,3 @@ export class ForkedProcessTaskRunner { }); } } - -const colors = [ - chalk.green, - chalk.greenBright, - chalk.red, - chalk.redBright, - chalk.cyan, - chalk.cyanBright, - chalk.yellow, - chalk.yellowBright, - chalk.magenta, - chalk.magentaBright, -]; - -function getColor(projectName: string) { - let code = 0; - for (let i = 0; i < projectName.length; ++i) { - code += projectName.charCodeAt(i); - } - const colorIndex = code % colors.length; - - return colors[colorIndex]; -} - -/** - * Prevents terminal escape sequence from clearing line prefix. - */ -function logClearLineToPrefixTransformer(prefix: string) { - let prevChunk = null; - return new Transform({ - transform(chunk, _encoding, callback) { - if (prevChunk && prevChunk.toString() === '\x1b[2K') { - chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix); - } - this.push(chunk); - prevChunk = chunk; - callback(); - }, - }); -} - -function addPrefixTransformer(prefix?: string) { - const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n'; - return new Transform({ - transform(chunk, _encoding, callback) { - const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); - list - .filter(Boolean) - .forEach((m) => - this.push( - prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator - ) - ); - callback(); - }, - }); -} diff --git a/packages/nx/src/tasks-runner/init-tasks-runner.ts b/packages/nx/src/tasks-runner/init-tasks-runner.ts index e582ee3fe0..b614f01737 100644 --- a/packages/nx/src/tasks-runner/init-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/init-tasks-runner.ts @@ -47,6 +47,10 @@ export async function initTasksRunner(nxArgs: NxArgs) { acc[task.id] = []; return acc; }, {} as any), + continuousDependencies: opts.tasks.reduce((acc, task) => { + acc[task.id] = []; + return acc; + }, {} as any), }; const taskResults = await invokeTasksRunner({ diff --git a/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts b/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts index 8b706a63c6..269157b909 100644 --- a/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts +++ b/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts @@ -59,6 +59,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -78,6 +79,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -99,6 +101,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -109,6 +112,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -131,6 +135,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -150,6 +155,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -171,6 +177,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -181,6 +188,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -200,6 +208,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -210,6 +219,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -229,6 +239,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -239,6 +250,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -262,6 +274,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -272,6 +285,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -295,6 +309,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -305,6 +320,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -315,6 +331,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -338,6 +355,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -348,6 +366,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -358,6 +377,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -368,6 +388,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] diff --git a/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts b/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts index 7eefc84de5..def3967731 100644 --- a/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts +++ b/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts @@ -17,6 +17,7 @@ describe('PseudoTerminal', () => { done(); }); }); + it('should kill a running command', (done) => { const childProcess = terminal.runCommand( 'sleep 3 && echo "hello world" > file.txt' @@ -31,18 +32,35 @@ describe('PseudoTerminal', () => { it('should subscribe to output', (done) => { const childProcess = terminal.runCommand('echo "hello world"'); - let output = ''; childProcess.onOutput((chunk) => { output += chunk; }); childProcess.onExit(() => { - expect(output.trim()).toContain('hello world'); - done(); + try { + expect(output.trim()).toContain('hello world'); + } finally { + done(); + } }); }); + it('should get results', async () => { + const childProcess = terminal.runCommand('echo "hello world"'); + + const results = await childProcess.getResults(); + + expect(results.code).toEqual(0); + expect(results.terminalOutput).toContain('hello world'); + const childProcess2 = terminal.runCommand('echo "hello jason"'); + + const results2 = await childProcess2.getResults(); + + expect(results2.code).toEqual(0); + expect(results2.terminalOutput).toContain('hello jason'); + }); + if (process.env.CI !== 'true') { it('should be tty', (done) => { const childProcess = terminal.runCommand( @@ -56,17 +74,12 @@ describe('PseudoTerminal', () => { } it('should run multiple commands', async () => { - function runCommand() { - return new Promise((res) => { - const cp1 = terminal.runCommand('whoami', {}); - - cp1.onExit(res); - }); - } - let i = 0; while (i < 10) { - await runCommand(); + const childProcess = terminal.runCommand('whoami', {}); + + await childProcess.getResults(); + i++; } }); diff --git a/packages/nx/src/tasks-runner/pseudo-terminal.ts b/packages/nx/src/tasks-runner/pseudo-terminal.ts index fc3a1bdee4..2e41d20157 100644 --- a/packages/nx/src/tasks-runner/pseudo-terminal.ts +++ b/packages/nx/src/tasks-runner/pseudo-terminal.ts @@ -138,15 +138,32 @@ export class PseudoTerminal { export class PseudoTtyProcess { isAlive = true; - exitCallbacks = []; + private exitCallbacks: Array<(code: number) => void> = []; + private outputCallbacks: Array<(output: string) => void> = []; + + private terminalOutput = ''; constructor(private childProcess: ChildProcess) { + childProcess.onOutput((output) => { + this.terminalOutput += output; + this.outputCallbacks.forEach((cb) => cb(output)); + }); + childProcess.onExit((message) => { this.isAlive = false; - const exitCode = messageToCode(message); + const code = messageToCode(message); + childProcess.cleanup(); - this.exitCallbacks.forEach((cb) => cb(exitCode)); + this.exitCallbacks.forEach((cb) => cb(code)); + }); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code) => { + res({ code, terminalOutput: this.terminalOutput }); + }); }); } @@ -155,17 +172,17 @@ export class PseudoTtyProcess { } onOutput(callback: (message: string) => void): void { - this.childProcess.onOutput(callback); + this.outputCallbacks.push(callback); } kill(): void { - try { - this.childProcess.kill(); - } catch { - // when the child process completes before we explicitly call kill, this will throw - // do nothing - } finally { - if (this.isAlive == true) { + if (this.isAlive) { + try { + this.childProcess.kill(); + } catch { + // when the child process completes before we explicitly call kill, this will throw + // do nothing + } finally { this.isAlive = false; } } diff --git a/packages/nx/src/tasks-runner/running-tasks/batch-process.ts b/packages/nx/src/tasks-runner/running-tasks/batch-process.ts new file mode 100644 index 0000000000..22899eb266 --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/batch-process.ts @@ -0,0 +1,84 @@ +import { + BatchMessage, + BatchMessageType, + BatchResults, +} from '../batch/batch-messages'; +import { ChildProcess, Serializable } from 'child_process'; +import { signalToCode } from '../../utils/exit-codes'; + +export class BatchProcess { + private exitCallbacks: Array<(code: number) => void> = []; + private resultsCallbacks: Array<(results: BatchResults) => void> = []; + + constructor( + private childProcess: ChildProcess, + private executorName: string + ) { + this.childProcess.on('message', (message: BatchMessage) => { + switch (message.type) { + case BatchMessageType.CompleteBatchExecution: { + for (const cb of this.resultsCallbacks) { + cb(message.results); + } + break; + } + case BatchMessageType.RunTasks: { + break; + } + default: { + // Re-emit any non-batch messages from the task process + if (process.send) { + process.send(message); + } + } + } + }); + + this.childProcess.once('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + + for (const cb of this.exitCallbacks) { + cb(code); + } + }); + } + + onExit(cb: (code: number) => void) { + this.exitCallbacks.push(cb); + } + + onResults(cb: (results: BatchResults) => void) { + this.resultsCallbacks.push(cb); + } + + async getResults(): Promise { + return Promise.race([ + new Promise((_, rej) => { + this.onExit((code) => { + if (code !== 0) { + rej( + new Error( + `"${this.executorName}" exited unexpectedly with code: ${code}` + ) + ); + } + }); + }), + new Promise((res) => { + this.onResults(res); + }), + ]); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + kill(signal?: NodeJS.Signals | number): void { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts b/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts new file mode 100644 index 0000000000..1d63895244 --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts @@ -0,0 +1,215 @@ +import { ChildProcess, Serializable } from 'child_process'; +import { signalToCode } from '../../utils/exit-codes'; +import { RunningTask } from './running-task'; +import { Transform } from 'stream'; +import * as chalk from 'chalk'; +import { readFileSync } from 'fs'; + +export class NodeChildProcessWithNonDirectOutput implements RunningTask { + private terminalOutput: string = ''; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + + constructor( + private childProcess: ChildProcess, + { streamOutput, prefix }: { streamOutput: boolean; prefix: string } + ) { + if (streamOutput) { + if (process.env.NX_PREFIX_OUTPUT === 'true') { + const color = getColor(prefix); + const prefixText = `${prefix}:`; + + this.childProcess.stdout + .pipe(logClearLineToPrefixTransformer(color.bold(prefixText) + ' ')) + .pipe(addPrefixTransformer(color.bold(prefixText))) + .pipe(process.stdout); + this.childProcess.stderr + .pipe(logClearLineToPrefixTransformer(color(prefixText) + ' ')) + .pipe(addPrefixTransformer(color(prefixText))) + .pipe(process.stderr); + } else { + this.childProcess.stdout + .pipe(addPrefixTransformer()) + .pipe(process.stdout); + this.childProcess.stderr + .pipe(addPrefixTransformer()) + .pipe(process.stderr); + } + } + + this.childProcess.on('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + for (const cb of this.exitCallbacks) { + cb(code, this.terminalOutput); + } + }); + + // Re-emit any messages from the task process + this.childProcess.on('message', (message) => { + if (process.send) { + process.send(message); + } + }); + + this.childProcess.stdout.on('data', (chunk) => { + this.terminalOutput += chunk.toString(); + }); + this.childProcess.stderr.on('data', (chunk) => { + this.terminalOutput += chunk.toString(); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void) { + this.exitCallbacks.push(cb); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + public kill(signal?: NodeJS.Signals | number) { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} + +function addPrefixTransformer(prefix?: string) { + const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n'; + return new Transform({ + transform(chunk, _encoding, callback) { + const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); + list + .filter(Boolean) + .forEach((m) => + this.push( + prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator + ) + ); + callback(); + }, + }); +} + +const colors = [ + chalk.green, + chalk.greenBright, + chalk.red, + chalk.redBright, + chalk.cyan, + chalk.cyanBright, + chalk.yellow, + chalk.yellowBright, + chalk.magenta, + chalk.magentaBright, +]; + +function getColor(projectName: string) { + let code = 0; + for (let i = 0; i < projectName.length; ++i) { + code += projectName.charCodeAt(i); + } + const colorIndex = code % colors.length; + + return colors[colorIndex]; +} + +/** + * Prevents terminal escape sequence from clearing line prefix. + */ +function logClearLineToPrefixTransformer(prefix: string) { + let prevChunk = null; + return new Transform({ + transform(chunk, _encoding, callback) { + if (prevChunk && prevChunk.toString() === '\x1b[2K') { + chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix); + } + this.push(chunk); + prevChunk = chunk; + callback(); + }, + }); +} + +export class NodeChildProcessWithDirectOutput implements RunningTask { + private terminalOutput: string | undefined; + private exitCallbacks: Array<(code: number, signal: string) => void> = []; + + private exited = false; + private exitCode: number; + + constructor( + private childProcess: ChildProcess, + private temporaryOutputPath: string + ) { + // Re-emit any messages from the task process + this.childProcess.on('message', (message) => { + if (process.send) { + process.send(message); + } + }); + + this.childProcess.on('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + + this.exited = true; + this.exitCode = code; + + for (const cb of this.exitCallbacks) { + cb(code, signal); + } + }); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + onExit(cb: (code: number, signal: NodeJS.Signals) => void) { + this.exitCallbacks.push(cb); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + const terminalOutput = this.getTerminalOutput(); + if (this.exited) { + return Promise.resolve({ + code: this.exitCode, + terminalOutput, + }); + } + await this.waitForExit(); + return Promise.resolve({ + code: this.exitCode, + terminalOutput, + }); + } + + waitForExit() { + return new Promise((res) => { + this.onExit(() => res()); + }); + } + + getTerminalOutput() { + this.terminalOutput ??= readFileSync(this.temporaryOutputPath).toString(); + return this.terminalOutput; + } + + kill(signal?: NodeJS.Signals | number): void { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts b/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts new file mode 100644 index 0000000000..173acc121d --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts @@ -0,0 +1,20 @@ +import { Serializable } from 'child_process'; +import { RunningTask } from './running-task'; + +export class NoopChildProcess implements RunningTask { + constructor(private results: { code: number; terminalOutput: string }) {} + + send(): void {} + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return this.results; + } + + kill(): void { + return; + } + + onExit(cb: (code: number) => void): void { + cb(this.results.code); + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/running-task.ts b/packages/nx/src/tasks-runner/running-tasks/running-task.ts new file mode 100644 index 0000000000..7355d16edb --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/running-task.ts @@ -0,0 +1,7 @@ +export abstract class RunningTask { + abstract getResults(): Promise<{ code: number; terminalOutput: string }>; + + abstract onExit(cb: (code: number) => void): void; + + abstract kill(signal?: NodeJS.Signals | number): Promise | void; +} diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index d343a42208..de8fca6ae7 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -3,7 +3,7 @@ import { performance } from 'perf_hooks'; import { relative } from 'path'; import { writeFileSync } from 'fs'; import { TaskHasher } from '../hasher/task-hasher'; -import runCommandsImpl from '../executors/run-commands/run-commands.impl'; +import { runCommands } from '../executors/run-commands/run-commands.impl'; import { ForkedProcessTaskRunner } from './forked-process-task-runner'; import { Cache, DbCache, dbCacheEnabled, getCache } from './cache'; import { DefaultTasksRunnerOptions } from './default-tasks-runner'; @@ -33,6 +33,9 @@ import { output } from '../utils/output'; import { combineOptionsForExecutor } from '../utils/params'; import { NxJsonConfiguration } from '../config/nx-json'; import type { TaskDetails } from '../native'; +import { NoopChildProcess } from './running-tasks/noop-child-process'; +import { RunningTask } from './running-tasks/running-task'; +import { NxArgs } from '../utils/command-line-utils'; export class TaskOrchestrator { private taskDetails: TaskDetails | null = getTaskDetails(); @@ -64,6 +67,9 @@ export class TaskOrchestrator { private bailed = false; + private runningContinuousTasks = new Map(); + + private cleaningUp = false; // endregion internal state constructor( @@ -72,7 +78,7 @@ export class TaskOrchestrator { private readonly projectGraph: ProjectGraph, private readonly taskGraph: TaskGraph, private readonly nxJson: NxJsonConfiguration, - private readonly options: DefaultTasksRunnerOptions, + private readonly options: NxArgs & DefaultTasksRunnerOptions, private readonly bail: boolean, private readonly daemon: DaemonClient, private readonly outputStyle: string @@ -91,13 +97,17 @@ export class TaskOrchestrator { performance.mark('task-execution:start'); + const threadCount = + this.options.parallel + + Object.values(this.taskGraph.tasks).filter((t) => t.continuous).length; + const threads = []; - process.stdout.setMaxListeners(this.options.parallel + defaultMaxListeners); - process.stderr.setMaxListeners(this.options.parallel + defaultMaxListeners); + process.stdout.setMaxListeners(threadCount + defaultMaxListeners); + process.stderr.setMaxListeners(threadCount + defaultMaxListeners); // initial seeding of the queue - for (let i = 0; i < this.options.parallel; ++i) { + for (let i = 0; i < threadCount; ++i) { threads.push(this.executeNextBatchOfTasksUsingTaskSchedule()); } await Promise.all(threads); @@ -110,6 +120,8 @@ export class TaskOrchestrator { ); this.cache.removeOldCacheRecords(); + await this.cleanup(); + return this.completedTasks; } @@ -139,7 +151,11 @@ export class TaskOrchestrator { if (task) { const groupId = this.closeGroup(); - await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId); + if (task.continuous) { + await this.startContinuousTask(task, groupId); + } else { + await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId); + } this.openGroup(groupId); @@ -325,12 +341,14 @@ export class TaskOrchestrator { private async runBatch(batch: Batch, env: NodeJS.ProcessEnv) { try { - const results = await this.forkedProcessTaskRunner.forkProcessForBatch( - batch, - this.projectGraph, - this.taskGraph, - env - ); + const batchProcess = + await this.forkedProcessTaskRunner.forkProcessForBatch( + batch, + this.projectGraph, + this.taskGraph, + env + ); + const results = await batchProcess.getResults(); const batchResultEntries = Object.entries(results); return batchResultEntries.map(([taskId, result]) => ({ ...result, @@ -401,104 +419,116 @@ export class TaskOrchestrator { // the task wasn't cached if (results.length === 0) { - const shouldPrefix = - streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; - const targetConfiguration = getTargetConfigurationForTask( + const childProcess = await this.runTask( task, - this.projectGraph + streamOutput, + env, + temporaryOutputPath, + pipeOutput ); - if ( - process.env.NX_RUN_COMMANDS_DIRECTLY !== 'false' && - targetConfiguration.executor === 'nx:run-commands' && - !shouldPrefix - ) { - try { - const { schema } = getExecutorForTask(task, this.projectGraph); - const isRunOne = this.initiatingProject != null; - const combinedOptions = combineOptionsForExecutor( - task.overrides, - task.target.configuration ?? - targetConfiguration.defaultConfiguration, - targetConfiguration, - schema, - task.target.project, - relative(task.projectRoot ?? workspaceRoot, process.cwd()), - process.env.NX_VERBOSE_LOGGING === 'true' - ); - if (combinedOptions.env) { - env = { - ...env, - ...combinedOptions.env, - }; - } - if (streamOutput) { - const args = getPrintableCommandArgsForTask(task); - output.logCommand(args.join(' ')); - } - const { success, terminalOutput } = await runCommandsImpl( - { - ...combinedOptions, - env, - usePty: isRunOne && !this.tasksSchedule.hasTasks(), - streamOutput, - }, - { - root: workspaceRoot, // only root is needed in runCommandsImpl - } as any - ); - const status = success ? 'success' : 'failure'; + const { code, terminalOutput } = await childProcess.getResults(); + results.push({ + task, + status: code === 0 ? 'success' : 'failure', + terminalOutput, + }); + } + await this.postRunSteps([task], results, doNotSkipCache, { groupId }); + } + + private async runTask( + task: Task, + streamOutput: boolean, + env: { [p: string]: string | undefined; TZ?: string }, + temporaryOutputPath: string, + pipeOutput: boolean + ): Promise { + const shouldPrefix = + streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; + const targetConfiguration = getTargetConfigurationForTask( + task, + this.projectGraph + ); + if ( + process.env.NX_RUN_COMMANDS_DIRECTLY !== 'false' && + targetConfiguration.executor === 'nx:run-commands' && + !shouldPrefix + ) { + try { + const { schema } = getExecutorForTask(task, this.projectGraph); + const isRunOne = this.initiatingProject != null; + const combinedOptions = combineOptionsForExecutor( + task.overrides, + task.target.configuration ?? targetConfiguration.defaultConfiguration, + targetConfiguration, + schema, + task.target.project, + relative(task.projectRoot ?? workspaceRoot, process.cwd()), + process.env.NX_VERBOSE_LOGGING === 'true' + ); + if (combinedOptions.env) { + env = { + ...env, + ...combinedOptions.env, + }; + } + if (streamOutput) { + const args = getPrintableCommandArgsForTask(task); + output.logCommand(args.join(' ')); + } + const runningTask = await runCommands( + { + ...combinedOptions, + env, + usePty: + isRunOne && + !this.tasksSchedule.hasTasks() && + this.runningContinuousTasks.size === 0, + streamOutput, + }, + { + root: workspaceRoot, // only root is needed in runCommands + } as any + ); + + runningTask.onExit((code, terminalOutput) => { if (!streamOutput) { this.options.lifeCycle.printTaskTerminalOutput( task, - status, + code === 0 ? 'success' : 'failure', terminalOutput ); + writeFileSync(temporaryOutputPath, terminalOutput); } - writeFileSync(temporaryOutputPath, terminalOutput); - results.push({ - task, - status, - terminalOutput, - }); - } catch (e) { - if (process.env.NX_VERBOSE_LOGGING === 'true') { - console.error(e); - } else { - console.error(e.message); - } - const terminalOutput = e.stack ?? e.message ?? ''; - writeFileSync(temporaryOutputPath, terminalOutput); - results.push({ - task, - status: 'failure', - terminalOutput, - }); + }); + + return runningTask; + } catch (e) { + if (process.env.NX_VERBOSE_LOGGING === 'true') { + console.error(e); + } else { + console.error(e.message); } - } else if (targetConfiguration.executor === 'nx:noop') { - writeFileSync(temporaryOutputPath, ''); - results.push({ - task, - status: 'success', - terminalOutput: '', - }); - } else { - // cache prep - const { code, terminalOutput } = await this.runTaskInForkedProcess( - task, - env, - pipeOutput, - temporaryOutputPath, - streamOutput - ); - results.push({ - task, - status: code === 0 ? 'success' : 'failure', - terminalOutput, - }); + const terminalOutput = e.stack ?? e.message ?? ''; + writeFileSync(temporaryOutputPath, terminalOutput); } + } else if (targetConfiguration.executor === 'nx:noop') { + writeFileSync(temporaryOutputPath, ''); + return new NoopChildProcess({ + code: 0, + terminalOutput: '', + }); + } else { + // cache prep + return await this.runTaskInForkedProcess( + task, + env, + pipeOutput, + temporaryOutputPath, + streamOutput + ); } - await this.postRunSteps([task], results, doNotSkipCache, { groupId }); } private async runTaskInForkedProcess( @@ -511,10 +541,10 @@ export class TaskOrchestrator { try { const usePtyFork = process.env.NX_NATIVE_COMMAND_RUNNER !== 'false'; - // Disable the pseudo terminal if this is a run-many - const disablePseudoTerminal = !this.initiatingProject; + // Disable the pseudo terminal if this is a run-many or when running a continuous task as part of a run-one + const disablePseudoTerminal = !this.initiatingProject || task.continuous; // execution - const { code, terminalOutput } = usePtyFork + const childProcess = usePtyFork ? await this.forkedProcessTaskRunner.forkProcess(task, { temporaryOutputPath, streamOutput, @@ -531,20 +561,86 @@ export class TaskOrchestrator { env, }); - return { - code, - terminalOutput, - }; + return childProcess; } catch (e) { if (process.env.NX_VERBOSE_LOGGING === 'true') { console.error(e); } - return { + return new NoopChildProcess({ code: 1, - }; + terminalOutput: undefined, + }); } } + private async startContinuousTask(task: Task, groupId: number) { + const taskSpecificEnv = await this.processedTasks.get(task.id); + await this.preRunSteps([task], { groupId }); + + const pipeOutput = await this.pipeOutputCapture(task); + // obtain metadata + const temporaryOutputPath = this.cache.temporaryOutputPath(task); + const streamOutput = + this.outputStyle === 'static' + ? false + : shouldStreamOutput(task, this.initiatingProject); + + let env = pipeOutput + ? getEnvVariablesForTask( + task, + taskSpecificEnv, + process.env.FORCE_COLOR === undefined + ? 'true' + : process.env.FORCE_COLOR, + this.options.skipNxCache, + this.options.captureStderr, + null, + null + ) + : getEnvVariablesForTask( + task, + taskSpecificEnv, + undefined, + this.options.skipNxCache, + this.options.captureStderr, + temporaryOutputPath, + streamOutput + ); + const childProcess = await this.runTask( + task, + streamOutput, + env, + temporaryOutputPath, + pipeOutput + ); + this.runningContinuousTasks.set(task.id, childProcess); + + childProcess.onExit((code) => { + if (!this.cleaningUp) { + console.error( + `Task "${task.id}" is continuous but exited with code ${code}` + ); + this.cleanup().then(() => { + process.exit(1); + }); + } + }); + if ( + this.initiatingProject === task.target.project && + this.options.targets.length === 1 && + this.options.targets[0] === task.target.target + ) { + await childProcess.getResults(); + } else { + await this.tasksSchedule.scheduleNextTasks(); + // release blocked threads + this.waitingForTasks.forEach((f) => f(null)); + this.waitingForTasks.length = 0; + } + + return childProcess; + } + // endregion Single Task // region Lifecycle @@ -731,4 +827,17 @@ export class TaskOrchestrator { } // endregion utils + + private async cleanup() { + this.cleaningUp = true; + await Promise.all( + Array.from(this.runningContinuousTasks).map(async ([taskId, t]) => { + try { + return t.kill(); + } catch (e) { + console.error(`Unable to terminate ${taskId}\nError:`, e); + } + }) + ); + } } diff --git a/packages/nx/src/tasks-runner/tasks-schedule.spec.ts b/packages/nx/src/tasks-runner/tasks-schedule.spec.ts index ed4fcd07cc..b4f481ed5c 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.spec.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.spec.ts @@ -18,6 +18,7 @@ function createMockTask(id: string, parallelism: boolean = true): Task { outputs: [], overrides: {}, parallelism, + continuous: false, }; } @@ -65,6 +66,11 @@ describe('TasksSchedule', () => { 'app2:build': [], 'lib1:build': [], }, + continuousDependencies: { + 'app1:build': [], + 'app2:build': [], + 'lib1:build': [], + }, roots: ['lib1:build', 'app2:build'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); @@ -274,6 +280,13 @@ describe('TasksSchedule', () => { 'app4:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'app2:test': [], + 'app3:test': [], + 'app4:test': [], + 'lib1:test': [], + }, roots: [ 'app1:test', 'app2:test', @@ -552,6 +565,11 @@ describe('TasksSchedule', () => { 'app2:build': [], 'lib1:build': [], }, + continuousDependencies: { + 'app1:build': [], + 'app2:build': [], + 'lib1:build': [], + }, roots: ['lib1:build', 'app2:build'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); @@ -719,6 +737,11 @@ describe('TasksSchedule', () => { 'app2:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'app2:test': [], + 'lib1:test': [], + }, roots: ['app1:test', 'app2:test', 'lib1:test'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); diff --git a/packages/nx/src/tasks-runner/tasks-schedule.ts b/packages/nx/src/tasks-runner/tasks-schedule.ts index 3374c53631..d7df4e981a 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.ts @@ -212,12 +212,15 @@ export class TasksSchedule { ({ tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], } as TaskGraph)); batch.tasks[task.id] = task; batch.dependencies[task.id] = this.notScheduledTaskGraph.dependencies[task.id]; + batch.continuousDependencies[task.id] = + this.notScheduledTaskGraph.continuousDependencies[task.id]; if (isRoot) { batch.roots.push(task.id); } @@ -251,9 +254,13 @@ export class TasksSchedule { const hasDependenciesCompleted = this.taskGraph.dependencies[taskId].every( (id) => this.completedTasks.has(id) ); + const hasContinuousDependenciesStarted = + this.taskGraph.continuousDependencies[taskId].every((id) => + this.runningTasks.has(id) + ); // if dependencies have not completed, cannot schedule - if (!hasDependenciesCompleted) { + if (!hasDependenciesCompleted || !hasContinuousDependenciesStarted) { return false; } diff --git a/packages/nx/src/tasks-runner/utils.ts b/packages/nx/src/tasks-runner/utils.ts index 660c054b39..fd74a06e05 100644 --- a/packages/nx/src/tasks-runner/utils.ts +++ b/packages/nx/src/tasks-runner/utils.ts @@ -451,18 +451,20 @@ export function removeTasksFromTaskGraph( graph: TaskGraph, ids: string[] ): TaskGraph { - const newGraph = removeIdsFromGraph(graph, ids, graph.tasks); + const newGraph = removeIdsFromTaskGraph(graph, ids, graph.tasks); return { dependencies: newGraph.dependencies, + continuousDependencies: newGraph.continuousDependencies, roots: newGraph.roots, tasks: newGraph.mapWithIds, }; } -export function removeIdsFromGraph( +function removeIdsFromTaskGraph( graph: { roots: string[]; dependencies: Record; + continuousDependencies: Record; }, ids: string[], mapWithIds: Record @@ -470,9 +472,11 @@ export function removeIdsFromGraph( mapWithIds: Record; roots: string[]; dependencies: Record; + continuousDependencies: Record; } { const filteredMapWithIds = {}; const dependencies = {}; + const continuousDependencies = {}; const removedSet = new Set(ids); for (let id of Object.keys(mapWithIds)) { if (!removedSet.has(id)) { @@ -480,13 +484,18 @@ export function removeIdsFromGraph( dependencies[id] = graph.dependencies[id].filter( (depId) => !removedSet.has(depId) ); + continuousDependencies[id] = graph.continuousDependencies[id].filter( + (depId) => !removedSet.has(depId) + ); } } return { mapWithIds: filteredMapWithIds, dependencies: dependencies, - roots: Object.keys(dependencies).filter( - (k) => dependencies[k].length === 0 + continuousDependencies, + roots: Object.keys(filteredMapWithIds).filter( + (k) => + dependencies[k].length === 0 && continuousDependencies[k].length === 0 ), }; } @@ -505,6 +514,12 @@ export function calculateReverseDeps( }); }); + Object.keys(taskGraph.continuousDependencies).forEach((taskId) => { + taskGraph.continuousDependencies[taskId].forEach((d) => { + reverseTaskDeps[d].push(taskId); + }); + }); + return reverseTaskDeps; }