diff --git a/packages/nx/src/tasks-runner/default-tasks-runner.ts b/packages/nx/src/tasks-runner/default-tasks-runner.ts index 85ff5f09ec..e63471010a 100644 --- a/packages/nx/src/tasks-runner/default-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/default-tasks-runner.ts @@ -1,5 +1,5 @@ import { TasksRunner, TaskStatus } from './tasks-runner'; -import { TaskOrchestrator } from './task-orchestrator'; +import { getThreadCount, TaskOrchestrator } from './task-orchestrator'; import { TaskHasher } from '../hasher/task-hasher'; import { LifeCycle } from './life-cycle'; import { ProjectGraph } from '../config/project-graph'; @@ -121,32 +121,10 @@ export const defaultTasksRunner: TasksRunner< daemon: DaemonClient; } ): Promise<{ [id: string]: TaskStatus }> => { - if ( - (options as any)['parallel'] === 'false' || - (options as any)['parallel'] === false - ) { - (options as any)['parallel'] = 1; - } else if ( - (options as any)['parallel'] === 'true' || - (options as any)['parallel'] === true || - (options as any)['parallel'] === undefined || - (options as any)['parallel'] === '' - ) { - (options as any)['parallel'] = Number((options as any)['maxParallel'] || 3); - } - - const maxParallel = - options['parallel'] + - Object.values(context.taskGraph.tasks).filter((t) => t.continuous).length; - const totalTasks = Object.values(context.taskGraph.tasks).length; - const threadCount = Math.min(maxParallel, totalTasks); - + const threadCount = getThreadCount(options, context.taskGraph); await options.lifeCycle.startCommand(threadCount); try { - return await runAllTasks(options, { - ...context, - threadCount, - }); + return await runAllTasks(options, context); } finally { await options.lifeCycle.endCommand(); } @@ -162,7 +140,6 @@ async function runAllTasks( taskGraph: TaskGraph; hasher: TaskHasher; daemon: DaemonClient; - threadCount: number; } ): Promise<{ [id: string]: TaskStatus }> { const orchestrator = new TaskOrchestrator( @@ -172,7 +149,6 @@ async function runAllTasks( 
context.taskGraph, context.nxJson, options, - context.threadCount, context.nxArgs?.nxBail, context.daemon, context.nxArgs?.outputStyle diff --git a/packages/nx/src/tasks-runner/init-tasks-runner.ts b/packages/nx/src/tasks-runner/init-tasks-runner.ts index b614f01737..618ad4ba07 100644 --- a/packages/nx/src/tasks-runner/init-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/init-tasks-runner.ts @@ -1,14 +1,29 @@ -import { readNxJson } from '../config/configuration'; +import { readNxJson } from '../config/nx-json'; import { NxArgs } from '../utils/command-line-utils'; import { createProjectGraphAsync } from '../project-graph/project-graph'; import { Task, TaskGraph } from '../config/task-graph'; -import { invokeTasksRunner } from './run-command'; +import { + constructLifeCycles, + getRunner, + invokeTasksRunner, +} from './run-command'; import { InvokeRunnerTerminalOutputLifeCycle } from './life-cycles/invoke-runner-terminal-output-life-cycle'; import { performance } from 'perf_hooks'; import { getOutputs } from './utils'; import { loadRootEnvFiles } from '../utils/dotenv'; -import { TaskResult } from './life-cycle'; +import { CompositeLifeCycle, LifeCycle, TaskResult } from './life-cycle'; +import { TaskOrchestrator } from './task-orchestrator'; +import { createTaskHasher } from '../hasher/create-task-hasher'; +import type { ProjectGraph } from '../config/project-graph'; +import type { NxJsonConfiguration } from '../config/nx-json'; +import { daemonClient } from '../daemon/client/client'; +import { RunningTask } from './running-tasks/running-task'; +import { TaskResultsLifeCycle } from './life-cycles/task-results-life-cycle'; +/** + * This function is deprecated. Do not use this + * @deprecated This function is deprecated. 
Do not use this + */ export async function initTasksRunner(nxArgs: NxArgs) { performance.mark('init-local'); loadRootEnvFiles(); @@ -77,3 +92,101 @@ export async function initTasksRunner(nxArgs: NxArgs) { }, }; } + +async function createOrchestrator( + tasks: Task[], + projectGraph: ProjectGraph, + taskGraphForHashing: TaskGraph, + nxJson: NxJsonConfiguration, + lifeCycle: LifeCycle +) { + loadRootEnvFiles(); + + const invokeRunnerTerminalLifecycle = new InvokeRunnerTerminalOutputLifeCycle( + tasks + ); + const taskResultsLifecycle = new TaskResultsLifeCycle(); + const compositedLifeCycle: LifeCycle = new CompositeLifeCycle([ + ...constructLifeCycles(invokeRunnerTerminalLifecycle), + taskResultsLifecycle, + lifeCycle, + ]); + + const { runnerOptions: options } = getRunner({}, nxJson); + + let hasher = createTaskHasher(projectGraph, nxJson, options); + + const taskGraph: TaskGraph = { + roots: tasks.map((task) => task.id), + tasks: tasks.reduce((acc, task) => { + acc[task.id] = task; + return acc; + }, {} as any), + dependencies: tasks.reduce((acc, task) => { + acc[task.id] = []; + return acc; + }, {} as any), + continuousDependencies: tasks.reduce((acc, task) => { + acc[task.id] = []; + return acc; + }, {} as any), + }; + + const orchestrator = new TaskOrchestrator( + hasher, + null, + projectGraph, + taskGraph, + nxJson, + { ...options, parallel: tasks.length, lifeCycle: compositedLifeCycle }, + false, + daemonClient, + undefined, + taskGraphForHashing + ); + + await orchestrator.init(); + + await Promise.all(tasks.map((task) => orchestrator.processTask(task.id))); + + return orchestrator; +} + +export async function runDiscreteTasks( + tasks: Task[], + projectGraph: ProjectGraph, + taskGraphForHashing: TaskGraph, + nxJson: NxJsonConfiguration, + lifeCycle: LifeCycle +) { + const orchestrator = await createOrchestrator( + tasks, + projectGraph, + taskGraphForHashing, + nxJson, + lifeCycle + ); + return tasks.map((task, index) => + 
orchestrator.applyFromCacheOrRunTask(true, task, index) + ); +} + +export async function runContinuousTasks( + tasks: Task[], + projectGraph: ProjectGraph, + taskGraphForHashing: TaskGraph, + nxJson: NxJsonConfiguration, + lifeCycle: LifeCycle +) { + const orchestrator = await createOrchestrator( + tasks, + projectGraph, + taskGraphForHashing, + nxJson, + lifeCycle + ); + return tasks.reduce((current, task, index) => { + current[task.id] = orchestrator.startContinuousTask(task, index); + return current; + }, {} as Record<string, Promise<RunningTask>>); +} diff --git a/packages/nx/src/tasks-runner/run-command.ts b/packages/nx/src/tasks-runner/run-command.ts index 3d654ccfd1..091a592611 100644 --- a/packages/nx/src/tasks-runner/run-command.ts +++ b/packages/nx/src/tasks-runner/run-command.ts @@ -933,7 +933,7 @@ export async function invokeTasksRunner({ return taskResultsLifecycle.getTaskResults(); } -function constructLifeCycles(lifeCycle: LifeCycle): LifeCycle[] { +export function constructLifeCycles(lifeCycle: LifeCycle): LifeCycle[] { const lifeCycles = [] as LifeCycle[]; lifeCycles.push(new StoreRunInformationLifeCycle()); lifeCycles.push(lifeCycle); diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index 53f04d18bf..38e248e1cb 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -88,32 +88,38 @@ export class TaskOrchestrator { private readonly taskGraph: TaskGraph, private readonly nxJson: NxJsonConfiguration, private readonly options: NxArgs & DefaultTasksRunnerOptions, - private readonly threadCount: number, private readonly bail: boolean, private readonly daemon: DaemonClient, - private readonly outputStyle: string + private readonly outputStyle: string, + private readonly taskGraphForHashing: TaskGraph = taskGraph ) {} - async run() { + async init() { // Init the ForkedProcessTaskRunner, TasksSchedule, and Cache await Promise.all([
this.forkedProcessTaskRunner.init(), this.tasksSchedule.init(), 'init' in this.cache ? this.cache.init() : null, ]); + } + + async run() { + await this.init(); // initial scheduling await this.tasksSchedule.scheduleNextTasks(); performance.mark('task-execution:start'); + const threadCount = getThreadCount(this.options, this.taskGraph); + const threads = []; - process.stdout.setMaxListeners(this.threadCount + defaultMaxListeners); - process.stderr.setMaxListeners(this.threadCount + defaultMaxListeners); + process.stdout.setMaxListeners(threadCount + defaultMaxListeners); + process.stderr.setMaxListeners(threadCount + defaultMaxListeners); // initial seeding of the queue - for (let i = 0; i < this.threadCount; ++i) { + for (let i = 0; i < threadCount; ++i) { threads.push(this.executeNextBatchOfTasksUsingTaskSchedule()); } await Promise.all(threads); @@ -175,9 +181,7 @@ export class TaskOrchestrator { } // region Processing Scheduled Tasks - private async processScheduledTask( - taskId: string - ): Promise<NodeJS.ProcessEnv> { + async processTask(taskId: string): Promise<NodeJS.ProcessEnv> { const task = this.taskGraph.tasks[taskId]; const taskSpecificEnv = getTaskSpecificEnv(task); @@ -185,7 +189,7 @@ export class TaskOrchestrator { await hashTask( this.hasher, this.projectGraph, - this.taskGraph, + this.taskGraphForHashing, task, taskSpecificEnv, this.taskDetails @@ -204,7 +208,7 @@ export class TaskOrchestrator { await hashTask( this.hasher, this.projectGraph, - this.taskGraph, + this.taskGraphForHashing, task, this.batchEnv, this.taskDetails @@ -225,7 +229,7 @@ export class TaskOrchestrator { for (const taskId of scheduledTasks) { // Task is already handled or being handled if (!this.processedTasks.has(taskId)) { - this.processedTasks.set(taskId, this.processScheduledTask(taskId)); + this.processedTasks.set(taskId, this.processTask(taskId)); } } } @@ -236,6 +240,7 @@ export class TaskOrchestrator { private async applyCachedResults(tasks: Task[]): Promise< { task: Task; + code: number; status:
'local-cache' | 'local-cache-kept-existing' | 'remote-cache'; }[] > { @@ -250,6 +255,7 @@ export class TaskOrchestrator { private async applyCachedResult(task: Task): Promise<{ task: Task; + code: number; status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache'; }> { const cachedResult = await this.cache.get(task); @@ -277,6 +283,7 @@ export class TaskOrchestrator { cachedResult.terminalOutput ); return { + code: cachedResult.code, task, status, }; @@ -377,7 +384,7 @@ export class TaskOrchestrator { // endregion Batch // region Single Task - private async applyFromCacheOrRunTask( + async applyFromCacheOrRunTask( doNotSkipCache: boolean, task: Task, groupId: number @@ -419,6 +426,7 @@ export class TaskOrchestrator { let results: { task: Task; + code: number; status: TaskStatus; terminalOutput?: string; }[] = doNotSkipCache ? await this.applyCachedResults([task]) : []; @@ -437,11 +445,13 @@ export class TaskOrchestrator { results.push({ task, + code, status: code === 0 ? 'success' : 'failure', terminalOutput, }); } await this.postRunSteps([task], results, doNotSkipCache, { groupId }); + return results[0]; } private async runTask( @@ -617,7 +627,7 @@ export class TaskOrchestrator { } } - private async startContinuousTask(task: Task, groupId: number) { + async startContinuousTask(task: Task, groupId: number) { if (this.runningTasksService.getRunningTasks([task.id]).length) { // task is already running, we need to poll and wait for the running task to finish do { @@ -898,3 +908,28 @@ export class TaskOrchestrator { ); } } + +export function getThreadCount( + options: NxArgs & DefaultTasksRunnerOptions, + taskGraph: TaskGraph +) { + if ( + (options as any)['parallel'] === 'false' || + (options as any)['parallel'] === false + ) { + (options as any)['parallel'] = 1; + } else if ( + (options as any)['parallel'] === 'true' || + (options as any)['parallel'] === true || + (options as any)['parallel'] === undefined || + (options as any)['parallel'] === '' + ) { + 
(options as any)['parallel'] = Number((options as any)['maxParallel'] || 3); + } + + const maxParallel = + options['parallel'] + + Object.values(taskGraph.tasks).filter((t) => t.continuous).length; + const totalTasks = Object.keys(taskGraph.tasks).length; + return Math.min(maxParallel, totalTasks); +}