feat(core): enable dte to execute continuous tasks (#29993)

<!-- Please make sure you have read the submission guidelines before
posting a PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

Continuous tasks are not supported in DTE and there is no good API for
Agents to utilize

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

There is an API for agents to utilize to run continuous tasks.

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
Jason Jean 2025-04-09 21:59:58 -04:00
parent 87badb0486
commit 48a5d1987e
4 changed files with 169 additions and 45 deletions

View File

@@ -1,5 +1,5 @@
import { TasksRunner, TaskStatus } from './tasks-runner'; import { TasksRunner, TaskStatus } from './tasks-runner';
import { TaskOrchestrator } from './task-orchestrator'; import { getThreadCount, TaskOrchestrator } from './task-orchestrator';
import { TaskHasher } from '../hasher/task-hasher'; import { TaskHasher } from '../hasher/task-hasher';
import { LifeCycle } from './life-cycle'; import { LifeCycle } from './life-cycle';
import { ProjectGraph } from '../config/project-graph'; import { ProjectGraph } from '../config/project-graph';
@@ -121,32 +121,10 @@ export const defaultTasksRunner: TasksRunner<
daemon: DaemonClient; daemon: DaemonClient;
} }
): Promise<{ [id: string]: TaskStatus }> => { ): Promise<{ [id: string]: TaskStatus }> => {
if ( const threadCount = getThreadCount(options, context.taskGraph);
(options as any)['parallel'] === 'false' ||
(options as any)['parallel'] === false
) {
(options as any)['parallel'] = 1;
} else if (
(options as any)['parallel'] === 'true' ||
(options as any)['parallel'] === true ||
(options as any)['parallel'] === undefined ||
(options as any)['parallel'] === ''
) {
(options as any)['parallel'] = Number((options as any)['maxParallel'] || 3);
}
const maxParallel =
options['parallel'] +
Object.values(context.taskGraph.tasks).filter((t) => t.continuous).length;
const totalTasks = Object.values(context.taskGraph.tasks).length;
const threadCount = Math.min(maxParallel, totalTasks);
await options.lifeCycle.startCommand(threadCount); await options.lifeCycle.startCommand(threadCount);
try { try {
return await runAllTasks(options, { return await runAllTasks(options, context);
...context,
threadCount,
});
} finally { } finally {
await options.lifeCycle.endCommand(); await options.lifeCycle.endCommand();
} }
@@ -162,7 +140,6 @@ async function runAllTasks(
taskGraph: TaskGraph; taskGraph: TaskGraph;
hasher: TaskHasher; hasher: TaskHasher;
daemon: DaemonClient; daemon: DaemonClient;
threadCount: number;
} }
): Promise<{ [id: string]: TaskStatus }> { ): Promise<{ [id: string]: TaskStatus }> {
const orchestrator = new TaskOrchestrator( const orchestrator = new TaskOrchestrator(
@@ -172,7 +149,6 @@ async function runAllTasks(
context.taskGraph, context.taskGraph,
context.nxJson, context.nxJson,
options, options,
context.threadCount,
context.nxArgs?.nxBail, context.nxArgs?.nxBail,
context.daemon, context.daemon,
context.nxArgs?.outputStyle context.nxArgs?.outputStyle

View File

@@ -1,14 +1,29 @@
import { readNxJson } from '../config/configuration'; import { readNxJson } from '../config/nx-json';
import { NxArgs } from '../utils/command-line-utils'; import { NxArgs } from '../utils/command-line-utils';
import { createProjectGraphAsync } from '../project-graph/project-graph'; import { createProjectGraphAsync } from '../project-graph/project-graph';
import { Task, TaskGraph } from '../config/task-graph'; import { Task, TaskGraph } from '../config/task-graph';
import { invokeTasksRunner } from './run-command'; import {
constructLifeCycles,
getRunner,
invokeTasksRunner,
} from './run-command';
import { InvokeRunnerTerminalOutputLifeCycle } from './life-cycles/invoke-runner-terminal-output-life-cycle'; import { InvokeRunnerTerminalOutputLifeCycle } from './life-cycles/invoke-runner-terminal-output-life-cycle';
import { performance } from 'perf_hooks'; import { performance } from 'perf_hooks';
import { getOutputs } from './utils'; import { getOutputs } from './utils';
import { loadRootEnvFiles } from '../utils/dotenv'; import { loadRootEnvFiles } from '../utils/dotenv';
import { TaskResult } from './life-cycle'; import { CompositeLifeCycle, LifeCycle, TaskResult } from './life-cycle';
import { TaskOrchestrator } from './task-orchestrator';
import { createTaskHasher } from '../hasher/create-task-hasher';
import type { ProjectGraph } from '../config/project-graph';
import type { NxJsonConfiguration } from '../config/nx-json';
import { daemonClient } from '../daemon/client/client';
import { RunningTask } from './running-tasks/running-task';
import { TaskResultsLifeCycle } from './life-cycles/task-results-life-cycle';
/**
* This function is deprecated. Do not use this
* @deprecated This function is deprecated. Do not use this
*/
export async function initTasksRunner(nxArgs: NxArgs) { export async function initTasksRunner(nxArgs: NxArgs) {
performance.mark('init-local'); performance.mark('init-local');
loadRootEnvFiles(); loadRootEnvFiles();
@@ -77,3 +92,101 @@ export async function initTasksRunner(nxArgs: NxArgs) {
}, },
}; };
} }
/**
 * Builds and initializes a `TaskOrchestrator` for an ad-hoc set of tasks.
 *
 * A flat task graph is constructed in which every task is a root with no
 * dependencies — callers supply the exact list of tasks to execute, so no
 * ordering is inferred here. `taskGraphForHashing` is passed through so
 * hashing can still see the full/original graph.
 *
 * @param tasks the tasks to execute
 * @param projectGraph the workspace project graph
 * @param taskGraphForHashing the task graph used for hash computation
 * @param nxJson workspace nx.json configuration
 * @param lifeCycle caller-provided life cycle, composed with the default ones
 * @returns an initialized orchestrator with every task already processed
 *   (hashed and ready to run)
 */
async function createOrchestrator(
  tasks: Task[],
  projectGraph: ProjectGraph,
  taskGraphForHashing: TaskGraph,
  nxJson: NxJsonConfiguration,
  lifeCycle: LifeCycle
) {
  loadRootEnvFiles();

  const invokeRunnerTerminalLifecycle = new InvokeRunnerTerminalOutputLifeCycle(
    tasks
  );
  const taskResultsLifecycle = new TaskResultsLifeCycle();
  const compositedLifeCycle: LifeCycle = new CompositeLifeCycle([
    ...constructLifeCycles(invokeRunnerTerminalLifecycle),
    taskResultsLifecycle,
    lifeCycle,
  ]);

  const { runnerOptions: options } = getRunner({}, nxJson);
  // never reassigned — declare as const
  const hasher = createTaskHasher(projectGraph, nxJson, options);

  // Flat graph: every task is a root, nothing depends on anything.
  const taskGraph: TaskGraph = {
    roots: tasks.map((task) => task.id),
    tasks: {},
    dependencies: {},
    continuousDependencies: {},
  };
  for (const task of tasks) {
    taskGraph.tasks[task.id] = task;
    taskGraph.dependencies[task.id] = [];
    taskGraph.continuousDependencies[task.id] = [];
  }

  const orchestrator = new TaskOrchestrator(
    hasher,
    null, // no initiating project
    projectGraph,
    taskGraph,
    nxJson,
    // run all supplied tasks at once; the composed life cycle fans out events
    { ...options, parallel: tasks.length, lifeCycle: compositedLifeCycle },
    false, // do not bail on first failure
    daemonClient,
    undefined, // default output style
    taskGraphForHashing
  );
  await orchestrator.init();
  // Hash every task up front so cache lookups / runs can start immediately.
  await Promise.all(tasks.map((task) => orchestrator.processTask(task.id)));
  return orchestrator;
}
/**
 * Runs the given discrete (run-to-completion) tasks through a freshly
 * created orchestrator.
 *
 * @returns one pending result per task, in the same order as `tasks`
 */
export async function runDiscreteTasks(
  tasks: Task[],
  projectGraph: ProjectGraph,
  taskGraphForHashing: TaskGraph,
  nxJson: NxJsonConfiguration,
  lifeCycle: LifeCycle
) {
  const orchestrator = await createOrchestrator(
    tasks,
    projectGraph,
    taskGraphForHashing,
    nxJson,
    lifeCycle
  );
  // Kick off every task (cache-first) without awaiting; callers decide
  // how to consume the pending results.
  const pendingResults = [];
  let groupId = 0;
  for (const task of tasks) {
    pendingResults.push(
      orchestrator.applyFromCacheOrRunTask(true, task, groupId++)
    );
  }
  return pendingResults;
}
/**
 * Starts the given continuous (long-running) tasks through a freshly
 * created orchestrator.
 *
 * @returns a map from task id to the promise of its running task handle
 */
export async function runContinuousTasks(
  tasks: Task[],
  projectGraph: ProjectGraph,
  taskGraphForHashing: TaskGraph,
  nxJson: NxJsonConfiguration,
  lifeCycle: LifeCycle
) {
  const orchestrator = await createOrchestrator(
    tasks,
    projectGraph,
    taskGraphForHashing,
    nxJson,
    lifeCycle
  );
  const runningTasks: Record<string, Promise<RunningTask>> = {};
  tasks.forEach((task, index) => {
    runningTasks[task.id] = orchestrator.startContinuousTask(task, index);
  });
  return runningTasks;
}

View File

@@ -933,7 +933,7 @@ export async function invokeTasksRunner({
return taskResultsLifecycle.getTaskResults(); return taskResultsLifecycle.getTaskResults();
} }
function constructLifeCycles(lifeCycle: LifeCycle): LifeCycle[] { export function constructLifeCycles(lifeCycle: LifeCycle): LifeCycle[] {
const lifeCycles = [] as LifeCycle[]; const lifeCycles = [] as LifeCycle[];
lifeCycles.push(new StoreRunInformationLifeCycle()); lifeCycles.push(new StoreRunInformationLifeCycle());
lifeCycles.push(lifeCycle); lifeCycles.push(lifeCycle);

View File

@@ -88,32 +88,38 @@ export class TaskOrchestrator {
private readonly taskGraph: TaskGraph, private readonly taskGraph: TaskGraph,
private readonly nxJson: NxJsonConfiguration, private readonly nxJson: NxJsonConfiguration,
private readonly options: NxArgs & DefaultTasksRunnerOptions, private readonly options: NxArgs & DefaultTasksRunnerOptions,
private readonly threadCount: number,
private readonly bail: boolean, private readonly bail: boolean,
private readonly daemon: DaemonClient, private readonly daemon: DaemonClient,
private readonly outputStyle: string private readonly outputStyle: string,
private readonly taskGraphForHashing: TaskGraph = taskGraph
) {} ) {}
async run() { async init() {
// Init the ForkedProcessTaskRunner, TasksSchedule, and Cache // Init the ForkedProcessTaskRunner, TasksSchedule, and Cache
await Promise.all([ await Promise.all([
this.forkedProcessTaskRunner.init(), this.forkedProcessTaskRunner.init(),
this.tasksSchedule.init(), this.tasksSchedule.init(),
'init' in this.cache ? this.cache.init() : null, 'init' in this.cache ? this.cache.init() : null,
]); ]);
}
async run() {
await this.init();
// initial scheduling // initial scheduling
await this.tasksSchedule.scheduleNextTasks(); await this.tasksSchedule.scheduleNextTasks();
performance.mark('task-execution:start'); performance.mark('task-execution:start');
const threadCount = getThreadCount(this.options, this.taskGraph);
const threads = []; const threads = [];
process.stdout.setMaxListeners(this.threadCount + defaultMaxListeners); process.stdout.setMaxListeners(threadCount + defaultMaxListeners);
process.stderr.setMaxListeners(this.threadCount + defaultMaxListeners); process.stderr.setMaxListeners(threadCount + defaultMaxListeners);
// initial seeding of the queue // initial seeding of the queue
for (let i = 0; i < this.threadCount; ++i) { for (let i = 0; i < threadCount; ++i) {
threads.push(this.executeNextBatchOfTasksUsingTaskSchedule()); threads.push(this.executeNextBatchOfTasksUsingTaskSchedule());
} }
await Promise.all(threads); await Promise.all(threads);
@@ -175,9 +181,7 @@ export class TaskOrchestrator {
} }
// region Processing Scheduled Tasks // region Processing Scheduled Tasks
private async processScheduledTask( async processTask(taskId: string): Promise<NodeJS.ProcessEnv> {
taskId: string
): Promise<NodeJS.ProcessEnv> {
const task = this.taskGraph.tasks[taskId]; const task = this.taskGraph.tasks[taskId];
const taskSpecificEnv = getTaskSpecificEnv(task); const taskSpecificEnv = getTaskSpecificEnv(task);
@@ -185,7 +189,7 @@ export class TaskOrchestrator {
await hashTask( await hashTask(
this.hasher, this.hasher,
this.projectGraph, this.projectGraph,
this.taskGraph, this.taskGraphForHashing,
task, task,
taskSpecificEnv, taskSpecificEnv,
this.taskDetails this.taskDetails
@@ -204,7 +208,7 @@ export class TaskOrchestrator {
await hashTask( await hashTask(
this.hasher, this.hasher,
this.projectGraph, this.projectGraph,
this.taskGraph, this.taskGraphForHashing,
task, task,
this.batchEnv, this.batchEnv,
this.taskDetails this.taskDetails
@@ -225,7 +229,7 @@ export class TaskOrchestrator {
for (const taskId of scheduledTasks) { for (const taskId of scheduledTasks) {
// Task is already handled or being handled // Task is already handled or being handled
if (!this.processedTasks.has(taskId)) { if (!this.processedTasks.has(taskId)) {
this.processedTasks.set(taskId, this.processScheduledTask(taskId)); this.processedTasks.set(taskId, this.processTask(taskId));
} }
} }
} }
@@ -236,6 +240,7 @@ export class TaskOrchestrator {
private async applyCachedResults(tasks: Task[]): Promise< private async applyCachedResults(tasks: Task[]): Promise<
{ {
task: Task; task: Task;
code: number;
status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache'; status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache';
}[] }[]
> { > {
@@ -250,6 +255,7 @@
private async applyCachedResult(task: Task): Promise<{ private async applyCachedResult(task: Task): Promise<{
task: Task; task: Task;
code: number;
status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache'; status: 'local-cache' | 'local-cache-kept-existing' | 'remote-cache';
}> { }> {
const cachedResult = await this.cache.get(task); const cachedResult = await this.cache.get(task);
@@ -277,6 +283,7 @@
cachedResult.terminalOutput cachedResult.terminalOutput
); );
return { return {
code: cachedResult.code,
task, task,
status, status,
}; };
@@ -377,7 +384,7 @@
// endregion Batch // endregion Batch
// region Single Task // region Single Task
private async applyFromCacheOrRunTask( async applyFromCacheOrRunTask(
doNotSkipCache: boolean, doNotSkipCache: boolean,
task: Task, task: Task,
groupId: number groupId: number
@@ -419,6 +426,7 @@
let results: { let results: {
task: Task; task: Task;
code: number;
status: TaskStatus; status: TaskStatus;
terminalOutput?: string; terminalOutput?: string;
}[] = doNotSkipCache ? await this.applyCachedResults([task]) : []; }[] = doNotSkipCache ? await this.applyCachedResults([task]) : [];
@@ -437,11 +445,13 @@
results.push({ results.push({
task, task,
code,
status: code === 0 ? 'success' : 'failure', status: code === 0 ? 'success' : 'failure',
terminalOutput, terminalOutput,
}); });
} }
await this.postRunSteps([task], results, doNotSkipCache, { groupId }); await this.postRunSteps([task], results, doNotSkipCache, { groupId });
return results[0];
} }
private async runTask( private async runTask(
@@ -617,7 +627,7 @@
} }
} }
private async startContinuousTask(task: Task, groupId: number) { async startContinuousTask(task: Task, groupId: number) {
if (this.runningTasksService.getRunningTasks([task.id]).length) { if (this.runningTasksService.getRunningTasks([task.id]).length) {
// task is already running, we need to poll and wait for the running task to finish // task is already running, we need to poll and wait for the running task to finish
do { do {
@@ -898,3 +908,28 @@ export class TaskOrchestrator {
); );
} }
} }
/**
 * Determines how many worker threads the orchestrator should use.
 *
 * Normalizes `options.parallel` **in place** (this side effect is relied on
 * downstream, so it is preserved): `'false'`/`false` forces 1;
 * `'true'`/`true`/`undefined`/`''` falls back to `maxParallel` (default 3).
 * Each continuous task occupies a slot for its whole lifetime, so one extra
 * slot is added per continuous task. The result is capped at the total
 * number of tasks in the graph.
 */
export function getThreadCount(
  options: NxArgs & DefaultTasksRunnerOptions,
  taskGraph: TaskGraph
) {
  const opts = options as any;
  if (opts['parallel'] === 'false' || opts['parallel'] === false) {
    opts['parallel'] = 1;
  } else if (
    opts['parallel'] === 'true' ||
    opts['parallel'] === true ||
    opts['parallel'] === undefined ||
    opts['parallel'] === ''
  ) {
    opts['parallel'] = Number(opts['maxParallel'] || 3);
  }

  const allTasks = Object.values(taskGraph.tasks);
  const continuousCount = allTasks.filter((t) => t.continuous).length;
  return Math.min(options['parallel'] + continuousCount, allTasks.length);
}