diff --git a/docs/generated/devkit/defaultTasksRunner.md b/docs/generated/devkit/defaultTasksRunner.md index c28df12f4b..d827134f1a 100644 --- a/docs/generated/devkit/defaultTasksRunner.md +++ b/docs/generated/devkit/defaultTasksRunner.md @@ -12,6 +12,7 @@ | `context.daemon?` | `DaemonClient` | | `context.hasher?` | [`TaskHasher`](../../devkit/documents/TaskHasher) | | `context.initiatingProject?` | `string` | +| `context.initiatingTasks` | [`Task`](../../devkit/documents/Task)[] | | `context.nxArgs` | `NxArgs` | | `context.nxJson` | [`NxJsonConfiguration`](../../devkit/documents/NxJsonConfiguration)\<`string`[] \| `"*"`\> | | `context.projectGraph` | [`ProjectGraph`](../../devkit/documents/ProjectGraph) | diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index 7ed9ec057e..3cdc799e90 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -322,7 +322,8 @@ export declare const enum TaskStatus { RemoteCache = 5, NotStarted = 6, InProgress = 7, - Shared = 8 + Shared = 8, + Stopped = 9 } export interface TaskTarget { diff --git a/packages/nx/src/native/tui/app.rs b/packages/nx/src/native/tui/app.rs index 0be260ff68..8dbdee4f11 100644 --- a/packages/nx/src/native/tui/app.rs +++ b/packages/nx/src/native/tui/app.rs @@ -98,7 +98,7 @@ impl App { tasks_list.start_tasks(tasks); } } - + pub fn set_task_status(&mut self, task_id: String, status: TaskStatus) { if let Some(tasks_list) = self .components @@ -112,7 +112,6 @@ impl App { pub fn print_task_terminal_output( &mut self, task_id: String, - status: TaskStatus, output: String, ) { if let Some(tasks_list) = self @@ -120,8 +119,9 @@ impl App { .iter_mut() .find_map(|c| c.as_any_mut().downcast_mut::()) { - // If the status is a cache hit, we need to create a new parser and writer for the task in order to print the output - if is_cache_hit(status) { + // Tasks run within a pseudo-terminal always have a pty instance and do not need a new one + // Tasks not run within a pseudo-terminal need a new pty instance to print output + if !tasks_list.pty_instances.contains_key(&task_id) { let (parser, parser_and_writer) = TasksList::create_empty_parser_and_noop_writer(); // Add ANSI escape sequence to hide cursor at the end of output, it would be confusing to have it visible when a task is a cache hit @@ -129,7 +129,6 @@ impl App { TasksList::write_output_to_parser(parser, output_with_hidden_cursor); tasks_list.create_and_register_pty_instance(&task_id, parser_and_writer); - tasks_list.update_task_status(task_id.clone(), status); let _ = tasks_list.handle_resize(None); return; } @@ -137,7 +136,6 @@ impl App { // If the task is continuous, we are only updating the status, not the output if let Some(task) = tasks_list.tasks.iter_mut().find(|t| t.name == task_id) { if task.continuous { - tasks_list.update_task_status(task_id.clone(), status); let _ = tasks_list.handle_resize(None); } } diff --git a/packages/nx/src/native/tui/components/tasks_list.rs b/packages/nx/src/native/tui/components/tasks_list.rs index 78899f27ed..59fa2d4fad 100644 --- a/packages/nx/src/native/tui/components/tasks_list.rs +++ b/packages/nx/src/native/tui/components/tasks_list.rs @@ -124,6 +124,8 @@ pub enum TaskStatus { InProgress, // This task is being run in a different process Shared, + // This continuous task has been stopped by Nx + Stopped, } impl std::str::FromStr for TaskStatus { @@ -951,17 +953,6 @@ impl TasksList { task_result.task.end_time.unwrap() as u128, ); } - - // If the task never had a pty, it must mean that it was run outside of the pseudo-terminal. - // We create a new parser and writer for the task and register it and then write the final output to the parser - if !self.pty_instances.contains_key(&task.name) { - let (parser, parser_and_writer) = Self::create_empty_parser_and_noop_writer(); - if let Some(task_result_output) = task_result.terminal_output { - Self::write_output_to_parser(parser, task_result_output); - } - let task_name = task.name.clone(); - self.create_and_register_pty_instance(&task_name, parser_and_writer); - } } } self.sort_tasks(); @@ -1402,6 +1393,12 @@ impl Component for TasksList { Cell::from(Line::from(spans)) } + TaskStatus::Stopped => Cell::from(Line::from(vec![ + Span::raw(if is_selected { ">" } else { " " }), + Span::raw(" "), + Span::styled("⯀️", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + ])), TaskStatus::NotStarted => Cell::from(Line::from(vec![ Span::raw(if is_selected { ">" } else { " " }), // No need for parallel section check for pending tasks diff --git a/packages/nx/src/native/tui/components/terminal_pane.rs b/packages/nx/src/native/tui/components/terminal_pane.rs index 8a4d3f6278..5007082c41 100644 --- a/packages/nx/src/native/tui/components/terminal_pane.rs +++ b/packages/nx/src/native/tui/components/terminal_pane.rs @@ -231,6 +231,12 @@ impl<'a> TerminalPane<'a> { .fg(Color::LightCyan) .add_modifier(Modifier::BOLD), ), + TaskStatus::Stopped => Span::styled( + " ⯀️ ", + Style::default() + .fg(Color::DarkGray) + .add_modifier(Modifier::BOLD), + ), TaskStatus::NotStarted => Span::styled( " · ", Style::default() @@ -249,7 +255,7 @@ impl<'a> TerminalPane<'a> { TaskStatus::Failure => Color::Red, TaskStatus::Skipped => Color::Yellow, TaskStatus::InProgress | TaskStatus::Shared=> Color::LightCyan, - TaskStatus::NotStarted => Color::DarkGray, + TaskStatus::NotStarted | TaskStatus::Stopped=> Color::DarkGray, }) } @@ -368,6 +374,27 @@ impl<'a> StatefulWidget for TerminalPane<'a> { return; } + // If the task has been stopped but does not have a pty + if matches!(state.task_status, TaskStatus::Stopped) && !state.has_pty { + let message = vec![Line::from(vec![Span::styled( + "Running in another Nx process...", + if state.is_focused { + self.get_base_style(TaskStatus::Stopped) + } else { + self.get_base_style(TaskStatus::Stopped) + .add_modifier(Modifier::DIM) + }, + )])]; + + let paragraph = Paragraph::new(message) + .block(block) + .alignment(Alignment::Center) + .style(Style::default()); + + Widget::render(paragraph, area, buf); + return; + } + let inner_area = block.inner(area); if let Some(pty_data) = &self.pty_data { diff --git a/packages/nx/src/native/tui/lifecycle.rs b/packages/nx/src/native/tui/lifecycle.rs index 86c42fef6a..423f0dc315 100644 --- a/packages/nx/src/native/tui/lifecycle.rs +++ b/packages/nx/src/native/tui/lifecycle.rs @@ -114,11 +114,12 @@ impl AppLifeCycle { pub fn print_task_terminal_output( &mut self, task: Task, - status: String, + _status: String, output: String, ) -> napi::Result<()> { + debug!("Received task terminal output for {}", task.id); if let Ok(mut app) = self.app.lock() { - app.print_task_terminal_output(task.id, status.parse().unwrap(), output); + app.print_task_terminal_output(task.id, output); } Ok(()) } diff --git a/packages/nx/src/native/tui/utils.rs b/packages/nx/src/native/tui/utils.rs index f5dc2b3e8c..4fdd9e88d1 100644 --- a/packages/nx/src/native/tui/utils.rs +++ b/packages/nx/src/native/tui/utils.rs @@ -61,7 +61,8 @@ pub fn sort_task_items(tasks: &mut [TaskItem]) { | TaskStatus::LocalCacheKeptExisting | TaskStatus::LocalCache | TaskStatus::RemoteCache - | TaskStatus::Skipped => 2, + | TaskStatus::Skipped + | TaskStatus::Stopped => 2, TaskStatus::NotStarted => 3, } }; @@ -326,6 +327,7 @@ mod tests { | TaskStatus::LocalCacheKeptExisting | TaskStatus::LocalCache | TaskStatus::RemoteCache + | TaskStatus::Stopped | TaskStatus::Skipped => 2, TaskStatus::NotStarted => 3, } diff --git a/packages/nx/src/tasks-runner/default-tasks-runner.ts b/packages/nx/src/tasks-runner/default-tasks-runner.ts index e63471010a..ed4bef7633 100644 --- a/packages/nx/src/tasks-runner/default-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/default-tasks-runner.ts @@ -113,6 +113,7 @@ export const defaultTasksRunner: TasksRunner< context: { target: string; initiatingProject?: string; + initiatingTasks: Task[]; projectGraph: ProjectGraph; nxJson: NxJsonConfiguration; nxArgs: NxArgs; @@ -134,6 +135,7 @@ async function runAllTasks( options: DefaultTasksRunnerOptions, context: { initiatingProject?: string; + initiatingTasks: Task[]; projectGraph: ProjectGraph; nxJson: NxJsonConfiguration; nxArgs: NxArgs; @@ -145,6 +147,7 @@ async function runAllTasks( const orchestrator = new TaskOrchestrator( context.hasher, context.initiatingProject, + context.initiatingTasks, context.projectGraph, context.taskGraph, context.nxJson, diff --git a/packages/nx/src/tasks-runner/init-tasks-runner.ts b/packages/nx/src/tasks-runner/init-tasks-runner.ts index 618ad4ba07..b656a35135 100644 --- a/packages/nx/src/tasks-runner/init-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/init-tasks-runner.ts @@ -77,6 +77,7 @@ export async function initTasksRunner(nxArgs: NxArgs) { nxArgs: { ...nxArgs, parallel: opts.parallel }, loadDotEnvFiles: true, initiatingProject: null, + initiatingTasks: [], }); return { @@ -135,6 +136,7 @@ async function createOrchestrator( const orchestrator = new TaskOrchestrator( hasher, null, + [], projectGraph, taskGraph, nxJson, diff --git a/packages/nx/src/tasks-runner/life-cycle.ts b/packages/nx/src/tasks-runner/life-cycle.ts index d362267896..b0f02276aa 100644 --- a/packages/nx/src/tasks-runner/life-cycle.ts +++ b/packages/nx/src/tasks-runner/life-cycle.ts @@ -67,7 +67,7 @@ export interface LifeCycle { registerRunningTask?( taskId: string, parserAndWriter: ExternalObject<[any, any]> - ): Promise; + ): void; setTaskStatus?(taskId: string, status: NativeTaskStatus): void; @@ -152,13 +152,13 @@ export class CompositeLifeCycle implements LifeCycle { } } - async registerRunningTask( + registerRunningTask( taskId: string, parserAndWriter: ExternalObject<[any, any]> - ): Promise { + ): void { for (let l of this.lifeCycles) { if (l.registerRunningTask) { - await l.registerRunningTask(taskId, parserAndWriter); + l.registerRunningTask(taskId, parserAndWriter); } } } diff --git a/packages/nx/src/tasks-runner/life-cycles/tui-summary-life-cycle.ts b/packages/nx/src/tasks-runner/life-cycles/tui-summary-life-cycle.ts index 5c716a5429..71a21ff06d 100644 --- a/packages/nx/src/tasks-runner/life-cycles/tui-summary-life-cycle.ts +++ b/packages/nx/src/tasks-runner/life-cycles/tui-summary-life-cycle.ts @@ -1,4 +1,5 @@ import { EOL } from 'node:os'; +import { TaskStatus as NativeTaskStatus } from '../../native'; import { Task } from '../../config/task-graph'; import { output } from '../../utils/output'; import type { LifeCycle } from '../life-cycle'; @@ -18,6 +19,7 @@ export function getTuiTerminalSummaryLifeCycle({ args, overrides, initiatingProject, + initiatingTasks, resolveRenderIsDonePromise, }: { projectNames: string[]; @@ -25,6 +27,7 @@ export function getTuiTerminalSummaryLifeCycle({ args: { targets?: string[]; configuration?: string; parallel?: number }; overrides: Record; initiatingProject: string; + initiatingTasks: Task[]; resolveRenderIsDonePromise: (value: void) => void; }) { const lifeCycle = {} as Partial; @@ -37,6 +40,7 @@ export function getTuiTerminalSummaryLifeCycle({ let totalSuccessfulTasks = 0; let totalFailedTasks = 0; let totalCompletedTasks = 0; + let totalStoppedTasks = 0; let timeTakenText: string; const failedTasks = new Set(); @@ -55,13 +59,20 @@ export function getTuiTerminalSummaryLifeCycle({ lifeCycle.printTaskTerminalOutput = (task, taskStatus, terminalOutput) => { tasksToTerminalOutputs[task.id] = { terminalOutput, taskStatus }; - taskIdsInOrderOfCompletion.push(task.id); + }; + + lifeCycle.setTaskStatus = (taskId, taskStatus) => { + if (taskStatus === NativeTaskStatus.Stopped) { + totalStoppedTasks++; + taskIdsInOrderOfCompletion.push(taskId); + } }; lifeCycle.endTasks = (taskResults) => { for (let t of taskResults) { totalCompletedTasks++; inProgressTasks.delete(t.task.id); + taskIdsInOrderOfCompletion.push(t.task.id); switch (t.status) { case 'remote-cache': @@ -106,7 +117,7 @@ export function getTuiTerminalSummaryLifeCycle({ const printRunOneSummary = () => { let lines: string[] = []; - const failure = totalSuccessfulTasks !== totalTasks; + const failure = totalSuccessfulTasks + totalStoppedTasks !== totalTasks; // Prints task outputs in the order they were completed // above the summary, since run-one should print all task results. @@ -153,7 +164,7 @@ export function getTuiTerminalSummaryLifeCycle({ ); } lines = [output.colors.green(lines.join(EOL))]; - } else if (totalCompletedTasks === totalTasks) { + } else if (totalCompletedTasks + totalStoppedTasks === totalTasks) { let text = `Ran target ${output.bold( targets[0] )} for project ${output.bold(initiatingProject)}`; @@ -219,7 +230,7 @@ export function getTuiTerminalSummaryLifeCycle({ console.log(''); const lines: string[] = []; - const failure = totalSuccessfulTasks !== totalTasks; + const failure = totalSuccessfulTasks + totalStoppedTasks !== totalTasks; for (const taskId of taskIdsInOrderOfCompletion) { const { terminalOutput, taskStatus } = tasksToTerminalOutputs[taskId]; @@ -241,7 +252,7 @@ export function getTuiTerminalSummaryLifeCycle({ lines.push(...output.getVerticalSeparatorLines(failure ? 'red' : 'green')); - if (totalSuccessfulTasks === totalTasks) { + if (totalSuccessfulTasks + totalStoppedTasks === totalTasks) { const successSummaryRows = []; const text = `Successfully ran ${formatTargetsAndProjects( projectNames, diff --git a/packages/nx/src/tasks-runner/run-command.ts b/packages/nx/src/tasks-runner/run-command.ts index 091a592611..40bb19fb97 100644 --- a/packages/nx/src/tasks-runner/run-command.ts +++ b/packages/nx/src/tasks-runner/run-command.ts @@ -75,13 +75,18 @@ const originalConsoleError = console.error.bind(console); async function getTerminalOutputLifeCycle( initiatingProject: string, + initiatingTasks: Task[], projectNames: string[], tasks: Task[], taskGraph: TaskGraph, nxArgs: NxArgs, nxJson: NxJsonConfiguration, overrides: Record -): Promise<{ lifeCycle: LifeCycle; renderIsDone: Promise }> { +): Promise<{ + lifeCycle: LifeCycle; + printSummary?: () => void; + renderIsDone: Promise; +}> { const overridesWithoutHidden = { ...overrides }; delete overridesWithoutHidden['__overrides_unparsed__']; @@ -129,11 +134,7 @@ async function getTerminalOutputLifeCycle( let titleText = ''; if (isRunOne) { - const mainTaskId = createTaskId( - initiatingProject, - nxArgs.targets[0], - nxArgs.configuration - ); + const mainTaskId = initiatingTasks[0].id; pinnedTasks.push(mainTaskId); const mainContinuousDependencies = taskGraph.continuousDependencies[mainTaskId]; @@ -169,6 +170,7 @@ async function getTerminalOutputLifeCycle( args: nxArgs, overrides: overridesWithoutHidden, initiatingProject, + initiatingTasks, resolveRenderIsDonePromise, }); @@ -179,7 +181,6 @@ async function getTerminalOutputLifeCycle( process.stderr.write = originalStderrWrite; console.log = originalConsoleLog; console.error = originalConsoleError; - printSummary(); }); } @@ -264,7 +265,6 @@ async function getTerminalOutputLifeCycle( process.stderr.write = originalStderrWrite; console.log = originalConsoleLog; console.error = originalConsoleError; - printSummary(); // Print the intercepted Nx Cloud logs for (const log of interceptedNxCloudLogs) { const logString = log.toString().trimStart(); @@ -278,6 +278,7 @@ async function getTerminalOutputLifeCycle( return { lifeCycle: new CompositeLifeCycle(lifeCycles), + printSummary, renderIsDone, }; } @@ -445,6 +446,7 @@ export async function runCommandForTasks( extraOptions: { excludeTaskDependencies: boolean; loadDotEnvFiles: boolean } ): Promise { const projectNames = projectsToRun.map((t) => t.name); + const projectNameSet = new Set(projectNames); const { projectGraph, taskGraph } = await ensureWorkspaceIsInSyncAndGetGraphs( currentProjectGraph, @@ -457,16 +459,24 @@ export async function runCommandForTasks( ); const tasks = Object.values(taskGraph.tasks); - const { lifeCycle, renderIsDone } = await getTerminalOutputLifeCycle( - initiatingProject, - projectNames, - tasks, - taskGraph, - nxArgs, - nxJson, - overrides + const initiatingTasks = tasks.filter( + (t) => + projectNameSet.has(t.target.project) && + nxArgs.targets.includes(t.target.target) ); + const { lifeCycle, renderIsDone, printSummary } = + await getTerminalOutputLifeCycle( + initiatingProject, + initiatingTasks, + projectNames, + tasks, + taskGraph, + nxArgs, + nxJson, + overrides + ); + const taskResults = await invokeTasksRunner({ tasks, projectGraph, @@ -476,10 +486,15 @@ export async function runCommandForTasks( nxArgs, loadDotEnvFiles: extraOptions.loadDotEnvFiles, initiatingProject, + initiatingTasks, }); await renderIsDone; + if (printSummary) { + printSummary(); + } + await printNxKey(); return taskResults; @@ -814,6 +829,7 @@ export async function invokeTasksRunner({ nxArgs, loadDotEnvFiles, initiatingProject, + initiatingTasks, }: { tasks: Task[]; projectGraph: ProjectGraph; @@ -823,6 +839,7 @@ export async function invokeTasksRunner({ nxArgs: NxArgs; loadDotEnvFiles: boolean; initiatingProject: string | null; + initiatingTasks: Task[]; }): Promise<{ [id: string]: TaskResult }> { setEnvVarsBasedOnArgs(nxArgs, loadDotEnvFiles); @@ -861,6 +878,7 @@ export async function invokeTasksRunner({ { initiatingProject: nxArgs.outputStyle === 'compact' ? null : initiatingProject, + initiatingTasks, projectGraph, nxJson, nxArgs, diff --git a/packages/nx/src/tasks-runner/running-tasks/running-task.ts b/packages/nx/src/tasks-runner/running-tasks/running-task.ts index 7355d16edb..ed4fa27268 100644 --- a/packages/nx/src/tasks-runner/running-tasks/running-task.ts +++ b/packages/nx/src/tasks-runner/running-tasks/running-task.ts @@ -1,7 +1,11 @@ +import type { Serializable } from 'child_process'; + export abstract class RunningTask { abstract getResults(): Promise<{ code: number; terminalOutput: string }>; abstract onExit(cb: (code: number) => void): void; abstract kill(signal?: NodeJS.Signals | number): Promise | void; + + abstract send?(message: Serializable): void; } diff --git a/packages/nx/src/tasks-runner/running-tasks/shared-running-task.ts b/packages/nx/src/tasks-runner/running-tasks/shared-running-task.ts new file mode 100644 index 0000000000..ca4e848cf4 --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/shared-running-task.ts @@ -0,0 +1,36 @@ +import { RunningTask } from './running-task'; +import { RunningTasksService } from '../../native'; + +export class SharedRunningTask implements RunningTask { + private exitCallbacks: ((code: number) => void)[] = []; + + constructor( + private runningTasksService: RunningTasksService, + taskId: string + ) { + this.waitForTaskToFinish(taskId).then(() => { + // notify exit callbacks + this.exitCallbacks.forEach((cb) => cb(0)); + }); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + throw new Error('Results cannot be retrieved from a shared task'); + } + + kill(): void { + this.exitCallbacks.forEach((cb) => cb(0)); + } + + onExit(cb: (code: number) => void): void { + this.exitCallbacks.push(cb); + } + + private async waitForTaskToFinish(taskId: string) { + console.log(`Waiting for ${taskId} in another nx process`); + // wait for the running task to finish + do { + await new Promise((resolve) => setTimeout(resolve, 100)); + } while (this.runningTasksService.getRunningTasks([taskId]).length); + } +} diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index 95f8066de9..fa5789a98c 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -43,6 +43,7 @@ import { removeTasksFromTaskGraph, shouldStreamOutput, } from './utils'; +import { SharedRunningTask } from './running-tasks/shared-running-task'; export class TaskOrchestrator { private taskDetails: TaskDetails | null = getTaskDetails(); @@ -67,6 +68,8 @@ export class TaskOrchestrator { ); private reverseTaskDeps = calculateReverseDeps(this.taskGraph); + private initializingTaskIds = new Set(this.initiatingTasks.map((t) => t.id)); + private processedTasks = new Map>(); private processedBatches = new Map>(); @@ -81,13 +84,12 @@ export class TaskOrchestrator { private runningContinuousTasks = new Map(); - private cleaningUp = false; - // endregion internal state constructor( private readonly hasher: TaskHasher, private readonly initiatingProject: string | undefined, + private readonly initiatingTasks: Task[], private readonly projectGraph: ProjectGraph, private readonly taskGraph: TaskGraph, private readonly nxJson: NxJsonConfiguration, @@ -652,16 +654,24 @@ export class TaskOrchestrator { this.options.lifeCycle.setTaskStatus(task.id, NativeTaskStatus.Shared); } + const runningTask = new SharedRunningTask( + this.runningTasksService, + task.id + ); + + this.runningContinuousTasks.set(task.id, runningTask); + runningTask.onExit(() => { + this.runningContinuousTasks.delete(task.id); + }); + // task is already running by another process, we schedule the next tasks // and release the threads await this.scheduleNextTasksAndReleaseThreads(); - - // wait for the running task to finish - do { - console.log(`Waiting for ${task.id} in another nx process`); - await new Promise((resolve) => setTimeout(resolve, 100)); - } while (this.runningTasksService.getRunningTasks([task.id]).length); - return; + if (this.initializingTaskIds.has(task.id)) { + // Hold the thread forever + await new Promise(() => {}); + } + return runningTask; } const taskSpecificEnv = await this.processedTasks.get(task.id); @@ -708,15 +718,12 @@ export class TaskOrchestrator { childProcess.onExit(() => { this.runningTasksService.removeRunningTask(task.id); + this.runningContinuousTasks.delete(task.id); }); - if ( - this.initiatingProject === task.target.project && - this.options.targets.length === 1 && - this.options.targets[0] === task.target.target - ) { - await childProcess.getResults(); - } else { - await this.scheduleNextTasksAndReleaseThreads(); + await this.scheduleNextTasksAndReleaseThreads(); + if (this.initializingTaskIds.has(task.id)) { + // Hold the thread forever + await new Promise(() => {}); } return childProcess; @@ -832,6 +839,8 @@ export class TaskOrchestrator { ) { this.tasksSchedule.complete(taskResults.map(({ taskId }) => taskId)); + this.cleanUpUnneededContinuousTasks(); + for (const { taskId, status } of taskResults) { if (this.completedTasks[taskId] === undefined) { this.completedTasks[taskId] = status; @@ -914,11 +923,14 @@ export class TaskOrchestrator { // endregion utils private async cleanup() { - this.cleaningUp = true; await Promise.all( Array.from(this.runningContinuousTasks).map(async ([taskId, t]) => { try { - return t.kill(); + await t.kill(); + this.options.lifeCycle.setTaskStatus( + taskId, + NativeTaskStatus.Stopped + ); } catch (e) { console.error(`Unable to terminate ${taskId}\nError:`, e); } finally { @@ -927,6 +939,31 @@ export class TaskOrchestrator { }) ); } + + private cleanUpUnneededContinuousTasks() { + const incompleteTasks = this.tasksSchedule.getIncompleteTasks(); + const neededContinuousTasks = new Set(this.initializingTaskIds); + for (const task of incompleteTasks) { + const continuousDependencies = + this.taskGraph.continuousDependencies[task.id]; + for (const continuousDependency of continuousDependencies) { + neededContinuousTasks.add(continuousDependency); + } + } + + for (const taskId of this.runningContinuousTasks.keys()) { + if (!neededContinuousTasks.has(taskId)) { + const runningTask = this.runningContinuousTasks.get(taskId); + if (runningTask) { + runningTask.kill(); + this.options.lifeCycle.setTaskStatus( + taskId, + NativeTaskStatus.Stopped + ); + } + } + } + } } export function getThreadCount( diff --git a/packages/nx/src/tasks-runner/tasks-runner.ts b/packages/nx/src/tasks-runner/tasks-runner.ts index cb5fa64323..118ad9b8ed 100644 --- a/packages/nx/src/tasks-runner/tasks-runner.ts +++ b/packages/nx/src/tasks-runner/tasks-runner.ts @@ -23,6 +23,7 @@ export type TasksRunner = ( context?: { target?: string; initiatingProject?: string | null; + initiatingTasks: Task[]; projectGraph: ProjectGraph; nxJson: NxJsonConfiguration; nxArgs: NxArgs; diff --git a/packages/nx/src/tasks-runner/tasks-schedule.ts b/packages/nx/src/tasks-runner/tasks-schedule.ts index d7df4e981a..871dff52ff 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.ts @@ -101,6 +101,16 @@ export class TasksSchedule { : null; } + public getIncompleteTasks(): Task[] { + const incompleteTasks: Task[] = []; + for (const taskId in this.taskGraph.tasks) { + if (!this.completedTasks.has(taskId)) { + incompleteTasks.push(this.taskGraph.tasks[taskId]); + } + } + return incompleteTasks; + } + private async scheduleTasks() { if (this.options.batch || process.env.NX_BATCH_MODE === 'true') { await this.scheduleBatches();