feat(core): introduce continuous tasks (#29750)
<!-- Please make sure you have read the submission guidelines before posting an PR --> <!-- https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr --> <!-- Please make sure that your commit message follows our format --> <!-- Example: `fix(nx): must begin with lowercase` --> <!-- If this is a particularly complex change or feature addition, you can request a dedicated Nx release for this pull request branch. Mention someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they will confirm if the PR warrants its own release for testing purposes, and generate it for you if appropriate. --> An RFC about this feature is happening here: #29025. This has the most information about this feature. <!-- This is the behavior we have today --> Nx currently does not explicitly handle tasks which run continuously until they are terminated. <!-- This is the behavior we should expect with the changes in this PR --> This PR adds the initial support for continuous tasks which run continuously until they are terminated. This adds the ability to depend on continuous tasks. There is some more work to be done but this will be enough as an MVP. <!-- Please link the issue being fixed so it gets closed when this is merged. --> Fixes #
This commit is contained in:
parent
0082d939eb
commit
f326bfe52e
@ -15,6 +15,7 @@ Target's configuration
|
||||
- [cache](../../devkit/documents/TargetConfiguration#cache): boolean
|
||||
- [command](../../devkit/documents/TargetConfiguration#command): string
|
||||
- [configurations](../../devkit/documents/TargetConfiguration#configurations): Object
|
||||
- [continuous](../../devkit/documents/TargetConfiguration#continuous): boolean
|
||||
- [defaultConfiguration](../../devkit/documents/TargetConfiguration#defaultconfiguration): string
|
||||
- [dependsOn](../../devkit/documents/TargetConfiguration#dependson): (string | TargetDependencyConfig)[]
|
||||
- [executor](../../devkit/documents/TargetConfiguration#executor): string
|
||||
@ -55,6 +56,14 @@ Sets of options
|
||||
|
||||
---
|
||||
|
||||
### continuous
|
||||
|
||||
• `Optional` **continuous**: `boolean`
|
||||
|
||||
Whether this target runs continuously
|
||||
|
||||
---
|
||||
|
||||
### defaultConfiguration
|
||||
|
||||
• `Optional` **defaultConfiguration**: `string`
|
||||
|
||||
@ -7,6 +7,7 @@ A representation of the invocation of an Executor
|
||||
### Properties
|
||||
|
||||
- [cache](../../devkit/documents/Task#cache): boolean
|
||||
- [continuous](../../devkit/documents/Task#continuous): boolean
|
||||
- [endTime](../../devkit/documents/Task#endtime): number
|
||||
- [hash](../../devkit/documents/Task#hash): string
|
||||
- [hashDetails](../../devkit/documents/Task#hashdetails): Object
|
||||
@ -28,6 +29,14 @@ Determines if a given task should be cacheable.
|
||||
|
||||
---
|
||||
|
||||
### continuous
|
||||
|
||||
• `Optional` **continuous**: `boolean`
|
||||
|
||||
This denotes if the task runs continuously
|
||||
|
||||
---
|
||||
|
||||
### endTime
|
||||
|
||||
• `Optional` **endTime**: `number`
|
||||
|
||||
@ -6,12 +6,19 @@ Graph of Tasks to be executed
|
||||
|
||||
### Properties
|
||||
|
||||
- [continuousDependencies](../../devkit/documents/TaskGraph#continuousdependencies): Record<string, string[]>
|
||||
- [dependencies](../../devkit/documents/TaskGraph#dependencies): Record<string, string[]>
|
||||
- [roots](../../devkit/documents/TaskGraph#roots): string[]
|
||||
- [tasks](../../devkit/documents/TaskGraph#tasks): Record<string, Task>
|
||||
|
||||
## Properties
|
||||
|
||||
### continuousDependencies
|
||||
|
||||
• **continuousDependencies**: `Record`\<`string`, `string`[]\>
|
||||
|
||||
---
|
||||
|
||||
### dependencies
|
||||
|
||||
• **dependencies**: `Record`\<`string`, `string`[]\>
|
||||
|
||||
@ -429,6 +429,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
'lib3:build': [],
|
||||
'lib4:build': [],
|
||||
},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
tasks: {
|
||||
'lib1:build': {
|
||||
@ -437,6 +438,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib1', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:build': {
|
||||
id: 'lib2:build',
|
||||
@ -444,6 +446,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib2', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:build-base': {
|
||||
id: 'lib2:build-base',
|
||||
@ -451,6 +454,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib2', target: 'build-base' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib3:build': {
|
||||
id: 'lib3:build',
|
||||
@ -458,6 +462,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib3', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib4:build': {
|
||||
id: 'lib4:build',
|
||||
@ -465,6 +470,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib4', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
};
|
||||
@ -604,6 +610,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
'lib4:build': ['lib4:build-base'],
|
||||
'lib4:build-base': [],
|
||||
},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
tasks: {
|
||||
'lib1:build': {
|
||||
@ -612,6 +619,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib1', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:build-base': {
|
||||
id: 'lib1:build-base',
|
||||
@ -619,6 +627,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib1', target: 'build-base' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:build': {
|
||||
id: 'lib2:build',
|
||||
@ -626,6 +635,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib2', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:build-base': {
|
||||
id: 'lib2:build-base',
|
||||
@ -633,6 +643,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib2', target: 'build-base' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib3:build': {
|
||||
id: 'lib3:build',
|
||||
@ -640,6 +651,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib3', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib3:build-base': {
|
||||
id: 'lib3:build-base',
|
||||
@ -647,6 +659,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib3', target: 'build-base' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib4:build': {
|
||||
id: 'lib4:build',
|
||||
@ -654,6 +667,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib4', target: 'build' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib4:build-base': {
|
||||
id: 'lib4:build-base',
|
||||
@ -661,6 +675,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
target: { project: 'lib4', target: 'build-base' },
|
||||
outputs: [],
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
};
|
||||
@ -752,6 +767,7 @@ describe('calculateDependenciesFromTaskGraph', () => {
|
||||
// not relevant for this test case
|
||||
const taskGraph: TaskGraph = {
|
||||
dependencies: {},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
tasks: {},
|
||||
};
|
||||
|
||||
@ -66,6 +66,7 @@
|
||||
"string-width": "^4.2.3",
|
||||
"tar-stream": "~2.2.0",
|
||||
"tmp": "~0.2.1",
|
||||
"tree-kill": "^1.2.2",
|
||||
"tsconfig-paths": "^4.1.2",
|
||||
"tslib": "^2.3.0",
|
||||
"yaml": "^2.6.0",
|
||||
|
||||
@ -980,6 +980,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): {
|
||||
taskGraphs[taskId] = {
|
||||
tasks: {},
|
||||
dependencies: {},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
};
|
||||
|
||||
@ -1006,6 +1007,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): {
|
||||
taskGraphs[taskId] = {
|
||||
tasks: {},
|
||||
dependencies: {},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
};
|
||||
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph';
|
||||
import { removeIdsFromGraph } from '../tasks-runner/utils';
|
||||
import { NxArgs } from '../utils/command-line-utils';
|
||||
import { CommandGraph } from './command-graph';
|
||||
import { createCommandGraph } from './create-command-graph';
|
||||
@ -34,3 +33,35 @@ function getSortedProjects(
|
||||
|
||||
return getSortedProjects(newGraph, sortedProjects);
|
||||
}
|
||||
|
||||
function removeIdsFromGraph<T>(
|
||||
graph: {
|
||||
roots: string[];
|
||||
dependencies: Record<string, string[]>;
|
||||
},
|
||||
ids: string[],
|
||||
mapWithIds: Record<string, T>
|
||||
): {
|
||||
mapWithIds: Record<string, T>;
|
||||
roots: string[];
|
||||
dependencies: Record<string, string[]>;
|
||||
} {
|
||||
const filteredMapWithIds = {};
|
||||
const dependencies = {};
|
||||
const removedSet = new Set(ids);
|
||||
for (let id of Object.keys(mapWithIds)) {
|
||||
if (!removedSet.has(id)) {
|
||||
filteredMapWithIds[id] = mapWithIds[id];
|
||||
dependencies[id] = graph.dependencies[id].filter(
|
||||
(depId) => !removedSet.has(depId)
|
||||
);
|
||||
}
|
||||
}
|
||||
return {
|
||||
mapWithIds: filteredMapWithIds,
|
||||
dependencies: dependencies,
|
||||
roots: Object.keys(dependencies).filter(
|
||||
(k) => dependencies[k].length === 0
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
@ -81,6 +81,11 @@ export interface Task {
|
||||
* Determines if a given task should be parallelizable.
|
||||
*/
|
||||
parallelism: boolean;
|
||||
|
||||
/**
|
||||
* This denotes if the task runs continuously
|
||||
*/
|
||||
continuous?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -99,4 +104,6 @@ export interface TaskGraph {
|
||||
* Map of Task IDs to IDs of tasks which the task depends on
|
||||
*/
|
||||
dependencies: Record<string, string[]>;
|
||||
|
||||
continuousDependencies: Record<string, string[]>;
|
||||
}
|
||||
|
||||
@ -269,6 +269,11 @@ export interface TargetConfiguration<T = any> {
|
||||
*/
|
||||
parallelism?: boolean;
|
||||
|
||||
/**
|
||||
* Whether this target runs continuously
|
||||
*/
|
||||
continuous?: boolean;
|
||||
|
||||
/**
|
||||
* List of generators to run before the target to ensure the workspace
|
||||
* is up to date.
|
||||
|
||||
@ -1,40 +1,19 @@
|
||||
import { ChildProcess, exec, Serializable } from 'child_process';
|
||||
import * as path from 'path';
|
||||
import { Serializable } from 'child_process';
|
||||
import * as yargsParser from 'yargs-parser';
|
||||
import { env as appendLocalEnv } from 'npm-run-path';
|
||||
import { ExecutorContext } from '../../config/misc-interfaces';
|
||||
import * as chalk from 'chalk';
|
||||
import {
|
||||
getPseudoTerminal,
|
||||
PseudoTerminal,
|
||||
PseudoTtyProcess,
|
||||
} from '../../tasks-runner/pseudo-terminal';
|
||||
import { signalToCode } from '../../utils/exit-codes';
|
||||
import {
|
||||
loadAndExpandDotEnvFile,
|
||||
unloadDotEnvFile,
|
||||
} from '../../tasks-runner/task-env';
|
||||
import { ParallelRunningTasks, SeriallyRunningTasks } from './running-tasks';
|
||||
|
||||
export const LARGE_BUFFER = 1024 * 1000000;
|
||||
let pseudoTerminal: PseudoTerminal | null;
|
||||
const childProcesses = new Set<ChildProcess | PseudoTtyProcess>();
|
||||
|
||||
function loadEnvVarsFile(path: string, env: Record<string, string> = {}) {
|
||||
unloadDotEnvFile(path, env);
|
||||
const result = loadAndExpandDotEnvFile(path, env);
|
||||
if (result.error) {
|
||||
throw result.error;
|
||||
}
|
||||
}
|
||||
|
||||
export type Json = {
|
||||
[k: string]: any;
|
||||
};
|
||||
|
||||
export interface RunCommandsOptions extends Json {
|
||||
command?: string | string[];
|
||||
commands?: (
|
||||
| {
|
||||
export interface RunCommandsCommandOptions {
|
||||
command: string;
|
||||
forwardAllArgs?: boolean;
|
||||
/**
|
||||
@ -47,8 +26,10 @@ export interface RunCommandsOptions extends Json {
|
||||
color?: string;
|
||||
bgColor?: string;
|
||||
}
|
||||
| string
|
||||
)[];
|
||||
|
||||
export interface RunCommandsOptions extends Json {
|
||||
command?: string | string[];
|
||||
commands?: Array<RunCommandsCommandOptions | string>;
|
||||
color?: boolean;
|
||||
parallel?: boolean;
|
||||
readyWhen?: string | string[];
|
||||
@ -108,7 +89,18 @@ export default async function (
|
||||
success: boolean;
|
||||
terminalOutput: string;
|
||||
}> {
|
||||
registerProcessListener();
|
||||
const task = await runCommands(options, context);
|
||||
const results = await task.getResults();
|
||||
return {
|
||||
...results,
|
||||
success: results.code === 0,
|
||||
};
|
||||
}
|
||||
|
||||
export async function runCommands(
|
||||
options: RunCommandsOptions,
|
||||
context: ExecutorContext
|
||||
) {
|
||||
const normalized = normalizeOptions(options);
|
||||
|
||||
if (normalized.readyWhenStatus.length && !normalized.parallel) {
|
||||
@ -128,11 +120,18 @@ export default async function (
|
||||
);
|
||||
}
|
||||
|
||||
const pseudoTerminal =
|
||||
!options.parallel && PseudoTerminal.isSupported()
|
||||
? getPseudoTerminal()
|
||||
: null;
|
||||
|
||||
try {
|
||||
const result = options.parallel
|
||||
? await runInParallel(normalized, context)
|
||||
: await runSerially(normalized, context);
|
||||
return result;
|
||||
const runningTask = options.parallel
|
||||
? new ParallelRunningTasks(normalized, context)
|
||||
: new SeriallyRunningTasks(normalized, context, pseudoTerminal);
|
||||
|
||||
registerProcessListener(runningTask, pseudoTerminal);
|
||||
return runningTask;
|
||||
} catch (e) {
|
||||
if (process.env.NX_VERBOSE_LOGGING === 'true') {
|
||||
console.error(e);
|
||||
@ -143,77 +142,6 @@ export default async function (
|
||||
}
|
||||
}
|
||||
|
||||
async function runInParallel(
|
||||
options: NormalizedRunCommandsOptions,
|
||||
context: ExecutorContext
|
||||
): Promise<{ success: boolean; terminalOutput: string }> {
|
||||
const procs = options.commands.map((c) =>
|
||||
createProcess(
|
||||
null,
|
||||
c,
|
||||
options.readyWhenStatus,
|
||||
options.color,
|
||||
calculateCwd(options.cwd, context),
|
||||
options.env ?? {},
|
||||
true,
|
||||
options.usePty,
|
||||
options.streamOutput,
|
||||
options.tty,
|
||||
options.envFile
|
||||
).then((result: { success: boolean; terminalOutput: string }) => ({
|
||||
result,
|
||||
command: c.command,
|
||||
}))
|
||||
);
|
||||
|
||||
let terminalOutput = '';
|
||||
if (options.readyWhenStatus.length) {
|
||||
const r: {
|
||||
result: { success: boolean; terminalOutput: string };
|
||||
command: string;
|
||||
} = await Promise.race(procs);
|
||||
terminalOutput += r.result.terminalOutput;
|
||||
if (!r.result.success) {
|
||||
const output = `Warning: command "${r.command}" exited with non-zero status code`;
|
||||
terminalOutput += output;
|
||||
if (options.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
return { success: false, terminalOutput };
|
||||
} else {
|
||||
return { success: true, terminalOutput };
|
||||
}
|
||||
} else {
|
||||
const r: {
|
||||
result: { success: boolean; terminalOutput: string };
|
||||
command: string;
|
||||
}[] = await Promise.all(procs);
|
||||
terminalOutput += r.map((f) => f.result.terminalOutput).join('');
|
||||
const failed = r.filter((v) => !v.result.success);
|
||||
if (failed.length > 0) {
|
||||
const output = failed
|
||||
.map(
|
||||
(f) =>
|
||||
`Warning: command "${f.command}" exited with non-zero status code`
|
||||
)
|
||||
.join('\r\n');
|
||||
terminalOutput += output;
|
||||
if (options.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
return {
|
||||
success: false,
|
||||
terminalOutput,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
success: true,
|
||||
terminalOutput,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeOptions(
|
||||
options: RunCommandsOptions
|
||||
): NormalizedRunCommandsOptions {
|
||||
@ -279,241 +207,6 @@ function normalizeOptions(
|
||||
return options as NormalizedRunCommandsOptions;
|
||||
}
|
||||
|
||||
async function runSerially(
|
||||
options: NormalizedRunCommandsOptions,
|
||||
context: ExecutorContext
|
||||
): Promise<{ success: boolean; terminalOutput: string }> {
|
||||
pseudoTerminal ??= PseudoTerminal.isSupported() ? getPseudoTerminal() : null;
|
||||
let terminalOutput = '';
|
||||
for (const c of options.commands) {
|
||||
const result: { success: boolean; terminalOutput: string } =
|
||||
await createProcess(
|
||||
pseudoTerminal,
|
||||
c,
|
||||
[],
|
||||
options.color,
|
||||
calculateCwd(options.cwd, context),
|
||||
options.processEnv ?? options.env ?? {},
|
||||
false,
|
||||
options.usePty,
|
||||
options.streamOutput,
|
||||
options.tty,
|
||||
options.envFile
|
||||
);
|
||||
terminalOutput += result.terminalOutput;
|
||||
if (!result.success) {
|
||||
const output = `Warning: command "${c.command}" exited with non-zero status code`;
|
||||
result.terminalOutput += output;
|
||||
if (options.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
return { success: false, terminalOutput };
|
||||
}
|
||||
}
|
||||
return { success: true, terminalOutput };
|
||||
}
|
||||
|
||||
async function createProcess(
|
||||
pseudoTerminal: PseudoTerminal | null,
|
||||
commandConfig: {
|
||||
command: string;
|
||||
color?: string;
|
||||
bgColor?: string;
|
||||
prefix?: string;
|
||||
prefixColor?: string;
|
||||
},
|
||||
readyWhenStatus: { stringToMatch: string; found: boolean }[] = [],
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
env: Record<string, string>,
|
||||
isParallel: boolean,
|
||||
usePty: boolean = true,
|
||||
streamOutput: boolean = true,
|
||||
tty: boolean,
|
||||
envFile?: string
|
||||
): Promise<{ success: boolean; terminalOutput: string }> {
|
||||
env = processEnv(color, cwd, env, envFile);
|
||||
// The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes
|
||||
// currently does not work properly in windows
|
||||
if (
|
||||
pseudoTerminal &&
|
||||
process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' &&
|
||||
!commandConfig.prefix &&
|
||||
readyWhenStatus.length === 0 &&
|
||||
!isParallel &&
|
||||
usePty
|
||||
) {
|
||||
let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n';
|
||||
if (streamOutput) {
|
||||
process.stdout.write(terminalOutput);
|
||||
}
|
||||
|
||||
const cp = pseudoTerminal.runCommand(commandConfig.command, {
|
||||
cwd,
|
||||
jsEnv: env,
|
||||
quiet: !streamOutput,
|
||||
tty,
|
||||
});
|
||||
|
||||
childProcesses.add(cp);
|
||||
|
||||
return new Promise((res) => {
|
||||
cp.onOutput((output) => {
|
||||
terminalOutput += output;
|
||||
});
|
||||
|
||||
cp.onExit((code) => {
|
||||
if (code >= 128) {
|
||||
process.exit(code);
|
||||
} else {
|
||||
res({ success: code === 0, terminalOutput });
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return nodeProcess(commandConfig, cwd, env, readyWhenStatus, streamOutput);
|
||||
}
|
||||
|
||||
function nodeProcess(
|
||||
commandConfig: {
|
||||
command: string;
|
||||
color?: string;
|
||||
bgColor?: string;
|
||||
prefix?: string;
|
||||
prefixColor?: string;
|
||||
},
|
||||
cwd: string,
|
||||
env: Record<string, string>,
|
||||
readyWhenStatus: { stringToMatch: string; found: boolean }[],
|
||||
streamOutput = true
|
||||
): Promise<{ success: boolean; terminalOutput: string }> {
|
||||
let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n';
|
||||
if (streamOutput) {
|
||||
process.stdout.write(terminalOutput);
|
||||
}
|
||||
return new Promise((res) => {
|
||||
const childProcess = exec(commandConfig.command, {
|
||||
maxBuffer: LARGE_BUFFER,
|
||||
env,
|
||||
cwd,
|
||||
windowsHide: false,
|
||||
});
|
||||
|
||||
childProcesses.add(childProcess);
|
||||
|
||||
childProcess.stdout.on('data', (data) => {
|
||||
const output = addColorAndPrefix(data, commandConfig);
|
||||
terminalOutput += output;
|
||||
if (streamOutput) {
|
||||
process.stdout.write(output);
|
||||
}
|
||||
if (readyWhenStatus.length && isReady(readyWhenStatus, data.toString())) {
|
||||
res({ success: true, terminalOutput });
|
||||
}
|
||||
});
|
||||
childProcess.stderr.on('data', (err) => {
|
||||
const output = addColorAndPrefix(err, commandConfig);
|
||||
terminalOutput += output;
|
||||
if (streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
if (readyWhenStatus.length && isReady(readyWhenStatus, err.toString())) {
|
||||
res({ success: true, terminalOutput });
|
||||
}
|
||||
});
|
||||
childProcess.on('error', (err) => {
|
||||
const ouptput = addColorAndPrefix(err.toString(), commandConfig);
|
||||
terminalOutput += ouptput;
|
||||
if (streamOutput) {
|
||||
process.stderr.write(ouptput);
|
||||
}
|
||||
res({ success: false, terminalOutput });
|
||||
});
|
||||
childProcess.on('exit', (code) => {
|
||||
childProcesses.delete(childProcess);
|
||||
if (!readyWhenStatus.length || isReady(readyWhenStatus)) {
|
||||
res({ success: code === 0, terminalOutput });
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function addColorAndPrefix(
|
||||
out: string,
|
||||
config: {
|
||||
prefix?: string;
|
||||
prefixColor?: string;
|
||||
color?: string;
|
||||
bgColor?: string;
|
||||
}
|
||||
) {
|
||||
if (config.prefix) {
|
||||
out = out
|
||||
.split('\n')
|
||||
.map((l) => {
|
||||
let prefixText = config.prefix;
|
||||
if (config.prefixColor && chalk[config.prefixColor]) {
|
||||
prefixText = chalk[config.prefixColor](prefixText);
|
||||
}
|
||||
prefixText = chalk.bold(prefixText);
|
||||
return l.trim().length > 0 ? `${prefixText} ${l}` : l;
|
||||
})
|
||||
.join('\n');
|
||||
}
|
||||
if (config.color && chalk[config.color]) {
|
||||
out = chalk[config.color](out);
|
||||
}
|
||||
if (config.bgColor && chalk[config.bgColor]) {
|
||||
out = chalk[config.bgColor](out);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function calculateCwd(
|
||||
cwd: string | undefined,
|
||||
context: ExecutorContext
|
||||
): string {
|
||||
if (!cwd) return context.root;
|
||||
if (path.isAbsolute(cwd)) return cwd;
|
||||
return path.join(context.root, cwd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Env variables are processed in the following order:
|
||||
* - env option from executor options
|
||||
* - env file from envFile option if provided
|
||||
* - local env variables
|
||||
*/
|
||||
function processEnv(
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
envOptionFromExecutor: Record<string, string>,
|
||||
envFile?: string
|
||||
) {
|
||||
let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() });
|
||||
localEnv = {
|
||||
...process.env,
|
||||
...localEnv,
|
||||
};
|
||||
|
||||
if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) {
|
||||
loadEnvVarsFile(envFile, localEnv);
|
||||
}
|
||||
let res: Record<string, string> = {
|
||||
...localEnv,
|
||||
...envOptionFromExecutor,
|
||||
};
|
||||
// need to override PATH to make sure we are using the local node_modules
|
||||
if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like
|
||||
if (localEnv.Path) res.Path = localEnv.Path; // Windows
|
||||
|
||||
if (color) {
|
||||
res.FORCE_COLOR = `${color}`;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
export function interpolateArgsIntoCommand(
|
||||
command: string,
|
||||
opts: Pick<
|
||||
@ -631,7 +324,10 @@ function filterPropKeysFromUnParsedOptions(
|
||||
|
||||
let registered = false;
|
||||
|
||||
function registerProcessListener() {
|
||||
function registerProcessListener(
|
||||
runningTask: ParallelRunningTasks | SeriallyRunningTasks,
|
||||
pseudoTerminal?: PseudoTerminal
|
||||
) {
|
||||
if (registered) {
|
||||
return;
|
||||
}
|
||||
@ -644,45 +340,25 @@ function registerProcessListener() {
|
||||
pseudoTerminal.sendMessageToChildren(message);
|
||||
}
|
||||
|
||||
childProcesses.forEach((p) => {
|
||||
if ('connected' in p && p.connected) {
|
||||
p.send(message);
|
||||
}
|
||||
});
|
||||
runningTask.send(message);
|
||||
});
|
||||
|
||||
// Terminate any task processes on exit
|
||||
process.on('exit', () => {
|
||||
childProcesses.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill();
|
||||
}
|
||||
});
|
||||
runningTask.kill();
|
||||
});
|
||||
process.on('SIGINT', () => {
|
||||
childProcesses.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill('SIGTERM');
|
||||
}
|
||||
});
|
||||
runningTask.kill('SIGTERM');
|
||||
// we exit here because we don't need to write anything to cache.
|
||||
process.exit(signalToCode('SIGINT'));
|
||||
});
|
||||
process.on('SIGTERM', () => {
|
||||
childProcesses.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill('SIGTERM');
|
||||
}
|
||||
});
|
||||
runningTask.kill('SIGTERM');
|
||||
// no exit here because we expect child processes to terminate which
|
||||
// will store results to the cache and will terminate this process
|
||||
});
|
||||
process.on('SIGHUP', () => {
|
||||
childProcesses.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill('SIGTERM');
|
||||
}
|
||||
});
|
||||
runningTask.kill('SIGTERM');
|
||||
// no exit here because we expect child processes to terminate which
|
||||
// will store results to the cache and will terminate this process
|
||||
});
|
||||
@ -705,19 +381,3 @@ function wrapArgIntoQuotesIfNeeded(arg: string): string {
|
||||
return arg;
|
||||
}
|
||||
}
|
||||
|
||||
function isReady(
|
||||
readyWhenStatus: { stringToMatch: string; found: boolean }[] = [],
|
||||
data?: string
|
||||
): boolean {
|
||||
if (data) {
|
||||
for (const readyWhenElement of readyWhenStatus) {
|
||||
if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) {
|
||||
readyWhenElement.found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found);
|
||||
}
|
||||
|
||||
519
packages/nx/src/executors/run-commands/running-tasks.ts
Normal file
519
packages/nx/src/executors/run-commands/running-tasks.ts
Normal file
@ -0,0 +1,519 @@
|
||||
import { ChildProcess, exec, Serializable } from 'child_process';
|
||||
import { RunningTask } from '../../tasks-runner/running-tasks/running-task';
|
||||
import { ExecutorContext } from '../../config/misc-interfaces';
|
||||
import {
|
||||
LARGE_BUFFER,
|
||||
NormalizedRunCommandsOptions,
|
||||
RunCommandsCommandOptions,
|
||||
RunCommandsOptions,
|
||||
} from './run-commands.impl';
|
||||
import {
|
||||
PseudoTerminal,
|
||||
PseudoTtyProcess,
|
||||
} from '../../tasks-runner/pseudo-terminal';
|
||||
import { isAbsolute, join } from 'path';
|
||||
import * as chalk from 'chalk';
|
||||
import { env as appendLocalEnv } from 'npm-run-path';
|
||||
import {
|
||||
loadAndExpandDotEnvFile,
|
||||
unloadDotEnvFile,
|
||||
} from '../../tasks-runner/task-env';
|
||||
import * as treeKill from 'tree-kill';
|
||||
|
||||
export class ParallelRunningTasks implements RunningTask {
|
||||
private readonly childProcesses: RunningNodeProcess[];
|
||||
private readyWhenStatus: { stringToMatch: string; found: boolean }[];
|
||||
private readonly streamOutput: boolean;
|
||||
|
||||
private exitCallbacks: Array<(code: number, terminalOutput: string) => void> =
|
||||
[];
|
||||
|
||||
constructor(options: NormalizedRunCommandsOptions, context: ExecutorContext) {
|
||||
this.childProcesses = options.commands.map(
|
||||
(commandConfig) =>
|
||||
new RunningNodeProcess(
|
||||
commandConfig,
|
||||
options.color,
|
||||
calculateCwd(options.cwd, context),
|
||||
options.env ?? {},
|
||||
options.readyWhenStatus,
|
||||
options.streamOutput,
|
||||
options.envFile
|
||||
)
|
||||
);
|
||||
this.readyWhenStatus = options.readyWhenStatus;
|
||||
this.streamOutput = options.streamOutput;
|
||||
|
||||
this.run();
|
||||
}
|
||||
|
||||
async getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return new Promise((res) => {
|
||||
this.onExit((code, terminalOutput) => {
|
||||
res({ code, terminalOutput });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
onExit(cb: (code: number, terminalOutput: string) => void): void {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
for (const childProcess of this.childProcesses) {
|
||||
childProcess.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
async kill(signal?: NodeJS.Signals | number) {
|
||||
await Promise.all(
|
||||
this.childProcesses.map(async (p) => {
|
||||
try {
|
||||
return p.kill();
|
||||
} catch (e) {
|
||||
console.error(`Unable to terminate "${p.command}"\nError:`, e);
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private async run() {
|
||||
if (this.readyWhenStatus.length) {
|
||||
let {
|
||||
childProcess,
|
||||
result: { code, terminalOutput },
|
||||
} = await Promise.race(
|
||||
this.childProcesses.map(
|
||||
(childProcess) =>
|
||||
new Promise<{
|
||||
childProcess: RunningNodeProcess;
|
||||
result: { code: number; terminalOutput: string };
|
||||
}>((res) => {
|
||||
childProcess.onExit((code, terminalOutput) => {
|
||||
res({
|
||||
childProcess,
|
||||
result: { code, terminalOutput },
|
||||
});
|
||||
});
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
if (code !== 0) {
|
||||
const output = `Warning: command "${childProcess.command}" exited with non-zero status code`;
|
||||
terminalOutput += output;
|
||||
if (this.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
}
|
||||
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(code, terminalOutput);
|
||||
}
|
||||
} else {
|
||||
const results = await Promise.all(
|
||||
this.childProcesses.map((childProcess) =>
|
||||
childProcess.getResults().then((result) => ({
|
||||
childProcess,
|
||||
result,
|
||||
}))
|
||||
)
|
||||
);
|
||||
|
||||
let terminalOutput = results
|
||||
.map((r) => r.result.terminalOutput)
|
||||
.join('\r\n');
|
||||
|
||||
const failed = results.filter((result) => result.result.code !== 0);
|
||||
if (failed.length > 0) {
|
||||
const output = failed
|
||||
.map(
|
||||
(failedResult) =>
|
||||
`Warning: command "${failedResult.childProcess.command}" exited with non-zero status code`
|
||||
)
|
||||
.join('\r\n');
|
||||
terminalOutput += output;
|
||||
if (this.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(1, terminalOutput);
|
||||
}
|
||||
} else {
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(0, terminalOutput);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class SeriallyRunningTasks implements RunningTask {
|
||||
private terminalOutput = '';
|
||||
private currentProcess: RunningTask | PseudoTtyProcess | null = null;
|
||||
private exitCallbacks: Array<(code: number, terminalOutput: string) => void> =
|
||||
[];
|
||||
private code: number | null = 0;
|
||||
private error: any;
|
||||
|
||||
constructor(
|
||||
options: NormalizedRunCommandsOptions,
|
||||
context: ExecutorContext,
|
||||
private pseudoTerminal?: PseudoTerminal
|
||||
) {
|
||||
this.run(options, context)
|
||||
.catch((e) => {
|
||||
this.error = e;
|
||||
})
|
||||
.finally(() => {
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(this.code, this.terminalOutput);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return new Promise((res, rej) => {
|
||||
this.onExit((code) => {
|
||||
if (this.error) {
|
||||
rej(this.error);
|
||||
} else {
|
||||
res({ code, terminalOutput: this.terminalOutput });
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
onExit(cb: (code: number, terminalOutput: string) => void): void {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
|
||||
kill(signal?: NodeJS.Signals | number) {
|
||||
return this.currentProcess.kill(signal);
|
||||
}
|
||||
|
||||
private async run(
|
||||
options: NormalizedRunCommandsOptions,
|
||||
context: ExecutorContext
|
||||
) {
|
||||
for (const c of options.commands) {
|
||||
const childProcess = await this.createProcess(
|
||||
c,
|
||||
[],
|
||||
options.color,
|
||||
calculateCwd(options.cwd, context),
|
||||
options.processEnv ?? options.env ?? {},
|
||||
false,
|
||||
options.usePty,
|
||||
options.streamOutput,
|
||||
options.tty,
|
||||
options.envFile
|
||||
);
|
||||
this.currentProcess = childProcess;
|
||||
|
||||
let { code, terminalOutput } = await childProcess.getResults();
|
||||
this.terminalOutput += terminalOutput;
|
||||
this.code = code;
|
||||
if (code !== 0) {
|
||||
const output = `Warning: command "${c.command}" exited with non-zero status code`;
|
||||
terminalOutput += output;
|
||||
if (options.streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
this.terminalOutput += terminalOutput;
|
||||
|
||||
// Stop running commands
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async createProcess(
|
||||
commandConfig: RunCommandsCommandOptions,
|
||||
readyWhenStatus: { stringToMatch: string; found: boolean }[] = [],
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
env: Record<string, string>,
|
||||
isParallel: boolean,
|
||||
usePty: boolean = true,
|
||||
streamOutput: boolean = true,
|
||||
tty: boolean,
|
||||
envFile?: string
|
||||
): Promise<PseudoTtyProcess | RunningNodeProcess> {
|
||||
// The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes
|
||||
// currently does not work properly in windows
|
||||
if (
|
||||
this.pseudoTerminal &&
|
||||
process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' &&
|
||||
!commandConfig.prefix &&
|
||||
readyWhenStatus.length === 0 &&
|
||||
!isParallel &&
|
||||
usePty
|
||||
) {
|
||||
return createProcessWithPseudoTty(
|
||||
this.pseudoTerminal,
|
||||
commandConfig,
|
||||
color,
|
||||
cwd,
|
||||
env,
|
||||
streamOutput,
|
||||
tty,
|
||||
envFile
|
||||
);
|
||||
}
|
||||
|
||||
return new RunningNodeProcess(
|
||||
commandConfig,
|
||||
color,
|
||||
cwd,
|
||||
env,
|
||||
readyWhenStatus,
|
||||
streamOutput,
|
||||
envFile
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
class RunningNodeProcess implements RunningTask {
|
||||
private terminalOutput = '';
|
||||
private childProcess: ChildProcess;
|
||||
private exitCallbacks: Array<(code: number, terminalOutput: string) => void> =
|
||||
[];
|
||||
public command: string;
|
||||
|
||||
constructor(
|
||||
commandConfig: RunCommandsCommandOptions,
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
env: Record<string, string>,
|
||||
private readyWhenStatus: { stringToMatch: string; found: boolean }[],
|
||||
streamOutput = true,
|
||||
envFile: string
|
||||
) {
|
||||
env = processEnv(color, cwd, env, envFile);
|
||||
this.command = commandConfig.command;
|
||||
this.terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n';
|
||||
if (streamOutput) {
|
||||
process.stdout.write(this.terminalOutput);
|
||||
}
|
||||
this.childProcess = exec(commandConfig.command, {
|
||||
maxBuffer: LARGE_BUFFER,
|
||||
env,
|
||||
cwd,
|
||||
windowsHide: false,
|
||||
});
|
||||
|
||||
this.addListeners(commandConfig, streamOutput);
|
||||
}
|
||||
|
||||
getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return new Promise((res) => {
|
||||
this.onExit((code, terminalOutput) => {
|
||||
res({ code, terminalOutput });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
onExit(cb: (code: number, terminalOutput: string) => void): void {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
this.childProcess.send(message);
|
||||
}
|
||||
|
||||
kill(signal?: NodeJS.Signals | number): Promise<void> {
|
||||
return new Promise<void>((res, rej) => {
|
||||
treeKill(this.childProcess.pid, signal, (err) => {
|
||||
if (err) {
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private addListeners(
|
||||
commandConfig: RunCommandsCommandOptions,
|
||||
streamOutput: boolean
|
||||
) {
|
||||
this.childProcess.stdout.on('data', (data) => {
|
||||
const output = addColorAndPrefix(data, commandConfig);
|
||||
this.terminalOutput += output;
|
||||
if (streamOutput) {
|
||||
process.stdout.write(output);
|
||||
}
|
||||
if (
|
||||
this.readyWhenStatus.length &&
|
||||
isReady(this.readyWhenStatus, data.toString())
|
||||
) {
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(0, this.terminalOutput);
|
||||
}
|
||||
}
|
||||
});
|
||||
this.childProcess.stderr.on('data', (err) => {
|
||||
const output = addColorAndPrefix(err, commandConfig);
|
||||
this.terminalOutput += output;
|
||||
if (streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
if (
|
||||
this.readyWhenStatus.length &&
|
||||
isReady(this.readyWhenStatus, err.toString())
|
||||
) {
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(1, this.terminalOutput);
|
||||
}
|
||||
}
|
||||
});
|
||||
this.childProcess.on('error', (err) => {
|
||||
const output = addColorAndPrefix(err.toString(), commandConfig);
|
||||
this.terminalOutput += output;
|
||||
if (streamOutput) {
|
||||
process.stderr.write(output);
|
||||
}
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(1, this.terminalOutput);
|
||||
}
|
||||
});
|
||||
this.childProcess.on('exit', (code) => {
|
||||
if (!this.readyWhenStatus.length || isReady(this.readyWhenStatus)) {
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(code, this.terminalOutput);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function createProcessWithPseudoTty(
|
||||
pseudoTerminal: PseudoTerminal,
|
||||
commandConfig: {
|
||||
command: string;
|
||||
color?: string;
|
||||
bgColor?: string;
|
||||
prefix?: string;
|
||||
},
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
env: Record<string, string>,
|
||||
streamOutput: boolean = true,
|
||||
tty: boolean,
|
||||
envFile?: string
|
||||
) {
|
||||
let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n';
|
||||
if (streamOutput) {
|
||||
process.stdout.write(terminalOutput);
|
||||
}
|
||||
env = processEnv(color, cwd, env, envFile);
|
||||
const childProcess = pseudoTerminal.runCommand(commandConfig.command, {
|
||||
cwd,
|
||||
jsEnv: env,
|
||||
quiet: !streamOutput,
|
||||
tty,
|
||||
});
|
||||
|
||||
childProcess.onOutput((output) => {
|
||||
terminalOutput += output;
|
||||
});
|
||||
|
||||
return childProcess;
|
||||
}
|
||||
|
||||
function addColorAndPrefix(out: string, config: RunCommandsCommandOptions) {
|
||||
if (config.prefix) {
|
||||
out = out
|
||||
.split('\n')
|
||||
.map((l) => {
|
||||
let prefixText = config.prefix;
|
||||
if (config.prefixColor && chalk[config.prefixColor]) {
|
||||
prefixText = chalk[config.prefixColor](prefixText);
|
||||
}
|
||||
prefixText = chalk.bold(prefixText);
|
||||
return l.trim().length > 0 ? `${prefixText} ${l}` : l;
|
||||
})
|
||||
.join('\n');
|
||||
}
|
||||
if (config.color && chalk[config.color]) {
|
||||
out = chalk[config.color](out);
|
||||
}
|
||||
if (config.bgColor && chalk[config.bgColor]) {
|
||||
out = chalk[config.bgColor](out);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function calculateCwd(
|
||||
cwd: string | undefined,
|
||||
context: ExecutorContext
|
||||
): string {
|
||||
if (!cwd) return context.root;
|
||||
if (isAbsolute(cwd)) return cwd;
|
||||
return join(context.root, cwd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Env variables are processed in the following order:
|
||||
* - env option from executor options
|
||||
* - env file from envFile option if provided
|
||||
* - local env variables
|
||||
*/
|
||||
function processEnv(
|
||||
color: boolean,
|
||||
cwd: string,
|
||||
envOptionFromExecutor: Record<string, string>,
|
||||
envFile?: string
|
||||
) {
|
||||
let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() });
|
||||
localEnv = {
|
||||
...process.env,
|
||||
...localEnv,
|
||||
};
|
||||
|
||||
if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) {
|
||||
loadEnvVarsFile(envFile, localEnv);
|
||||
}
|
||||
let res: Record<string, string> = {
|
||||
...localEnv,
|
||||
...envOptionFromExecutor,
|
||||
};
|
||||
// need to override PATH to make sure we are using the local node_modules
|
||||
if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like
|
||||
if (localEnv.Path) res.Path = localEnv.Path; // Windows
|
||||
|
||||
if (color) {
|
||||
res.FORCE_COLOR = `${color}`;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
function isReady(
|
||||
readyWhenStatus: { stringToMatch: string; found: boolean }[] = [],
|
||||
data?: string
|
||||
): boolean {
|
||||
if (data) {
|
||||
for (const readyWhenElement of readyWhenStatus) {
|
||||
if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) {
|
||||
readyWhenElement.found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found);
|
||||
}
|
||||
|
||||
function loadEnvVarsFile(path: string, env: Record<string, string> = {}) {
|
||||
unloadDotEnvFile(path, env);
|
||||
const result = loadAndExpandDotEnvFile(path, env);
|
||||
if (result.error) {
|
||||
throw result.error;
|
||||
}
|
||||
}
|
||||
1
packages/nx/src/native/index.d.ts
vendored
1
packages/nx/src/native/index.d.ts
vendored
@ -11,6 +11,7 @@ export declare class ChildProcess {
|
||||
kill(): void
|
||||
onExit(callback: (message: string) => void): void
|
||||
onOutput(callback: (message: string) => void): void
|
||||
cleanup(): void
|
||||
}
|
||||
|
||||
export declare class FileLock {
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use crossbeam_channel::Receiver;
|
||||
use crossbeam_channel::Sender;
|
||||
use crossbeam_channel::{bounded, Receiver};
|
||||
use napi::{
|
||||
threadsafe_function::{
|
||||
ErrorStrategy::Fatal, ThreadsafeFunction, ThreadsafeFunctionCallMode::NonBlocking,
|
||||
@ -6,6 +7,7 @@ use napi::{
|
||||
Env, JsFunction,
|
||||
};
|
||||
use portable_pty::ChildKiller;
|
||||
use tracing::warn;
|
||||
|
||||
pub enum ChildProcessMessage {
|
||||
Kill,
|
||||
@ -16,6 +18,7 @@ pub struct ChildProcess {
|
||||
process_killer: Box<dyn ChildKiller + Sync + Send>,
|
||||
message_receiver: Receiver<String>,
|
||||
pub(crate) wait_receiver: Receiver<String>,
|
||||
thread_handles: Vec<Sender<()>>,
|
||||
}
|
||||
#[napi]
|
||||
impl ChildProcess {
|
||||
@ -28,6 +31,7 @@ impl ChildProcess {
|
||||
process_killer,
|
||||
message_receiver,
|
||||
wait_receiver: exit_receiver,
|
||||
thread_handles: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,18 +72,43 @@ impl ChildProcess {
|
||||
|
||||
callback_tsfn.unref(&env)?;
|
||||
|
||||
let (kill_tx, kill_rx) = bounded::<()>(1);
|
||||
|
||||
std::thread::spawn(move || {
|
||||
while let Ok(content) = rx.recv() {
|
||||
loop {
|
||||
if kill_rx.try_recv().is_ok() {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(content) = rx.try_recv() {
|
||||
// windows will add `ESC[6n` to the beginning of the output,
|
||||
// we dont want to store this ANSI code in cache, because replays will cause issues
|
||||
// remove it before sending it to js
|
||||
#[cfg(windows)]
|
||||
let content = content.replace("\x1B[6n", "");
|
||||
|
||||
callback_tsfn.call(content, NonBlocking);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
self.thread_handles.push(kill_tx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn cleanup(&mut self) {
|
||||
let handles = std::mem::take(&mut self.thread_handles);
|
||||
for handle in handles {
|
||||
if let Err(e) = handle.send(()) {
|
||||
warn!(error = ?e, "Failed to send kill signal to thread");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ChildProcess {
|
||||
fn drop(&mut self) {
|
||||
self.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
@ -159,9 +159,9 @@ pub fn run_command(
|
||||
}
|
||||
|
||||
let (exit_to_process_tx, exit_to_process_rx) = bounded(1);
|
||||
trace!("Running {}", command);
|
||||
let mut child = pair.slave.spawn_command(cmd)?;
|
||||
pseudo_terminal.running.store(true, Ordering::SeqCst);
|
||||
trace!("Running {}", command);
|
||||
let is_tty = tty.unwrap_or_else(|| std::io::stdout().is_tty());
|
||||
if is_tty {
|
||||
trace!("Enabling raw mode");
|
||||
|
||||
@ -48,6 +48,7 @@ describe('createTaskGraph', () => {
|
||||
executor: 'nx:run-commands',
|
||||
},
|
||||
serve: {
|
||||
continuous: true,
|
||||
executor: 'nx:run-commands',
|
||||
},
|
||||
},
|
||||
@ -90,6 +91,7 @@ describe('createTaskGraph', () => {
|
||||
roots: [],
|
||||
tasks: {},
|
||||
dependencies: {},
|
||||
continuousDependencies: {},
|
||||
});
|
||||
});
|
||||
|
||||
@ -117,11 +119,15 @@ describe('createTaskGraph', () => {
|
||||
overrides: { a: 123 },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
});
|
||||
|
||||
const twoTasks = createTaskGraph(
|
||||
@ -148,6 +154,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: { a: 123 },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:test': {
|
||||
id: 'lib1:test',
|
||||
@ -159,12 +166,17 @@ describe('createTaskGraph', () => {
|
||||
overrides: { a: 123 },
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -299,6 +311,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: {},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:compile': {
|
||||
id: 'lib2:compile',
|
||||
@ -312,12 +325,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'lib1:compile:libDefault': ['lib2:compile'],
|
||||
'lib2:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:compile:libDefault': [],
|
||||
'lib2:compile': [],
|
||||
},
|
||||
});
|
||||
|
||||
const compileApp = createTaskGraph(
|
||||
@ -343,6 +361,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: {},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:compile:libDefault': {
|
||||
id: 'lib1:compile:libDefault',
|
||||
@ -357,6 +376,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:compile:ci': {
|
||||
id: 'lib2:compile:ci',
|
||||
@ -371,6 +391,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -378,6 +399,11 @@ describe('createTaskGraph', () => {
|
||||
'lib1:compile:libDefault': ['lib2:compile:ci'],
|
||||
'lib2:compile:ci': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile:ci': [],
|
||||
'lib1:compile:libDefault': [],
|
||||
'lib2:compile:ci': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -460,6 +486,10 @@ describe('createTaskGraph', () => {
|
||||
'app1:compile': ['lib3:compile'],
|
||||
'lib3:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'lib3:compile': [],
|
||||
},
|
||||
roots: ['lib3:compile'],
|
||||
tasks: {
|
||||
'app1:compile': {
|
||||
@ -472,6 +502,7 @@ describe('createTaskGraph', () => {
|
||||
target: 'compile',
|
||||
},
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib3:compile': {
|
||||
id: 'lib3:compile',
|
||||
@ -485,6 +516,7 @@ describe('createTaskGraph', () => {
|
||||
target: 'compile',
|
||||
},
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
@ -514,11 +546,15 @@ describe('createTaskGraph', () => {
|
||||
overrides: { a: '--value=app1-root' },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -546,11 +582,15 @@ describe('createTaskGraph', () => {
|
||||
overrides: { a: '--base-href=/app1-root${deploymentId}' },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -661,6 +701,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: { myFlag: 'flag value' },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:precompile': {
|
||||
id: 'app1:precompile',
|
||||
@ -672,6 +713,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: { myFlag: 'flag value' },
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:compile': {
|
||||
id: 'lib1:compile',
|
||||
@ -683,6 +725,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: { myFlag: 'flag value' },
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:compile': {
|
||||
id: 'lib2:compile',
|
||||
@ -694,6 +737,7 @@ describe('createTaskGraph', () => {
|
||||
overrides: { __overrides_unparsed__: [] },
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -702,6 +746,12 @@ describe('createTaskGraph', () => {
|
||||
'lib1:compile': ['lib2:compile'],
|
||||
'lib2:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app1:precompile': [],
|
||||
'lib1:compile': [],
|
||||
'lib2:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -732,6 +782,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:precompile': {
|
||||
id: 'app1:precompile',
|
||||
@ -745,6 +796,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:precompile2': {
|
||||
id: 'app1:precompile2',
|
||||
@ -758,6 +810,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:compile': {
|
||||
id: 'lib1:compile',
|
||||
@ -771,6 +824,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -779,6 +833,127 @@ describe('createTaskGraph', () => {
|
||||
'app1:precompile2': [],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app1:precompile': [],
|
||||
'app1:precompile2': [],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should create graphs with continuous dependencies', () => {
|
||||
projectGraph.nodes['app1'].data.targets['serve'].dependsOn = [
|
||||
{
|
||||
dependencies: true,
|
||||
target: 'serve',
|
||||
},
|
||||
{
|
||||
target: 'compile',
|
||||
},
|
||||
];
|
||||
projectGraph.nodes['app1'].data.targets['compile'].dependsOn = [
|
||||
{
|
||||
dependencies: true,
|
||||
target: 'compile',
|
||||
},
|
||||
];
|
||||
projectGraph.nodes['lib1'].data.targets['serve'] = {
|
||||
executor: 'nx:run-command',
|
||||
continuous: true,
|
||||
dependsOn: [
|
||||
{
|
||||
dependencies: true,
|
||||
target: 'serve',
|
||||
},
|
||||
{
|
||||
target: 'compile',
|
||||
},
|
||||
],
|
||||
};
|
||||
const taskGraph = createTaskGraph(
|
||||
projectGraph,
|
||||
{},
|
||||
['app1'],
|
||||
['serve'],
|
||||
undefined,
|
||||
{
|
||||
__overrides_unparsed__: [],
|
||||
}
|
||||
);
|
||||
// precompile should also be in here
|
||||
expect(taskGraph).toEqual({
|
||||
roots: ['lib1:compile'],
|
||||
tasks: {
|
||||
'app1:serve': {
|
||||
id: 'app1:serve',
|
||||
target: {
|
||||
project: 'app1',
|
||||
target: 'serve',
|
||||
},
|
||||
outputs: [],
|
||||
overrides: {
|
||||
__overrides_unparsed__: [],
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: true,
|
||||
},
|
||||
'app1:compile': {
|
||||
id: 'app1:compile',
|
||||
target: {
|
||||
project: 'app1',
|
||||
target: 'compile',
|
||||
},
|
||||
outputs: [],
|
||||
overrides: {
|
||||
__overrides_unparsed__: [],
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:serve': {
|
||||
id: 'lib1:serve',
|
||||
target: {
|
||||
project: 'lib1',
|
||||
target: 'serve',
|
||||
},
|
||||
outputs: [],
|
||||
overrides: {
|
||||
__overrides_unparsed__: [],
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: true,
|
||||
},
|
||||
'lib1:compile': {
|
||||
id: 'lib1:compile',
|
||||
target: {
|
||||
project: 'lib1',
|
||||
target: 'compile',
|
||||
},
|
||||
outputs: [],
|
||||
overrides: {
|
||||
__overrides_unparsed__: [],
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:serve': ['app1:compile'],
|
||||
'app1:compile': ['lib1:compile'],
|
||||
'lib1:serve': ['lib1:compile'],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:serve': ['lib1:serve'],
|
||||
'app1:compile': [],
|
||||
'lib1:serve': [],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -809,6 +984,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:precompile': {
|
||||
id: 'app1:precompile',
|
||||
@ -822,6 +998,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:precompile2': {
|
||||
id: 'app1:precompile2',
|
||||
@ -835,6 +1012,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:compile': {
|
||||
id: 'lib1:compile',
|
||||
@ -848,6 +1026,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -856,6 +1035,12 @@ describe('createTaskGraph', () => {
|
||||
'app1:precompile2': [],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app1:precompile': [],
|
||||
'app1:precompile2': [],
|
||||
'lib1:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -955,6 +1140,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib1:compile': {
|
||||
id: 'lib1:compile',
|
||||
@ -968,6 +1154,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:compile': {
|
||||
id: 'lib2:compile',
|
||||
@ -981,6 +1168,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib3:compile': {
|
||||
id: 'lib3:compile',
|
||||
@ -994,6 +1182,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib3-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -1002,6 +1191,12 @@ describe('createTaskGraph', () => {
|
||||
'lib2:compile': ['lib3:compile'],
|
||||
'lib3:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'lib1:compile': [],
|
||||
'lib2:compile': [],
|
||||
'lib3:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1118,6 +1313,7 @@ describe('createTaskGraph', () => {
|
||||
outputs: [],
|
||||
overrides: { myFlag: 'flag value' },
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app2:compile': {
|
||||
id: 'app2:compile',
|
||||
@ -1126,6 +1322,7 @@ describe('createTaskGraph', () => {
|
||||
outputs: [],
|
||||
overrides: { __overrides_unparsed__: [] },
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'coreInfra:apply': {
|
||||
id: 'coreInfra:apply',
|
||||
@ -1134,6 +1331,7 @@ describe('createTaskGraph', () => {
|
||||
outputs: [],
|
||||
overrides: { myFlag: 'flag value' },
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:compile': {
|
||||
id: 'app1:compile',
|
||||
@ -1142,6 +1340,7 @@ describe('createTaskGraph', () => {
|
||||
outputs: [],
|
||||
overrides: { __overrides_unparsed__: [] },
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'infra2:apply': {
|
||||
id: 'infra2:apply',
|
||||
@ -1150,6 +1349,7 @@ describe('createTaskGraph', () => {
|
||||
outputs: [],
|
||||
overrides: { myFlag: 'flag value' },
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -1164,6 +1364,13 @@ describe('createTaskGraph', () => {
|
||||
'app1:compile': [],
|
||||
'infra2:apply': ['app2:compile', 'coreInfra:apply'],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'infra1:apply': [],
|
||||
'app2:compile': [],
|
||||
'coreInfra:apply': [],
|
||||
'app1:compile': [],
|
||||
'infra2:apply': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1217,6 +1424,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:test': {
|
||||
id: 'app1:test',
|
||||
@ -1230,12 +1438,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': ['app1:test'],
|
||||
'app1:test': ['app1:compile'],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app1:test': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1326,6 +1539,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib2:build': expect.objectContaining({
|
||||
id: 'lib2:build',
|
||||
@ -1339,6 +1553,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib3:build': expect.objectContaining({
|
||||
id: 'lib3:build',
|
||||
@ -1352,6 +1567,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib3-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib4:build': expect.objectContaining({
|
||||
id: 'lib4:build',
|
||||
@ -1365,6 +1581,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib4-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
},
|
||||
dependencies: {
|
||||
@ -1373,6 +1590,12 @@ describe('createTaskGraph', () => {
|
||||
'lib3:build': ['lib4:build'],
|
||||
'lib4:build': ['lib1:build'],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:build': [],
|
||||
'lib2:build': [],
|
||||
'lib3:build': [],
|
||||
'lib4:build': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1458,6 +1681,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib2:build': expect.objectContaining({
|
||||
id: 'lib2:build',
|
||||
@ -1471,6 +1695,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib4:build': expect.objectContaining({
|
||||
id: 'lib4:build',
|
||||
@ -1484,6 +1709,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib4-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
},
|
||||
dependencies: {
|
||||
@ -1491,6 +1717,11 @@ describe('createTaskGraph', () => {
|
||||
'lib2:build': ['lib4:build'],
|
||||
'lib4:build': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:build': [],
|
||||
'lib2:build': [],
|
||||
'lib4:build': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1551,11 +1782,15 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
},
|
||||
dependencies: {
|
||||
'lib1:build': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:build': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1642,6 +1877,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib2:build': expect.objectContaining({
|
||||
id: 'lib2:build',
|
||||
@ -1655,6 +1891,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib4:build': expect.objectContaining({
|
||||
id: 'lib4:build',
|
||||
@ -1668,6 +1905,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib4-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
},
|
||||
dependencies: {
|
||||
@ -1675,6 +1913,11 @@ describe('createTaskGraph', () => {
|
||||
'lib2:build': [],
|
||||
'lib4:build': ['lib1:build'],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:build': [],
|
||||
'lib2:build': [],
|
||||
'lib4:build': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1757,6 +2000,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
'lib2:build': expect.objectContaining({
|
||||
id: 'lib2:build',
|
||||
@ -1770,12 +2014,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
}),
|
||||
},
|
||||
dependencies: {
|
||||
'lib1:build': ['lib2:build'],
|
||||
'lib2:build': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'lib1:build': [],
|
||||
'lib2:build': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1852,6 +2101,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app3:compile': {
|
||||
id: 'app3:compile',
|
||||
@ -1865,12 +2115,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app3-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': [],
|
||||
'app3:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app3:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -1944,6 +2199,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app3:compile': {
|
||||
id: 'app3:compile',
|
||||
@ -1957,12 +2213,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app3-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': [],
|
||||
'app3:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app3:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -2042,6 +2303,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app1:test': {
|
||||
id: 'app1:test',
|
||||
@ -2055,6 +2317,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:dep': {
|
||||
id: 'lib2:dep',
|
||||
@ -2068,6 +2331,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'lib2:dep2': {
|
||||
id: 'lib2:dep2',
|
||||
@ -2081,6 +2345,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'lib2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -2089,6 +2354,12 @@ describe('createTaskGraph', () => {
|
||||
'lib2:dep': [],
|
||||
'lib2:dep2': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:lint': [],
|
||||
'app1:test': [],
|
||||
'lib2:dep': [],
|
||||
'lib2:dep2': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -2160,11 +2431,15 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
},
|
||||
});
|
||||
|
||||
const taskGraph2 = createTaskGraph(
|
||||
@ -2194,6 +2469,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app2:compile': {
|
||||
id: 'app2:compile',
|
||||
@ -2207,12 +2483,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': ['app2:compile'],
|
||||
'app2:compile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app2:compile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@ -2706,6 +2987,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app1-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app4:precompile': {
|
||||
id: 'app4:precompile',
|
||||
@ -2719,12 +3001,17 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app4-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
'app1:compile': ['app4:precompile'],
|
||||
'app4:precompile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:compile': [],
|
||||
'app4:precompile': [],
|
||||
},
|
||||
});
|
||||
|
||||
taskGraph = createTaskGraph(
|
||||
@ -2752,6 +3039,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app2-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app3:compile': {
|
||||
id: 'app3:compile',
|
||||
@ -2765,6 +3053,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app3-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
'app4:precompile': {
|
||||
id: 'app4:precompile',
|
||||
@ -2778,6 +3067,7 @@ describe('createTaskGraph', () => {
|
||||
},
|
||||
projectRoot: 'app4-root',
|
||||
parallelism: true,
|
||||
continuous: false,
|
||||
},
|
||||
},
|
||||
dependencies: {
|
||||
@ -2785,6 +3075,11 @@ describe('createTaskGraph', () => {
|
||||
'app3:compile': ['app4:precompile'],
|
||||
'app4:precompile': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app2:compile': [],
|
||||
'app3:compile': [],
|
||||
'app4:precompile': [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@ -16,6 +16,7 @@ export class ProcessTasks {
|
||||
private readonly seen = new Set<string>();
|
||||
readonly tasks: { [id: string]: Task } = {};
|
||||
readonly dependencies: { [k: string]: string[] } = {};
|
||||
readonly continuousDependencies: { [k: string]: string[] } = {};
|
||||
private readonly allTargetNames: string[];
|
||||
|
||||
constructor(
|
||||
@ -58,6 +59,7 @@ export class ProcessTasks {
|
||||
);
|
||||
this.tasks[task.id] = task;
|
||||
this.dependencies[task.id] = [];
|
||||
this.continuousDependencies[task.id] = [];
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -75,6 +77,7 @@ export class ProcessTasks {
|
||||
if (!initialTasks[t]) {
|
||||
delete this.tasks[t];
|
||||
delete this.dependencies[t];
|
||||
delete this.continuousDependencies[t];
|
||||
}
|
||||
}
|
||||
for (let d of Object.keys(this.dependencies)) {
|
||||
@ -82,6 +85,11 @@ export class ProcessTasks {
|
||||
(dd) => !!initialTasks[dd]
|
||||
);
|
||||
}
|
||||
for (let d of Object.keys(this.continuousDependencies)) {
|
||||
this.continuousDependencies[d] = this.continuousDependencies[d].filter(
|
||||
(dd) => !!initialTasks[dd]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
filterDummyTasks(this.dependencies);
|
||||
@ -96,8 +104,22 @@ export class ProcessTasks {
|
||||
}
|
||||
}
|
||||
|
||||
return Object.keys(this.dependencies).filter(
|
||||
(d) => this.dependencies[d].length === 0
|
||||
filterDummyTasks(this.continuousDependencies);
|
||||
|
||||
for (const taskId of Object.keys(this.continuousDependencies)) {
|
||||
if (this.continuousDependencies[taskId].length > 0) {
|
||||
this.continuousDependencies[taskId] = [
|
||||
...new Set(
|
||||
this.continuousDependencies[taskId].filter((d) => d !== taskId)
|
||||
).values(),
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
return Object.keys(this.tasks).filter(
|
||||
(d) =>
|
||||
this.dependencies[d].length === 0 &&
|
||||
this.continuousDependencies[d].length === 0
|
||||
);
|
||||
}
|
||||
|
||||
@ -204,9 +226,6 @@ export class ProcessTasks {
|
||||
dependencyConfig.target,
|
||||
resolvedConfiguration
|
||||
);
|
||||
if (task.id !== selfTaskId) {
|
||||
this.dependencies[task.id].push(selfTaskId);
|
||||
}
|
||||
if (!this.tasks[selfTaskId]) {
|
||||
const newTask = this.createTask(
|
||||
selfTaskId,
|
||||
@ -217,6 +236,7 @@ export class ProcessTasks {
|
||||
);
|
||||
this.tasks[selfTaskId] = newTask;
|
||||
this.dependencies[selfTaskId] = [];
|
||||
this.continuousDependencies[selfTaskId] = [];
|
||||
this.processTask(
|
||||
newTask,
|
||||
newTask.target.project,
|
||||
@ -224,6 +244,13 @@ export class ProcessTasks {
|
||||
overrides
|
||||
);
|
||||
}
|
||||
if (task.id !== selfTaskId) {
|
||||
if (this.tasks[selfTaskId].continuous) {
|
||||
this.continuousDependencies[task.id].push(selfTaskId);
|
||||
} else {
|
||||
this.dependencies[task.id].push(selfTaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -265,9 +292,18 @@ export class ProcessTasks {
|
||||
resolvedConfiguration
|
||||
);
|
||||
|
||||
const depTargetConfiguration =
|
||||
this.projectGraph.nodes[depProject.name].data.targets[
|
||||
dependencyConfig.target
|
||||
];
|
||||
|
||||
if (task.id !== depTargetId) {
|
||||
if (depTargetConfiguration.continuous) {
|
||||
this.continuousDependencies[task.id].push(depTargetId);
|
||||
} else {
|
||||
this.dependencies[task.id].push(depTargetId);
|
||||
}
|
||||
}
|
||||
if (!this.tasks[depTargetId]) {
|
||||
const newTask = this.createTask(
|
||||
depTargetId,
|
||||
@ -278,6 +314,7 @@ export class ProcessTasks {
|
||||
);
|
||||
this.tasks[depTargetId] = newTask;
|
||||
this.dependencies[depTargetId] = [];
|
||||
this.continuousDependencies[depTargetId] = [];
|
||||
|
||||
this.processTask(
|
||||
newTask,
|
||||
@ -298,6 +335,7 @@ export class ProcessTasks {
|
||||
);
|
||||
this.dependencies[task.id].push(dummyId);
|
||||
this.dependencies[dummyId] ??= [];
|
||||
this.continuousDependencies[dummyId] ??= [];
|
||||
const noopTask = this.createDummyTask(dummyId, task);
|
||||
this.processTask(noopTask, depProject.name, configuration, overrides);
|
||||
}
|
||||
@ -354,6 +392,7 @@ export class ProcessTasks {
|
||||
),
|
||||
cache: project.data.targets[target].cache,
|
||||
parallelism: project.data.targets[target].parallelism ?? true,
|
||||
continuous: project.data.targets[target].continuous ?? false,
|
||||
};
|
||||
}
|
||||
|
||||
@ -405,6 +444,7 @@ export function createTaskGraph(
|
||||
roots,
|
||||
tasks: p.tasks,
|
||||
dependencies: p.dependencies,
|
||||
continuousDependencies: p.continuousDependencies,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -1,26 +1,26 @@
|
||||
import { readFileSync, writeFileSync } from 'fs';
|
||||
import { ChildProcess, fork, Serializable } from 'child_process';
|
||||
import * as chalk from 'chalk';
|
||||
import { writeFileSync } from 'fs';
|
||||
import { fork, Serializable } from 'child_process';
|
||||
import { DefaultTasksRunnerOptions } from './default-tasks-runner';
|
||||
import { output } from '../utils/output';
|
||||
import { getCliPath, getPrintableCommandArgsForTask } from './utils';
|
||||
import { Batch } from './tasks-schedule';
|
||||
import { join } from 'path';
|
||||
import {
|
||||
BatchMessage,
|
||||
BatchMessageType,
|
||||
BatchResults,
|
||||
} from './batch/batch-messages';
|
||||
import { BatchMessageType } from './batch/batch-messages';
|
||||
import { stripIndents } from '../utils/strip-indents';
|
||||
import { Task, TaskGraph } from '../config/task-graph';
|
||||
import { Transform } from 'stream';
|
||||
import {
|
||||
PseudoTtyProcess,
|
||||
getPseudoTerminal,
|
||||
PseudoTerminal,
|
||||
PseudoTtyProcess,
|
||||
} from './pseudo-terminal';
|
||||
import { signalToCode } from '../utils/exit-codes';
|
||||
import { ProjectGraph } from '../config/project-graph';
|
||||
import {
|
||||
NodeChildProcessWithDirectOutput,
|
||||
NodeChildProcessWithNonDirectOutput,
|
||||
} from './running-tasks/node-child-process';
|
||||
import { BatchProcess } from './running-tasks/batch-process';
|
||||
import { RunningTask } from './running-tasks/running-task';
|
||||
|
||||
const forkScript = join(__dirname, './fork.js');
|
||||
|
||||
@ -30,8 +30,8 @@ export class ForkedProcessTaskRunner {
|
||||
cliPath = getCliPath();
|
||||
|
||||
private readonly verbose = process.env.NX_VERBOSE_LOGGING === 'true';
|
||||
private processes = new Set<ChildProcess | PseudoTtyProcess>();
|
||||
private finishedProcesses = new Set<ChildProcess | PseudoTtyProcess>();
|
||||
private processes = new Set<RunningTask | BatchProcess>();
|
||||
private finishedProcesses = new Set<BatchProcess>();
|
||||
|
||||
private pseudoTerminal: PseudoTerminal | null = PseudoTerminal.isSupported()
|
||||
? getPseudoTerminal()
|
||||
@ -47,15 +47,12 @@ export class ForkedProcessTaskRunner {
|
||||
}
|
||||
|
||||
// TODO: vsavkin delegate terminal output printing
|
||||
public forkProcessForBatch(
|
||||
public async forkProcessForBatch(
|
||||
{ executorName, taskGraph: batchTaskGraph }: Batch,
|
||||
projectGraph: ProjectGraph,
|
||||
fullTaskGraph: TaskGraph,
|
||||
env: NodeJS.ProcessEnv
|
||||
): Promise<BatchResults> {
|
||||
return new Promise<BatchResults>((res, rej) => {
|
||||
let p: ChildProcess;
|
||||
try {
|
||||
): Promise<BatchProcess> {
|
||||
const count = Object.keys(batchTaskGraph.tasks).length;
|
||||
if (count > 1) {
|
||||
output.logSingleLine(
|
||||
@ -70,72 +67,33 @@ export class ForkedProcessTaskRunner {
|
||||
output.logCommand(args.join(' '));
|
||||
}
|
||||
|
||||
p = fork(workerPath, {
|
||||
const p = fork(workerPath, {
|
||||
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
|
||||
env,
|
||||
});
|
||||
this.processes.add(p);
|
||||
const cp = new BatchProcess(p, executorName);
|
||||
this.processes.add(cp);
|
||||
|
||||
p.once('exit', (code, signal) => {
|
||||
this.processes.delete(p);
|
||||
if (code === null) code = signalToCode(signal);
|
||||
if (code !== 0) {
|
||||
rej(
|
||||
new Error(
|
||||
`"${executorName}" exited unexpectedly with code: ${code}`
|
||||
)
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
p.on('error', (err) => {
|
||||
this.processes.delete(p);
|
||||
rej(err || new Error(`"${executorName}" exited unexpectedly`));
|
||||
});
|
||||
|
||||
p.on('message', (message: BatchMessage) => {
|
||||
switch (message.type) {
|
||||
case BatchMessageType.CompleteBatchExecution: {
|
||||
res(message.results);
|
||||
this.finishedProcesses.add(p);
|
||||
break;
|
||||
}
|
||||
case BatchMessageType.RunTasks: {
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
// Re-emit any non-batch messages from the task process
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
cp.onExit(() => {
|
||||
this.processes.delete(cp);
|
||||
});
|
||||
|
||||
// Start the tasks
|
||||
p.send({
|
||||
cp.send({
|
||||
type: BatchMessageType.RunTasks,
|
||||
executorName,
|
||||
projectGraph,
|
||||
batchTaskGraph,
|
||||
fullTaskGraph,
|
||||
});
|
||||
} catch (e) {
|
||||
rej(e);
|
||||
if (p) {
|
||||
this.processes.delete(p);
|
||||
p.kill();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return cp;
|
||||
}
|
||||
|
||||
public cleanUpBatchProcesses() {
|
||||
if (this.finishedProcesses.size > 0) {
|
||||
this.finishedProcesses.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill();
|
||||
}
|
||||
});
|
||||
this.finishedProcesses.clear();
|
||||
}
|
||||
@ -156,15 +114,15 @@ export class ForkedProcessTaskRunner {
|
||||
taskGraph: TaskGraph;
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
): Promise<{ code: number; terminalOutput: string }> {
|
||||
): Promise<RunningTask> {
|
||||
return pipeOutput
|
||||
? await this.forkProcessPipeOutputCapture(task, {
|
||||
? this.forkProcessWithPrefixAndNotTTY(task, {
|
||||
temporaryOutputPath,
|
||||
streamOutput,
|
||||
taskGraph,
|
||||
env,
|
||||
})
|
||||
: await this.forkProcessDirectOutputCapture(task, {
|
||||
: this.forkProcessDirectOutputCapture(task, {
|
||||
temporaryOutputPath,
|
||||
streamOutput,
|
||||
taskGraph,
|
||||
@ -188,7 +146,7 @@ export class ForkedProcessTaskRunner {
|
||||
env: NodeJS.ProcessEnv;
|
||||
disablePseudoTerminal: boolean;
|
||||
}
|
||||
): Promise<{ code: number; terminalOutput: string }> {
|
||||
): Promise<RunningTask | PseudoTtyProcess> {
|
||||
const shouldPrefix =
|
||||
streamOutput && process.env.NX_PREFIX_OUTPUT === 'true';
|
||||
|
||||
@ -229,7 +187,7 @@ export class ForkedProcessTaskRunner {
|
||||
taskGraph: TaskGraph;
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
): Promise<{ code: number; terminalOutput: string }> {
|
||||
): Promise<PseudoTtyProcess> {
|
||||
const args = getPrintableCommandArgsForTask(task);
|
||||
if (streamOutput) {
|
||||
output.logCommand(args.join(' '));
|
||||
@ -256,42 +214,15 @@ export class ForkedProcessTaskRunner {
|
||||
terminalOutput += msg;
|
||||
});
|
||||
|
||||
return new Promise((res) => {
|
||||
p.onExit((code) => {
|
||||
this.processes.delete(p);
|
||||
// If the exit code is greater than 128, it's a special exit code for a signal
|
||||
if (code >= 128) {
|
||||
if (code > 128) {
|
||||
process.exit(code);
|
||||
}
|
||||
this.processes.delete(p);
|
||||
this.writeTerminalOutput(temporaryOutputPath, terminalOutput);
|
||||
res({
|
||||
code,
|
||||
terminalOutput,
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private forkProcessPipeOutputCapture(
|
||||
task: Task,
|
||||
{
|
||||
streamOutput,
|
||||
temporaryOutputPath,
|
||||
taskGraph,
|
||||
env,
|
||||
}: {
|
||||
streamOutput: boolean;
|
||||
temporaryOutputPath: string;
|
||||
taskGraph: TaskGraph;
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
) {
|
||||
return this.forkProcessWithPrefixAndNotTTY(task, {
|
||||
streamOutput,
|
||||
temporaryOutputPath,
|
||||
taskGraph,
|
||||
env,
|
||||
});
|
||||
return p;
|
||||
}
|
||||
|
||||
private forkProcessWithPrefixAndNotTTY(
|
||||
@ -308,7 +239,6 @@ export class ForkedProcessTaskRunner {
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
) {
|
||||
return new Promise<{ code: number; terminalOutput: string }>((res, rej) => {
|
||||
try {
|
||||
const args = getPrintableCommandArgsForTask(task);
|
||||
if (streamOutput) {
|
||||
@ -319,14 +249,6 @@ export class ForkedProcessTaskRunner {
|
||||
stdio: ['inherit', 'pipe', 'pipe', 'ipc'],
|
||||
env,
|
||||
});
|
||||
this.processes.add(p);
|
||||
|
||||
// Re-emit any messages from the task process
|
||||
p.on('message', (message) => {
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
});
|
||||
|
||||
// Send message to run the executor
|
||||
p.send({
|
||||
@ -336,41 +258,14 @@ export class ForkedProcessTaskRunner {
|
||||
isVerbose: this.verbose,
|
||||
});
|
||||
|
||||
if (streamOutput) {
|
||||
if (process.env.NX_PREFIX_OUTPUT === 'true') {
|
||||
const color = getColor(task.target.project);
|
||||
const prefixText = `${task.target.project}:`;
|
||||
|
||||
p.stdout
|
||||
.pipe(
|
||||
logClearLineToPrefixTransformer(color.bold(prefixText) + ' ')
|
||||
)
|
||||
.pipe(addPrefixTransformer(color.bold(prefixText)))
|
||||
.pipe(process.stdout);
|
||||
p.stderr
|
||||
.pipe(logClearLineToPrefixTransformer(color(prefixText) + ' '))
|
||||
.pipe(addPrefixTransformer(color(prefixText)))
|
||||
.pipe(process.stderr);
|
||||
} else {
|
||||
p.stdout.pipe(addPrefixTransformer()).pipe(process.stdout);
|
||||
p.stderr.pipe(addPrefixTransformer()).pipe(process.stderr);
|
||||
}
|
||||
}
|
||||
|
||||
let outWithErr = [];
|
||||
p.stdout.on('data', (chunk) => {
|
||||
outWithErr.push(chunk.toString());
|
||||
});
|
||||
p.stderr.on('data', (chunk) => {
|
||||
outWithErr.push(chunk.toString());
|
||||
const cp = new NodeChildProcessWithNonDirectOutput(p, {
|
||||
streamOutput,
|
||||
prefix: task.target.project,
|
||||
});
|
||||
this.processes.add(cp);
|
||||
|
||||
p.once('exit', (code, signal) => {
|
||||
this.processes.delete(p);
|
||||
if (code === null) code = signalToCode(signal);
|
||||
// we didn't print any output as we were running the command
|
||||
// print all the collected output|
|
||||
const terminalOutput = outWithErr.join('');
|
||||
cp.onExit((code, terminalOutput) => {
|
||||
this.processes.delete(cp);
|
||||
|
||||
if (!streamOutput) {
|
||||
this.options.lifeCycle.printTaskTerminalOutput(
|
||||
@ -380,13 +275,13 @@ export class ForkedProcessTaskRunner {
|
||||
);
|
||||
}
|
||||
this.writeTerminalOutput(temporaryOutputPath, terminalOutput);
|
||||
res({ code, terminalOutput });
|
||||
});
|
||||
|
||||
return cp;
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
rej(e);
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private forkProcessDirectOutputCapture(
|
||||
@ -403,7 +298,6 @@ export class ForkedProcessTaskRunner {
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
) {
|
||||
return new Promise<{ code: number; terminalOutput: string }>((res, rej) => {
|
||||
try {
|
||||
const args = getPrintableCommandArgsForTask(task);
|
||||
if (streamOutput) {
|
||||
@ -413,14 +307,9 @@ export class ForkedProcessTaskRunner {
|
||||
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
|
||||
env,
|
||||
});
|
||||
this.processes.add(p);
|
||||
const cp = new NodeChildProcessWithDirectOutput(p, temporaryOutputPath);
|
||||
|
||||
// Re-emit any messages from the task process
|
||||
p.on('message', (message) => {
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
});
|
||||
this.processes.add(cp);
|
||||
|
||||
// Send message to run the executor
|
||||
p.send({
|
||||
@ -430,14 +319,12 @@ export class ForkedProcessTaskRunner {
|
||||
isVerbose: this.verbose,
|
||||
});
|
||||
|
||||
p.once('exit', (code, signal) => {
|
||||
this.processes.delete(p);
|
||||
if (code === null) code = signalToCode(signal);
|
||||
cp.onExit((code, signal) => {
|
||||
this.processes.delete(cp);
|
||||
// we didn't print any output as we were running the command
|
||||
// print all the collected output
|
||||
let terminalOutput = '';
|
||||
try {
|
||||
terminalOutput = this.readTerminalOutput(temporaryOutputPath);
|
||||
const terminalOutput = cp.getTerminalOutput();
|
||||
if (!streamOutput) {
|
||||
this.options.lifeCycle.printTaskTerminalOutput(
|
||||
task,
|
||||
@ -454,20 +341,13 @@ export class ForkedProcessTaskRunner {
|
||||
${e.message}
|
||||
`);
|
||||
}
|
||||
res({
|
||||
code,
|
||||
terminalOutput,
|
||||
});
|
||||
});
|
||||
|
||||
return cp;
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
rej(e);
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private readTerminalOutput(outputPath: string) {
|
||||
return readFileSync(outputPath).toString();
|
||||
}
|
||||
|
||||
private writeTerminalOutput(outputPath: string, content: string) {
|
||||
@ -482,13 +362,12 @@ export class ForkedProcessTaskRunner {
|
||||
}
|
||||
|
||||
const messageHandler = (message: Serializable) => {
|
||||
// this.publisher.publish(message.toString());
|
||||
if (this.pseudoTerminal) {
|
||||
this.pseudoTerminal.sendMessageToChildren(message);
|
||||
}
|
||||
|
||||
this.processes.forEach((p) => {
|
||||
if ('connected' in p && p.connected) {
|
||||
if ('connected' in p && p.connected && 'send' in p) {
|
||||
p.send(message);
|
||||
}
|
||||
});
|
||||
@ -499,9 +378,7 @@ export class ForkedProcessTaskRunner {
|
||||
|
||||
const cleanUp = (signal?: NodeJS.Signals) => {
|
||||
this.processes.forEach((p) => {
|
||||
if ('connected' in p ? p.connected : p.isAlive) {
|
||||
p.kill(signal);
|
||||
}
|
||||
});
|
||||
process.off('message', messageHandler);
|
||||
this.cleanUpBatchProcesses();
|
||||
@ -528,60 +405,3 @@ export class ForkedProcessTaskRunner {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const colors = [
|
||||
chalk.green,
|
||||
chalk.greenBright,
|
||||
chalk.red,
|
||||
chalk.redBright,
|
||||
chalk.cyan,
|
||||
chalk.cyanBright,
|
||||
chalk.yellow,
|
||||
chalk.yellowBright,
|
||||
chalk.magenta,
|
||||
chalk.magentaBright,
|
||||
];
|
||||
|
||||
function getColor(projectName: string) {
|
||||
let code = 0;
|
||||
for (let i = 0; i < projectName.length; ++i) {
|
||||
code += projectName.charCodeAt(i);
|
||||
}
|
||||
const colorIndex = code % colors.length;
|
||||
|
||||
return colors[colorIndex];
|
||||
}
|
||||
|
||||
/**
|
||||
* Prevents terminal escape sequence from clearing line prefix.
|
||||
*/
|
||||
function logClearLineToPrefixTransformer(prefix: string) {
|
||||
let prevChunk = null;
|
||||
return new Transform({
|
||||
transform(chunk, _encoding, callback) {
|
||||
if (prevChunk && prevChunk.toString() === '\x1b[2K') {
|
||||
chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix);
|
||||
}
|
||||
this.push(chunk);
|
||||
prevChunk = chunk;
|
||||
callback();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function addPrefixTransformer(prefix?: string) {
|
||||
const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n';
|
||||
return new Transform({
|
||||
transform(chunk, _encoding, callback) {
|
||||
const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);
|
||||
list
|
||||
.filter(Boolean)
|
||||
.forEach((m) =>
|
||||
this.push(
|
||||
prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator
|
||||
)
|
||||
);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@ -47,6 +47,10 @@ export async function initTasksRunner(nxArgs: NxArgs) {
|
||||
acc[task.id] = [];
|
||||
return acc;
|
||||
}, {} as any),
|
||||
continuousDependencies: opts.tasks.reduce((acc, task) => {
|
||||
acc[task.id] = [];
|
||||
return acc;
|
||||
}, {} as any),
|
||||
};
|
||||
|
||||
const taskResults = await invokeTasksRunner({
|
||||
|
||||
@ -59,6 +59,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -78,6 +79,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -99,6 +101,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -109,6 +112,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -131,6 +135,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -150,6 +155,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -171,6 +177,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -181,6 +188,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -200,6 +208,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -210,6 +219,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -229,6 +239,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -239,6 +250,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -262,6 +274,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -272,6 +285,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -295,6 +309,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -305,6 +320,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -315,6 +331,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
@ -338,6 +355,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -348,6 +366,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -358,6 +377,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
{
|
||||
@ -368,6 +388,7 @@ describe('formatTargetsAndProjects', () => {
|
||||
},
|
||||
overrides: {},
|
||||
parallelism: false,
|
||||
continuous: false,
|
||||
outputs: [],
|
||||
},
|
||||
]
|
||||
|
||||
@ -17,6 +17,7 @@ describe('PseudoTerminal', () => {
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should kill a running command', (done) => {
|
||||
const childProcess = terminal.runCommand(
|
||||
'sleep 3 && echo "hello world" > file.txt'
|
||||
@ -31,18 +32,35 @@ describe('PseudoTerminal', () => {
|
||||
|
||||
it('should subscribe to output', (done) => {
|
||||
const childProcess = terminal.runCommand('echo "hello world"');
|
||||
|
||||
let output = '';
|
||||
childProcess.onOutput((chunk) => {
|
||||
output += chunk;
|
||||
});
|
||||
|
||||
childProcess.onExit(() => {
|
||||
try {
|
||||
expect(output.trim()).toContain('hello world');
|
||||
} finally {
|
||||
done();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should get results', async () => {
|
||||
const childProcess = terminal.runCommand('echo "hello world"');
|
||||
|
||||
const results = await childProcess.getResults();
|
||||
|
||||
expect(results.code).toEqual(0);
|
||||
expect(results.terminalOutput).toContain('hello world');
|
||||
const childProcess2 = terminal.runCommand('echo "hello jason"');
|
||||
|
||||
const results2 = await childProcess2.getResults();
|
||||
|
||||
expect(results2.code).toEqual(0);
|
||||
expect(results2.terminalOutput).toContain('hello jason');
|
||||
});
|
||||
|
||||
if (process.env.CI !== 'true') {
|
||||
it('should be tty', (done) => {
|
||||
const childProcess = terminal.runCommand(
|
||||
@ -56,17 +74,12 @@ describe('PseudoTerminal', () => {
|
||||
}
|
||||
|
||||
it('should run multiple commands', async () => {
|
||||
function runCommand() {
|
||||
return new Promise((res) => {
|
||||
const cp1 = terminal.runCommand('whoami', {});
|
||||
|
||||
cp1.onExit(res);
|
||||
});
|
||||
}
|
||||
|
||||
let i = 0;
|
||||
while (i < 10) {
|
||||
await runCommand();
|
||||
const childProcess = terminal.runCommand('whoami', {});
|
||||
|
||||
await childProcess.getResults();
|
||||
|
||||
i++;
|
||||
}
|
||||
});
|
||||
|
||||
@ -138,15 +138,32 @@ export class PseudoTerminal {
|
||||
export class PseudoTtyProcess {
|
||||
isAlive = true;
|
||||
|
||||
exitCallbacks = [];
|
||||
private exitCallbacks: Array<(code: number) => void> = [];
|
||||
private outputCallbacks: Array<(output: string) => void> = [];
|
||||
|
||||
private terminalOutput = '';
|
||||
|
||||
constructor(private childProcess: ChildProcess) {
|
||||
childProcess.onOutput((output) => {
|
||||
this.terminalOutput += output;
|
||||
this.outputCallbacks.forEach((cb) => cb(output));
|
||||
});
|
||||
|
||||
childProcess.onExit((message) => {
|
||||
this.isAlive = false;
|
||||
|
||||
const exitCode = messageToCode(message);
|
||||
const code = messageToCode(message);
|
||||
childProcess.cleanup();
|
||||
|
||||
this.exitCallbacks.forEach((cb) => cb(exitCode));
|
||||
this.exitCallbacks.forEach((cb) => cb(code));
|
||||
});
|
||||
}
|
||||
|
||||
async getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return new Promise((res) => {
|
||||
this.onExit((code) => {
|
||||
res({ code, terminalOutput: this.terminalOutput });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@ -155,17 +172,17 @@ export class PseudoTtyProcess {
|
||||
}
|
||||
|
||||
onOutput(callback: (message: string) => void): void {
|
||||
this.childProcess.onOutput(callback);
|
||||
this.outputCallbacks.push(callback);
|
||||
}
|
||||
|
||||
kill(): void {
|
||||
if (this.isAlive) {
|
||||
try {
|
||||
this.childProcess.kill();
|
||||
} catch {
|
||||
// when the child process completes before we explicitly call kill, this will throw
|
||||
// do nothing
|
||||
} finally {
|
||||
if (this.isAlive == true) {
|
||||
this.isAlive = false;
|
||||
}
|
||||
}
|
||||
|
||||
84
packages/nx/src/tasks-runner/running-tasks/batch-process.ts
Normal file
84
packages/nx/src/tasks-runner/running-tasks/batch-process.ts
Normal file
@ -0,0 +1,84 @@
|
||||
import {
|
||||
BatchMessage,
|
||||
BatchMessageType,
|
||||
BatchResults,
|
||||
} from '../batch/batch-messages';
|
||||
import { ChildProcess, Serializable } from 'child_process';
|
||||
import { signalToCode } from '../../utils/exit-codes';
|
||||
|
||||
export class BatchProcess {
|
||||
private exitCallbacks: Array<(code: number) => void> = [];
|
||||
private resultsCallbacks: Array<(results: BatchResults) => void> = [];
|
||||
|
||||
constructor(
|
||||
private childProcess: ChildProcess,
|
||||
private executorName: string
|
||||
) {
|
||||
this.childProcess.on('message', (message: BatchMessage) => {
|
||||
switch (message.type) {
|
||||
case BatchMessageType.CompleteBatchExecution: {
|
||||
for (const cb of this.resultsCallbacks) {
|
||||
cb(message.results);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case BatchMessageType.RunTasks: {
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
// Re-emit any non-batch messages from the task process
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.childProcess.once('exit', (code, signal) => {
|
||||
if (code === null) code = signalToCode(signal);
|
||||
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(code);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
onExit(cb: (code: number) => void) {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
onResults(cb: (results: BatchResults) => void) {
|
||||
this.resultsCallbacks.push(cb);
|
||||
}
|
||||
|
||||
async getResults(): Promise<BatchResults> {
|
||||
return Promise.race<BatchResults>([
|
||||
new Promise((_, rej) => {
|
||||
this.onExit((code) => {
|
||||
if (code !== 0) {
|
||||
rej(
|
||||
new Error(
|
||||
`"${this.executorName}" exited unexpectedly with code: ${code}`
|
||||
)
|
||||
);
|
||||
}
|
||||
});
|
||||
}),
|
||||
new Promise((res) => {
|
||||
this.onResults(res);
|
||||
}),
|
||||
]);
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
kill(signal?: NodeJS.Signals | number): void {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.kill(signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
215
packages/nx/src/tasks-runner/running-tasks/node-child-process.ts
Normal file
215
packages/nx/src/tasks-runner/running-tasks/node-child-process.ts
Normal file
@ -0,0 +1,215 @@
|
||||
import { ChildProcess, Serializable } from 'child_process';
|
||||
import { signalToCode } from '../../utils/exit-codes';
|
||||
import { RunningTask } from './running-task';
|
||||
import { Transform } from 'stream';
|
||||
import * as chalk from 'chalk';
|
||||
import { readFileSync } from 'fs';
|
||||
|
||||
export class NodeChildProcessWithNonDirectOutput implements RunningTask {
|
||||
private terminalOutput: string = '';
|
||||
private exitCallbacks: Array<(code: number, terminalOutput: string) => void> =
|
||||
[];
|
||||
|
||||
constructor(
|
||||
private childProcess: ChildProcess,
|
||||
{ streamOutput, prefix }: { streamOutput: boolean; prefix: string }
|
||||
) {
|
||||
if (streamOutput) {
|
||||
if (process.env.NX_PREFIX_OUTPUT === 'true') {
|
||||
const color = getColor(prefix);
|
||||
const prefixText = `${prefix}:`;
|
||||
|
||||
this.childProcess.stdout
|
||||
.pipe(logClearLineToPrefixTransformer(color.bold(prefixText) + ' '))
|
||||
.pipe(addPrefixTransformer(color.bold(prefixText)))
|
||||
.pipe(process.stdout);
|
||||
this.childProcess.stderr
|
||||
.pipe(logClearLineToPrefixTransformer(color(prefixText) + ' '))
|
||||
.pipe(addPrefixTransformer(color(prefixText)))
|
||||
.pipe(process.stderr);
|
||||
} else {
|
||||
this.childProcess.stdout
|
||||
.pipe(addPrefixTransformer())
|
||||
.pipe(process.stdout);
|
||||
this.childProcess.stderr
|
||||
.pipe(addPrefixTransformer())
|
||||
.pipe(process.stderr);
|
||||
}
|
||||
}
|
||||
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
if (code === null) code = signalToCode(signal);
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(code, this.terminalOutput);
|
||||
}
|
||||
});
|
||||
|
||||
// Re-emit any messages from the task process
|
||||
this.childProcess.on('message', (message) => {
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
});
|
||||
|
||||
this.childProcess.stdout.on('data', (chunk) => {
|
||||
this.terminalOutput += chunk.toString();
|
||||
});
|
||||
this.childProcess.stderr.on('data', (chunk) => {
|
||||
this.terminalOutput += chunk.toString();
|
||||
});
|
||||
}
|
||||
|
||||
onExit(cb: (code: number, terminalOutput: string) => void) {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
async getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return new Promise((res) => {
|
||||
this.onExit((code, terminalOutput) => {
|
||||
res({ code, terminalOutput });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
public kill(signal?: NodeJS.Signals | number) {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.kill(signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function addPrefixTransformer(prefix?: string) {
|
||||
const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n';
|
||||
return new Transform({
|
||||
transform(chunk, _encoding, callback) {
|
||||
const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);
|
||||
list
|
||||
.filter(Boolean)
|
||||
.forEach((m) =>
|
||||
this.push(
|
||||
prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator
|
||||
)
|
||||
);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const colors = [
|
||||
chalk.green,
|
||||
chalk.greenBright,
|
||||
chalk.red,
|
||||
chalk.redBright,
|
||||
chalk.cyan,
|
||||
chalk.cyanBright,
|
||||
chalk.yellow,
|
||||
chalk.yellowBright,
|
||||
chalk.magenta,
|
||||
chalk.magentaBright,
|
||||
];
|
||||
|
||||
function getColor(projectName: string) {
|
||||
let code = 0;
|
||||
for (let i = 0; i < projectName.length; ++i) {
|
||||
code += projectName.charCodeAt(i);
|
||||
}
|
||||
const colorIndex = code % colors.length;
|
||||
|
||||
return colors[colorIndex];
|
||||
}
|
||||
|
||||
/**
|
||||
* Prevents terminal escape sequence from clearing line prefix.
|
||||
*/
|
||||
function logClearLineToPrefixTransformer(prefix: string) {
|
||||
let prevChunk = null;
|
||||
return new Transform({
|
||||
transform(chunk, _encoding, callback) {
|
||||
if (prevChunk && prevChunk.toString() === '\x1b[2K') {
|
||||
chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix);
|
||||
}
|
||||
this.push(chunk);
|
||||
prevChunk = chunk;
|
||||
callback();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export class NodeChildProcessWithDirectOutput implements RunningTask {
|
||||
private terminalOutput: string | undefined;
|
||||
private exitCallbacks: Array<(code: number, signal: string) => void> = [];
|
||||
|
||||
private exited = false;
|
||||
private exitCode: number;
|
||||
|
||||
constructor(
|
||||
private childProcess: ChildProcess,
|
||||
private temporaryOutputPath: string
|
||||
) {
|
||||
// Re-emit any messages from the task process
|
||||
this.childProcess.on('message', (message) => {
|
||||
if (process.send) {
|
||||
process.send(message);
|
||||
}
|
||||
});
|
||||
|
||||
this.childProcess.on('exit', (code, signal) => {
|
||||
if (code === null) code = signalToCode(signal);
|
||||
|
||||
this.exited = true;
|
||||
this.exitCode = code;
|
||||
|
||||
for (const cb of this.exitCallbacks) {
|
||||
cb(code, signal);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
send(message: Serializable): void {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
onExit(cb: (code: number, signal: NodeJS.Signals) => void) {
|
||||
this.exitCallbacks.push(cb);
|
||||
}
|
||||
|
||||
async getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
const terminalOutput = this.getTerminalOutput();
|
||||
if (this.exited) {
|
||||
return Promise.resolve({
|
||||
code: this.exitCode,
|
||||
terminalOutput,
|
||||
});
|
||||
}
|
||||
await this.waitForExit();
|
||||
return Promise.resolve({
|
||||
code: this.exitCode,
|
||||
terminalOutput,
|
||||
});
|
||||
}
|
||||
|
||||
waitForExit() {
|
||||
return new Promise<void>((res) => {
|
||||
this.onExit(() => res());
|
||||
});
|
||||
}
|
||||
|
||||
getTerminalOutput() {
|
||||
this.terminalOutput ??= readFileSync(this.temporaryOutputPath).toString();
|
||||
return this.terminalOutput;
|
||||
}
|
||||
|
||||
kill(signal?: NodeJS.Signals | number): void {
|
||||
if (this.childProcess.connected) {
|
||||
this.childProcess.kill(signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
import { Serializable } from 'child_process';
|
||||
import { RunningTask } from './running-task';
|
||||
|
||||
export class NoopChildProcess implements RunningTask {
|
||||
constructor(private results: { code: number; terminalOutput: string }) {}
|
||||
|
||||
send(): void {}
|
||||
|
||||
async getResults(): Promise<{ code: number; terminalOutput: string }> {
|
||||
return this.results;
|
||||
}
|
||||
|
||||
kill(): void {
|
||||
return;
|
||||
}
|
||||
|
||||
onExit(cb: (code: number) => void): void {
|
||||
cb(this.results.code);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,7 @@
|
||||
export abstract class RunningTask {
|
||||
abstract getResults(): Promise<{ code: number; terminalOutput: string }>;
|
||||
|
||||
abstract onExit(cb: (code: number) => void): void;
|
||||
|
||||
abstract kill(signal?: NodeJS.Signals | number): Promise<void> | void;
|
||||
}
|
||||
@ -3,7 +3,7 @@ import { performance } from 'perf_hooks';
|
||||
import { relative } from 'path';
|
||||
import { writeFileSync } from 'fs';
|
||||
import { TaskHasher } from '../hasher/task-hasher';
|
||||
import runCommandsImpl from '../executors/run-commands/run-commands.impl';
|
||||
import { runCommands } from '../executors/run-commands/run-commands.impl';
|
||||
import { ForkedProcessTaskRunner } from './forked-process-task-runner';
|
||||
import { Cache, DbCache, dbCacheEnabled, getCache } from './cache';
|
||||
import { DefaultTasksRunnerOptions } from './default-tasks-runner';
|
||||
@ -33,6 +33,9 @@ import { output } from '../utils/output';
|
||||
import { combineOptionsForExecutor } from '../utils/params';
|
||||
import { NxJsonConfiguration } from '../config/nx-json';
|
||||
import type { TaskDetails } from '../native';
|
||||
import { NoopChildProcess } from './running-tasks/noop-child-process';
|
||||
import { RunningTask } from './running-tasks/running-task';
|
||||
import { NxArgs } from '../utils/command-line-utils';
|
||||
|
||||
export class TaskOrchestrator {
|
||||
private taskDetails: TaskDetails | null = getTaskDetails();
|
||||
@ -64,6 +67,9 @@ export class TaskOrchestrator {
|
||||
|
||||
private bailed = false;
|
||||
|
||||
private runningContinuousTasks = new Map<string, RunningTask>();
|
||||
|
||||
private cleaningUp = false;
|
||||
// endregion internal state
|
||||
|
||||
constructor(
|
||||
@ -72,7 +78,7 @@ export class TaskOrchestrator {
|
||||
private readonly projectGraph: ProjectGraph,
|
||||
private readonly taskGraph: TaskGraph,
|
||||
private readonly nxJson: NxJsonConfiguration,
|
||||
private readonly options: DefaultTasksRunnerOptions,
|
||||
private readonly options: NxArgs & DefaultTasksRunnerOptions,
|
||||
private readonly bail: boolean,
|
||||
private readonly daemon: DaemonClient,
|
||||
private readonly outputStyle: string
|
||||
@ -91,13 +97,17 @@ export class TaskOrchestrator {
|
||||
|
||||
performance.mark('task-execution:start');
|
||||
|
||||
const threadCount =
|
||||
this.options.parallel +
|
||||
Object.values(this.taskGraph.tasks).filter((t) => t.continuous).length;
|
||||
|
||||
const threads = [];
|
||||
|
||||
process.stdout.setMaxListeners(this.options.parallel + defaultMaxListeners);
|
||||
process.stderr.setMaxListeners(this.options.parallel + defaultMaxListeners);
|
||||
process.stdout.setMaxListeners(threadCount + defaultMaxListeners);
|
||||
process.stderr.setMaxListeners(threadCount + defaultMaxListeners);
|
||||
|
||||
// initial seeding of the queue
|
||||
for (let i = 0; i < this.options.parallel; ++i) {
|
||||
for (let i = 0; i < threadCount; ++i) {
|
||||
threads.push(this.executeNextBatchOfTasksUsingTaskSchedule());
|
||||
}
|
||||
await Promise.all(threads);
|
||||
@ -110,6 +120,8 @@ export class TaskOrchestrator {
|
||||
);
|
||||
this.cache.removeOldCacheRecords();
|
||||
|
||||
await this.cleanup();
|
||||
|
||||
return this.completedTasks;
|
||||
}
|
||||
|
||||
@ -139,7 +151,11 @@ export class TaskOrchestrator {
|
||||
if (task) {
|
||||
const groupId = this.closeGroup();
|
||||
|
||||
if (task.continuous) {
|
||||
await this.startContinuousTask(task, groupId);
|
||||
} else {
|
||||
await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId);
|
||||
}
|
||||
|
||||
this.openGroup(groupId);
|
||||
|
||||
@ -325,12 +341,14 @@ export class TaskOrchestrator {
|
||||
|
||||
private async runBatch(batch: Batch, env: NodeJS.ProcessEnv) {
|
||||
try {
|
||||
const results = await this.forkedProcessTaskRunner.forkProcessForBatch(
|
||||
const batchProcess =
|
||||
await this.forkedProcessTaskRunner.forkProcessForBatch(
|
||||
batch,
|
||||
this.projectGraph,
|
||||
this.taskGraph,
|
||||
env
|
||||
);
|
||||
const results = await batchProcess.getResults();
|
||||
const batchResultEntries = Object.entries(results);
|
||||
return batchResultEntries.map(([taskId, result]) => ({
|
||||
...result,
|
||||
@ -401,6 +419,31 @@ export class TaskOrchestrator {
|
||||
|
||||
// the task wasn't cached
|
||||
if (results.length === 0) {
|
||||
const childProcess = await this.runTask(
|
||||
task,
|
||||
streamOutput,
|
||||
env,
|
||||
temporaryOutputPath,
|
||||
pipeOutput
|
||||
);
|
||||
|
||||
const { code, terminalOutput } = await childProcess.getResults();
|
||||
results.push({
|
||||
task,
|
||||
status: code === 0 ? 'success' : 'failure',
|
||||
terminalOutput,
|
||||
});
|
||||
}
|
||||
await this.postRunSteps([task], results, doNotSkipCache, { groupId });
|
||||
}
|
||||
|
||||
private async runTask(
|
||||
task: Task,
|
||||
streamOutput: boolean,
|
||||
env: { [p: string]: string | undefined; TZ?: string },
|
||||
temporaryOutputPath: string,
|
||||
pipeOutput: boolean
|
||||
): Promise<RunningTask> {
|
||||
const shouldPrefix =
|
||||
streamOutput && process.env.NX_PREFIX_OUTPUT === 'true';
|
||||
const targetConfiguration = getTargetConfigurationForTask(
|
||||
@ -417,8 +460,7 @@ export class TaskOrchestrator {
|
||||
const isRunOne = this.initiatingProject != null;
|
||||
const combinedOptions = combineOptionsForExecutor(
|
||||
task.overrides,
|
||||
task.target.configuration ??
|
||||
targetConfiguration.defaultConfiguration,
|
||||
task.target.configuration ?? targetConfiguration.defaultConfiguration,
|
||||
targetConfiguration,
|
||||
schema,
|
||||
task.target.project,
|
||||
@ -435,32 +477,33 @@ export class TaskOrchestrator {
|
||||
const args = getPrintableCommandArgsForTask(task);
|
||||
output.logCommand(args.join(' '));
|
||||
}
|
||||
const { success, terminalOutput } = await runCommandsImpl(
|
||||
const runningTask = await runCommands(
|
||||
{
|
||||
...combinedOptions,
|
||||
env,
|
||||
usePty: isRunOne && !this.tasksSchedule.hasTasks(),
|
||||
usePty:
|
||||
isRunOne &&
|
||||
!this.tasksSchedule.hasTasks() &&
|
||||
this.runningContinuousTasks.size === 0,
|
||||
streamOutput,
|
||||
},
|
||||
{
|
||||
root: workspaceRoot, // only root is needed in runCommandsImpl
|
||||
root: workspaceRoot, // only root is needed in runCommands
|
||||
} as any
|
||||
);
|
||||
|
||||
const status = success ? 'success' : 'failure';
|
||||
runningTask.onExit((code, terminalOutput) => {
|
||||
if (!streamOutput) {
|
||||
this.options.lifeCycle.printTaskTerminalOutput(
|
||||
task,
|
||||
status,
|
||||
code === 0 ? 'success' : 'failure',
|
||||
terminalOutput
|
||||
);
|
||||
}
|
||||
writeFileSync(temporaryOutputPath, terminalOutput);
|
||||
results.push({
|
||||
task,
|
||||
status,
|
||||
terminalOutput,
|
||||
}
|
||||
});
|
||||
|
||||
return runningTask;
|
||||
} catch (e) {
|
||||
if (process.env.NX_VERBOSE_LOGGING === 'true') {
|
||||
console.error(e);
|
||||
@ -469,37 +512,24 @@ export class TaskOrchestrator {
|
||||
}
|
||||
const terminalOutput = e.stack ?? e.message ?? '';
|
||||
writeFileSync(temporaryOutputPath, terminalOutput);
|
||||
results.push({
|
||||
task,
|
||||
status: 'failure',
|
||||
terminalOutput,
|
||||
});
|
||||
}
|
||||
} else if (targetConfiguration.executor === 'nx:noop') {
|
||||
writeFileSync(temporaryOutputPath, '');
|
||||
results.push({
|
||||
task,
|
||||
status: 'success',
|
||||
return new NoopChildProcess({
|
||||
code: 0,
|
||||
terminalOutput: '',
|
||||
});
|
||||
} else {
|
||||
// cache prep
|
||||
const { code, terminalOutput } = await this.runTaskInForkedProcess(
|
||||
return await this.runTaskInForkedProcess(
|
||||
task,
|
||||
env,
|
||||
pipeOutput,
|
||||
temporaryOutputPath,
|
||||
streamOutput
|
||||
);
|
||||
results.push({
|
||||
task,
|
||||
status: code === 0 ? 'success' : 'failure',
|
||||
terminalOutput,
|
||||
});
|
||||
}
|
||||
}
|
||||
await this.postRunSteps([task], results, doNotSkipCache, { groupId });
|
||||
}
|
||||
|
||||
private async runTaskInForkedProcess(
|
||||
task: Task,
|
||||
@ -511,10 +541,10 @@ export class TaskOrchestrator {
|
||||
try {
|
||||
const usePtyFork = process.env.NX_NATIVE_COMMAND_RUNNER !== 'false';
|
||||
|
||||
// Disable the pseudo terminal if this is a run-many
|
||||
const disablePseudoTerminal = !this.initiatingProject;
|
||||
// Disable the pseudo terminal if this is a run-many or when running a continuous task as part of a run-one
|
||||
const disablePseudoTerminal = !this.initiatingProject || task.continuous;
|
||||
// execution
|
||||
const { code, terminalOutput } = usePtyFork
|
||||
const childProcess = usePtyFork
|
||||
? await this.forkedProcessTaskRunner.forkProcess(task, {
|
||||
temporaryOutputPath,
|
||||
streamOutput,
|
||||
@ -531,20 +561,86 @@ export class TaskOrchestrator {
|
||||
env,
|
||||
});
|
||||
|
||||
return {
|
||||
code,
|
||||
terminalOutput,
|
||||
};
|
||||
return childProcess;
|
||||
} catch (e) {
|
||||
if (process.env.NX_VERBOSE_LOGGING === 'true') {
|
||||
console.error(e);
|
||||
}
|
||||
return {
|
||||
return new NoopChildProcess({
|
||||
code: 1,
|
||||
};
|
||||
terminalOutput: undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async startContinuousTask(task: Task, groupId: number) {
|
||||
const taskSpecificEnv = await this.processedTasks.get(task.id);
|
||||
await this.preRunSteps([task], { groupId });
|
||||
|
||||
const pipeOutput = await this.pipeOutputCapture(task);
|
||||
// obtain metadata
|
||||
const temporaryOutputPath = this.cache.temporaryOutputPath(task);
|
||||
const streamOutput =
|
||||
this.outputStyle === 'static'
|
||||
? false
|
||||
: shouldStreamOutput(task, this.initiatingProject);
|
||||
|
||||
let env = pipeOutput
|
||||
? getEnvVariablesForTask(
|
||||
task,
|
||||
taskSpecificEnv,
|
||||
process.env.FORCE_COLOR === undefined
|
||||
? 'true'
|
||||
: process.env.FORCE_COLOR,
|
||||
this.options.skipNxCache,
|
||||
this.options.captureStderr,
|
||||
null,
|
||||
null
|
||||
)
|
||||
: getEnvVariablesForTask(
|
||||
task,
|
||||
taskSpecificEnv,
|
||||
undefined,
|
||||
this.options.skipNxCache,
|
||||
this.options.captureStderr,
|
||||
temporaryOutputPath,
|
||||
streamOutput
|
||||
);
|
||||
const childProcess = await this.runTask(
|
||||
task,
|
||||
streamOutput,
|
||||
env,
|
||||
temporaryOutputPath,
|
||||
pipeOutput
|
||||
);
|
||||
this.runningContinuousTasks.set(task.id, childProcess);
|
||||
|
||||
childProcess.onExit((code) => {
|
||||
if (!this.cleaningUp) {
|
||||
console.error(
|
||||
`Task "${task.id}" is continuous but exited with code ${code}`
|
||||
);
|
||||
this.cleanup().then(() => {
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
});
|
||||
if (
|
||||
this.initiatingProject === task.target.project &&
|
||||
this.options.targets.length === 1 &&
|
||||
this.options.targets[0] === task.target.target
|
||||
) {
|
||||
await childProcess.getResults();
|
||||
} else {
|
||||
await this.tasksSchedule.scheduleNextTasks();
|
||||
// release blocked threads
|
||||
this.waitingForTasks.forEach((f) => f(null));
|
||||
this.waitingForTasks.length = 0;
|
||||
}
|
||||
|
||||
return childProcess;
|
||||
}
|
||||
|
||||
// endregion Single Task
|
||||
|
||||
// region Lifecycle
|
||||
@ -731,4 +827,17 @@ export class TaskOrchestrator {
|
||||
}
|
||||
|
||||
// endregion utils
|
||||
|
||||
private async cleanup() {
|
||||
this.cleaningUp = true;
|
||||
await Promise.all(
|
||||
Array.from(this.runningContinuousTasks).map(async ([taskId, t]) => {
|
||||
try {
|
||||
return t.kill();
|
||||
} catch (e) {
|
||||
console.error(`Unable to terminate ${taskId}\nError:`, e);
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ function createMockTask(id: string, parallelism: boolean = true): Task {
|
||||
outputs: [],
|
||||
overrides: {},
|
||||
parallelism,
|
||||
continuous: false,
|
||||
};
|
||||
}
|
||||
|
||||
@ -65,6 +66,11 @@ describe('TasksSchedule', () => {
|
||||
'app2:build': [],
|
||||
'lib1:build': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:build': [],
|
||||
'app2:build': [],
|
||||
'lib1:build': [],
|
||||
},
|
||||
roots: ['lib1:build', 'app2:build'],
|
||||
};
|
||||
jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({});
|
||||
@ -274,6 +280,13 @@ describe('TasksSchedule', () => {
|
||||
'app4:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
'app2:test': [],
|
||||
'app3:test': [],
|
||||
'app4:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
roots: [
|
||||
'app1:test',
|
||||
'app2:test',
|
||||
@ -552,6 +565,11 @@ describe('TasksSchedule', () => {
|
||||
'app2:build': [],
|
||||
'lib1:build': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:build': [],
|
||||
'app2:build': [],
|
||||
'lib1:build': [],
|
||||
},
|
||||
roots: ['lib1:build', 'app2:build'],
|
||||
};
|
||||
jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({});
|
||||
@ -719,6 +737,11 @@ describe('TasksSchedule', () => {
|
||||
'app2:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
continuousDependencies: {
|
||||
'app1:test': [],
|
||||
'app2:test': [],
|
||||
'lib1:test': [],
|
||||
},
|
||||
roots: ['app1:test', 'app2:test', 'lib1:test'],
|
||||
};
|
||||
jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({});
|
||||
|
||||
@ -212,12 +212,15 @@ export class TasksSchedule {
|
||||
({
|
||||
tasks: {},
|
||||
dependencies: {},
|
||||
continuousDependencies: {},
|
||||
roots: [],
|
||||
} as TaskGraph));
|
||||
|
||||
batch.tasks[task.id] = task;
|
||||
batch.dependencies[task.id] =
|
||||
this.notScheduledTaskGraph.dependencies[task.id];
|
||||
batch.continuousDependencies[task.id] =
|
||||
this.notScheduledTaskGraph.continuousDependencies[task.id];
|
||||
if (isRoot) {
|
||||
batch.roots.push(task.id);
|
||||
}
|
||||
@ -251,9 +254,13 @@ export class TasksSchedule {
|
||||
const hasDependenciesCompleted = this.taskGraph.dependencies[taskId].every(
|
||||
(id) => this.completedTasks.has(id)
|
||||
);
|
||||
const hasContinuousDependenciesStarted =
|
||||
this.taskGraph.continuousDependencies[taskId].every((id) =>
|
||||
this.runningTasks.has(id)
|
||||
);
|
||||
|
||||
// if dependencies have not completed, cannot schedule
|
||||
if (!hasDependenciesCompleted) {
|
||||
if (!hasDependenciesCompleted || !hasContinuousDependenciesStarted) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -451,18 +451,20 @@ export function removeTasksFromTaskGraph(
|
||||
graph: TaskGraph,
|
||||
ids: string[]
|
||||
): TaskGraph {
|
||||
const newGraph = removeIdsFromGraph<Task>(graph, ids, graph.tasks);
|
||||
const newGraph = removeIdsFromTaskGraph<Task>(graph, ids, graph.tasks);
|
||||
return {
|
||||
dependencies: newGraph.dependencies,
|
||||
continuousDependencies: newGraph.continuousDependencies,
|
||||
roots: newGraph.roots,
|
||||
tasks: newGraph.mapWithIds,
|
||||
};
|
||||
}
|
||||
|
||||
export function removeIdsFromGraph<T>(
|
||||
function removeIdsFromTaskGraph<T>(
|
||||
graph: {
|
||||
roots: string[];
|
||||
dependencies: Record<string, string[]>;
|
||||
continuousDependencies: Record<string, string[]>;
|
||||
},
|
||||
ids: string[],
|
||||
mapWithIds: Record<string, T>
|
||||
@ -470,9 +472,11 @@ export function removeIdsFromGraph<T>(
|
||||
mapWithIds: Record<string, T>;
|
||||
roots: string[];
|
||||
dependencies: Record<string, string[]>;
|
||||
continuousDependencies: Record<string, string[]>;
|
||||
} {
|
||||
const filteredMapWithIds = {};
|
||||
const dependencies = {};
|
||||
const continuousDependencies = {};
|
||||
const removedSet = new Set(ids);
|
||||
for (let id of Object.keys(mapWithIds)) {
|
||||
if (!removedSet.has(id)) {
|
||||
@ -480,13 +484,18 @@ export function removeIdsFromGraph<T>(
|
||||
dependencies[id] = graph.dependencies[id].filter(
|
||||
(depId) => !removedSet.has(depId)
|
||||
);
|
||||
continuousDependencies[id] = graph.continuousDependencies[id].filter(
|
||||
(depId) => !removedSet.has(depId)
|
||||
);
|
||||
}
|
||||
}
|
||||
return {
|
||||
mapWithIds: filteredMapWithIds,
|
||||
dependencies: dependencies,
|
||||
roots: Object.keys(dependencies).filter(
|
||||
(k) => dependencies[k].length === 0
|
||||
continuousDependencies,
|
||||
roots: Object.keys(filteredMapWithIds).filter(
|
||||
(k) =>
|
||||
dependencies[k].length === 0 && continuousDependencies[k].length === 0
|
||||
),
|
||||
};
|
||||
}
|
||||
@ -505,6 +514,12 @@ export function calculateReverseDeps(
|
||||
});
|
||||
});
|
||||
|
||||
Object.keys(taskGraph.continuousDependencies).forEach((taskId) => {
|
||||
taskGraph.continuousDependencies[taskId].forEach((d) => {
|
||||
reverseTaskDeps[d].push(taskId);
|
||||
});
|
||||
});
|
||||
|
||||
return reverseTaskDeps;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user