feat(core): share continuous tasks (#29901)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

<!-- This is the behavior we have today -->

<!-- This is the behavior we should expect with the changes in this PR
-->

<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #

---------

Co-authored-by: Leosvel Pérez Espinosa <leosvel.perez.espinosa@gmail.com>
This commit is contained in:
Jason Jean 2025-04-08 02:38:17 +02:00
parent dee4906f5e
commit c5fb467118
9 changed files with 248 additions and 10 deletions

View File

@ -81,7 +81,6 @@ describe('env vars', () => {
`e2e ${myapp}-e2e --config \\'{\\"env\\":{\\"cliArg\\":\\"i am from the cli args\\"}}\\'`
);
expect(run1).toContain('All specs passed!');
await killPort(4200);
// tests should not fail because of a config change
updateFile(
`apps/${myapp}-e2e/cypress.config.ts`,
@ -114,7 +113,6 @@ export default defineConfig({
`e2e ${myapp}-e2e --config \\'{\\"env\\":{\\"cliArg\\":\\"i am from the cli args\\"}}\\'`
);
expect(run2).toContain('All specs passed!');
await killPort(4200);
// make sure project.json env vars also work
checkFilesExist(`apps/${myapp}-e2e/src/e2e/env.cy.ts`);
@ -143,8 +141,6 @@ export default defineConfig({
);
const run3 = runCLI(`e2e ${myapp}-e2e`);
expect(run3).toContain('All specs passed!');
expect(await killPort(4200)).toBeTruthy();
}
},
TEN_MINS_MS

View File

@ -42,6 +42,7 @@
"@phenomnomnominal/tsquery": "~5.0.1",
"detect-port": "^1.5.1",
"semver": "^7.6.3",
"tree-kill": "1.2.2",
"tslib": "^2.3.0"
},
"peerDependencies": {

View File

@ -8,6 +8,7 @@ import { dirname, join, relative } from 'path';
import type { InlineConfig } from 'vite';
import vitePreprocessor from '../src/plugins/preprocessor-vite';
import { NX_PLUGIN_OPTIONS } from '../src/utils/constants';
import * as treeKill from 'tree-kill';
// Importing the cypress type here causes the angular and next unit
// tests to fail when transpiling, it seems like the cypress types are
@ -79,7 +80,7 @@ function startWebServer(webServerCommand: string) {
windowsHide: false,
});
return () => {
return async () => {
if (process.platform === 'win32') {
try {
execSync('taskkill /pid ' + serverProcess.pid + ' /T /F', {
@ -91,9 +92,14 @@ function startWebServer(webServerCommand: string) {
}
}
} else {
// child.kill() does not work on linux
// process.kill will kill the whole process group on unix
process.kill(-serverProcess.pid, 'SIGKILL');
return new Promise<void>((res, rej) => {
treeKill(serverProcess.pid, (err) => {
if (err) {
rej(err);
}
res();
});
});
}
};
}
@ -172,7 +178,7 @@ export function nxE2EPreset(
const killWebServer = startWebServer(webServerCommand);
on('after:run', () => {
killWebServer();
return killWebServer();
});
await waitForServer(config.baseUrl, options.webServerConfig);
}

View File

@ -62,6 +62,13 @@ export declare class NxTaskHistory {
getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number>
}
export declare class RunningTasksService {
constructor(db: ExternalObject<NxDbConnection>)
getRunningTasks(ids: Array<string>): Array<string>
addRunningTask(taskId: string): void
removeRunningTask(taskId: string): void
}
export declare class RustPseudoTerminal {
constructor()
runCommand(command: string, commandDir?: string | undefined | null, jsEnv?: Record<string, string> | undefined | null, execArgv?: Array<string> | undefined | null, quiet?: boolean | undefined | null, tty?: boolean | undefined | null): ChildProcess

View File

@ -368,6 +368,7 @@ module.exports.HttpRemoteCache = nativeBinding.HttpRemoteCache
module.exports.ImportResult = nativeBinding.ImportResult
module.exports.NxCache = nativeBinding.NxCache
module.exports.NxTaskHistory = nativeBinding.NxTaskHistory
module.exports.RunningTasksService = nativeBinding.RunningTasksService
module.exports.RustPseudoTerminal = nativeBinding.RustPseudoTerminal
module.exports.TaskDetails = nativeBinding.TaskDetails
module.exports.TaskHasher = nativeBinding.TaskHasher

View File

@ -10,3 +10,5 @@ mod utils;
pub mod details;
#[cfg(not(target_arch = "wasm32"))]
pub mod task_history;
#[cfg(not(target_arch = "wasm32"))]
pub mod running_tasks_service;

View File

@ -0,0 +1,167 @@
use crate::native::db::connection::NxDbConnection;
use crate::native::utils::Normalize;
use hashbrown::HashSet;
use napi::bindgen_prelude::External;
use std::env::args_os;
use std::ffi::OsString;
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tracing::debug;
#[napi]
struct RunningTasksService {
db: External<NxDbConnection>,
added_tasks: HashSet<String>,
}
#[napi]
impl RunningTasksService {
#[napi(constructor)]
pub fn new(db: External<NxDbConnection>) -> anyhow::Result<Self> {
let s = Self {
db,
added_tasks: Default::default(),
};
s.setup()?;
Ok(s)
}
#[napi]
pub fn get_running_tasks(&mut self, ids: Vec<String>) -> anyhow::Result<Vec<String>> {
let mut results = Vec::<String>::with_capacity(ids.len());
for id in ids.into_iter() {
if self.is_task_running(&id)? {
results.push(id);
}
}
Ok(results)
}
fn is_task_running(&self, task_id: &String) -> anyhow::Result<bool> {
let mut stmt = self
.db
.prepare("SELECT pid, command, cwd FROM running_tasks WHERE task_id = ?")?;
if let Ok((pid, db_process_command, db_process_cwd)) = stmt.query_row([task_id], |row| {
let pid: u32 = row.get(0)?;
let command: String = row.get(1)?;
let cwd: String = row.get(2)?;
Ok((pid, command, cwd))
}) {
debug!("Checking if {} exists", pid);
let mut sys = System::new();
sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[Pid::from(pid as usize)]),
true,
ProcessRefreshKind::everything(),
);
match sys.process(sysinfo::Pid::from(pid as usize)) {
Some(process_info) => {
let cmd = process_info.cmd().to_vec();
let cmd_str = cmd
.iter()
.map(|s| s.to_string_lossy().to_string())
.collect::<Vec<_>>()
.join(" ");
if let Some(cwd_path) = process_info.cwd() {
let cwd_str = cwd_path.to_normalized_string();
Ok(cmd_str == db_process_command && cwd_str == db_process_cwd)
} else {
Ok(cmd_str == db_process_command)
}
}
None => Ok(false),
}
} else {
Ok(false)
}
}
#[napi]
pub fn add_running_task(&mut self, task_id: String) -> anyhow::Result<()> {
let pid = std::process::id();
let command = args_os().collect::<Vec<OsString>>();
// Convert command vector to a string representation
let command_str = command
.iter()
.map(|s| s.to_string_lossy().to_string())
.collect::<Vec<_>>()
.join(" ");
let cwd = std::env::current_dir()
.expect("The current working directory does not exist")
.to_normalized_string();
let mut stmt = self.db.prepare(
"INSERT OR REPLACE INTO running_tasks (task_id, pid, command, cwd) VALUES (?, ?, ?, ?)",
)?;
stmt.execute([&task_id, &pid.to_string(), &command_str, &cwd])?;
self.added_tasks.insert(task_id);
Ok(())
}
#[napi]
pub fn remove_running_task(&self, task_id: String) -> anyhow::Result<()> {
let mut stmt = self
.db
.prepare("DELETE FROM running_tasks WHERE task_id = ?")?;
stmt.execute([task_id])?;
Ok(())
}
fn setup(&self) -> anyhow::Result<()> {
self.db.execute_batch(
"
CREATE TABLE IF NOT EXISTS running_tasks (
task_id TEXT PRIMARY KEY NOT NULL,
pid INTEGER NOT NULL,
command TEXT NOT NULL,
cwd TEXT NOT NULL
);
",
)?;
Ok(())
}
}
impl Drop for RunningTasksService {
fn drop(&mut self) {
// Remove tasks added by this service. This might happen if process exits because of SIGKILL
for task_id in self.added_tasks.iter() {
self.remove_running_task(task_id.clone()).ok();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env::args_os;
use std::ffi::OsString;
#[test]
fn test_add_task() {
let pid = std::process::id();
let mut sys = System::new();
sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[Pid::from(pid as usize)]),
true,
ProcessRefreshKind::everything(),
);
if let Some(process_info) = sys.process(sysinfo::Pid::from(pid as usize)) {
// Check if the process name contains "nx" or is related to nx
// TODO: check is the process is actually the same process
dbg!(process_info);
dbg!("Process {} is running", pid);
let cmd = process_info.cmd().to_vec();
let command = args_os().collect::<Vec<OsString>>();
assert_eq!(cmd, command);
} else {
dbg!("Process {} is not running", pid);
}
}
}

View File

@ -0,0 +1,42 @@
import { RunningTasksService, TaskDetails } from '../index';
import { join } from 'path';
import { TempFs } from '../../internal-testing-utils/temp-fs';
import { rmSync } from 'fs';
import { getDbConnection } from '../../utils/db-connection';
import { randomBytes } from 'crypto';
const dbOutputFolder = 'temp-db-task';
describe('RunningTasksService', () => {
let runningTasksService: RunningTasksService;
let tempFs: TempFs;
beforeEach(() => {
tempFs = new TempFs('running-tasks-service');
const dbConnection = getDbConnection({
directory: join(__dirname, dbOutputFolder),
dbName: `temp-db-${randomBytes(4).toString('hex')}`,
});
runningTasksService = new RunningTasksService(dbConnection);
});
afterAll(() => {
rmSync(join(__dirname, dbOutputFolder), {
recursive: true,
force: true,
});
});
it('should record a task as running', () => {
runningTasksService.addRunningTask('app:build');
expect(runningTasksService.getRunningTasks(['app:build'])).toEqual([
'app:build',
]);
});
it('should remove a task from running tasks', () => {
runningTasksService.addRunningTask('app:build');
runningTasksService.removeRunningTask('app:build');
expect(runningTasksService.getRunningTasks(['app:build'])).toEqual([]);
});
});

View File

@ -32,16 +32,18 @@ import { workspaceRoot } from '../utils/workspace-root';
import { output } from '../utils/output';
import { combineOptionsForExecutor } from '../utils/params';
import { NxJsonConfiguration } from '../config/nx-json';
import type { TaskDetails } from '../native';
import { RunningTasksService, type TaskDetails } from '../native';
import { NoopChildProcess } from './running-tasks/noop-child-process';
import { RunningTask } from './running-tasks/running-task';
import { NxArgs } from '../utils/command-line-utils';
import { getDbConnection } from '../utils/db-connection';
export class TaskOrchestrator {
private taskDetails: TaskDetails | null = getTaskDetails();
private cache: DbCache | Cache = getCache(this.options);
private forkedProcessTaskRunner = new ForkedProcessTaskRunner(this.options);
private runningTasksService = new RunningTasksService(getDbConnection());
private tasksSchedule = new TasksSchedule(
this.projectGraph,
this.taskGraph,
@ -428,6 +430,7 @@ export class TaskOrchestrator {
);
const { code, terminalOutput } = await childProcess.getResults();
results.push({
task,
status: code === 0 ? 'success' : 'failure',
@ -574,6 +577,15 @@ export class TaskOrchestrator {
}
private async startContinuousTask(task: Task, groupId: number) {
if (this.runningTasksService.getRunningTasks([task.id]).length) {
// task is already running, we need to poll and wait for the running task to finish
do {
console.log(`Waiting for ${task.id} in another nx process`);
await new Promise((resolve) => setTimeout(resolve, 100));
} while (this.runningTasksService.getRunningTasks([task.id]).length);
return;
}
const taskSpecificEnv = await this.processedTasks.get(task.id);
await this.preRunSteps([task], { groupId });
@ -613,9 +625,11 @@ export class TaskOrchestrator {
temporaryOutputPath,
pipeOutput
);
this.runningTasksService.addRunningTask(task.id);
this.runningContinuousTasks.set(task.id, childProcess);
childProcess.onExit((code) => {
this.runningTasksService.removeRunningTask(task.id);
if (!this.cleaningUp) {
console.error(
`Task "${task.id}" is continuous but exited with code ${code}`
@ -836,6 +850,8 @@ export class TaskOrchestrator {
return t.kill();
} catch (e) {
console.error(`Unable to terminate ${taskId}\nError:`, e);
} finally {
this.runningTasksService.removeRunningTask(taskId);
}
})
);