fix(core): run discrete tasks using batches if possible (#30991)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

Discrete tasks are not run with batches.

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

Discrete tasks are run as batches if possible.

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
Jason Jean 2025-05-01 23:37:47 -04:00 committed by GitHub
parent 110614da07
commit cd2e35d402
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 57 additions and 29 deletions

View File

@ -157,7 +157,7 @@ async function createOrchestrator(
await orchestrator.init();
orchestrator.processTasks(tasks.map((task) => task.id));
orchestrator.processAllScheduledTasks();
return orchestrator;
}
@ -168,7 +168,7 @@ export async function runDiscreteTasks(
taskGraphForHashing: TaskGraph,
nxJson: NxJsonConfiguration,
lifeCycle: LifeCycle
) {
): Promise<Array<Promise<TaskResult[]>>> {
const orchestrator = await createOrchestrator(
tasks,
projectGraph,
@ -176,9 +176,36 @@ export async function runDiscreteTasks(
nxJson,
lifeCycle
);
return tasks.map((task, index) =>
orchestrator.applyFromCacheOrRunTask(true, task, index)
let groupId = 0;
let nextBatch = orchestrator.nextBatch();
let batchResults: Array<Promise<TaskResult[]>> = [];
/**
* Set of task ids that were part of batches
*/
const batchTasks = new Set<string>();
while (nextBatch) {
for (const task in nextBatch.taskGraph.tasks) {
batchTasks.add(task);
}
batchResults.push(
orchestrator.applyFromCacheOrRunBatch(true, nextBatch, groupId++)
);
nextBatch = orchestrator.nextBatch();
}
const taskResults = tasks
// Filter out tasks which were not part of batches
.filter((task) => !batchTasks.has(task.id))
.map((task) =>
orchestrator
.applyFromCacheOrRunTask(true, task, groupId++)
.then((r) => [r])
);
return [...batchResults, ...taskResults];
}
export async function runContinuousTasks(

View File

@ -25,7 +25,7 @@ import { Cache, DbCache, dbCacheEnabled, getCache } from './cache';
import { DefaultTasksRunnerOptions } from './default-tasks-runner';
import { ForkedProcessTaskRunner } from './forked-process-task-runner';
import { isTuiEnabled } from './is-tui-enabled';
import { TaskMetadata } from './life-cycle';
import { TaskMetadata, TaskResult } from './life-cycle';
import { PseudoTtyProcess } from './pseudo-terminal';
import { NoopChildProcess } from './running-tasks/noop-child-process';
import { RunningTask } from './running-tasks/running-task';
@ -108,7 +108,9 @@ export class TaskOrchestrator {
// Init the ForkedProcessTaskRunner, TasksSchedule, and Cache
await Promise.all([
this.forkedProcessTaskRunner.init(),
this.tasksSchedule.init(),
this.tasksSchedule.init().then(() => {
return this.tasksSchedule.scheduleNextTasks();
}),
'init' in this.cache ? this.cache.init() : null,
]);
}
@ -116,9 +118,6 @@ export class TaskOrchestrator {
async run() {
await this.init();
// initial scheduling
await this.tasksSchedule.scheduleNextTasks();
performance.mark('task-execution:start');
const threadCount = getThreadCount(this.options, this.taskGraph);
@ -160,6 +159,10 @@ export class TaskOrchestrator {
return this.completedTasks;
}
public nextBatch() {
return this.tasksSchedule.nextBatch();
}
private async executeNextBatchOfTasksUsingTaskSchedule() {
// completed all the tasks
if (!this.tasksSchedule.hasTasks() || this.bailed) {
@ -171,7 +174,7 @@ export class TaskOrchestrator {
this.options.skipNxCache === undefined;
this.processAllScheduledTasks();
const batch = this.tasksSchedule.nextBatch();
const batch = this.nextBatch();
if (batch) {
const groupId = this.closeGroup();
@ -203,7 +206,7 @@ export class TaskOrchestrator {
);
}
processTasks(taskIds: string[]) {
private processTasks(taskIds: string[]) {
for (const taskId of taskIds) {
// Task is already handled or being handled
if (!this.processedTasks.has(taskId)) {
@ -251,7 +254,7 @@ export class TaskOrchestrator {
);
}
private processAllScheduledTasks() {
public processAllScheduledTasks() {
const { scheduledTasks, scheduledBatches } =
this.tasksSchedule.getAllScheduledTasks();
@ -264,13 +267,7 @@ export class TaskOrchestrator {
// endregion Processing Scheduled Tasks
// region Applying Cache
private async applyCachedResults(tasks: Task[]): Promise<
{
task: Task;
code: number;
status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache';
}[]
> {
private async applyCachedResults(tasks: Task[]): Promise<TaskResult[]> {
const cacheableTasks = tasks.filter((t) =>
isCacheableTask(t, this.options)
);
@ -319,11 +316,11 @@ export class TaskOrchestrator {
// endregion Applying Cache
// region Batch
private async applyFromCacheOrRunBatch(
public async applyFromCacheOrRunBatch(
doNotSkipCache: boolean,
batch: Batch,
groupId: number
) {
): Promise<TaskResult[]> {
const applyFromCacheOrRunBatchStart = performance.mark(
'TaskOrchestrator-apply-from-cache-or-run-batch:start'
);
@ -335,11 +332,9 @@ export class TaskOrchestrator {
await this.preRunSteps(tasks, { groupId });
let results: {
task: Task;
status: TaskStatus;
terminalOutput?: string;
}[] = doNotSkipCache ? await this.applyCachedResults(tasks) : [];
let results: TaskResult[] = doNotSkipCache
? await this.applyCachedResults(tasks)
: [];
// Run tasks that were not cached
if (results.length !== taskEntries.length) {
@ -389,9 +384,13 @@ export class TaskOrchestrator {
applyFromCacheOrRunBatchStart.name,
applyFromCacheOrRunBatchEnd.name
);
return results;
}
private async runBatch(batch: Batch, env: NodeJS.ProcessEnv) {
private async runBatch(
batch: Batch,
env: NodeJS.ProcessEnv
): Promise<TaskResult[]> {
const runBatchStart = performance.mark('TaskOrchestrator-run-batch:start');
try {
const batchProcess =
@ -405,6 +404,7 @@ export class TaskOrchestrator {
const batchResultEntries = Object.entries(results);
return batchResultEntries.map(([taskId, result]) => ({
...result,
code: result.success ? 0 : 1,
task: {
...this.taskGraph.tasks[taskId],
startTime: result.startTime,
@ -416,6 +416,7 @@ export class TaskOrchestrator {
} catch (e) {
return batch.taskGraph.roots.map((rootTaskId) => ({
task: this.taskGraph.tasks[rootTaskId],
code: 1,
status: 'failure' as TaskStatus,
}));
} finally {
@ -435,7 +436,7 @@ export class TaskOrchestrator {
doNotSkipCache: boolean,
task: Task,
groupId: number
) {
): Promise<TaskResult> {
// Wait for task to be processed
const taskSpecificEnv = await this.processedTasks.get(task.id);