diff --git a/e2e/cypress/src/cypress.test.ts b/e2e/cypress/src/cypress.test.ts index 1d29edee45..8d66ed31fe 100644 --- a/e2e/cypress/src/cypress.test.ts +++ b/e2e/cypress/src/cypress.test.ts @@ -81,7 +81,6 @@ describe('env vars', () => { `e2e ${myapp}-e2e --config \\'{\\"env\\":{\\"cliArg\\":\\"i am from the cli args\\"}}\\'` ); expect(run1).toContain('All specs passed!'); - await killPort(4200); // tests should not fail because of a config change updateFile( `apps/${myapp}-e2e/cypress.config.ts`, @@ -114,7 +113,6 @@ export default defineConfig({ `e2e ${myapp}-e2e --config \\'{\\"env\\":{\\"cliArg\\":\\"i am from the cli args\\"}}\\'` ); expect(run2).toContain('All specs passed!'); - await killPort(4200); // make sure project.json env vars also work checkFilesExist(`apps/${myapp}-e2e/src/e2e/env.cy.ts`); @@ -143,8 +141,6 @@ export default defineConfig({ ); const run3 = runCLI(`e2e ${myapp}-e2e`); expect(run3).toContain('All specs passed!'); - - expect(await killPort(4200)).toBeTruthy(); } }, TEN_MINS_MS diff --git a/packages/cypress/package.json b/packages/cypress/package.json index 56cd94466a..118e62bdde 100644 --- a/packages/cypress/package.json +++ b/packages/cypress/package.json @@ -42,6 +42,7 @@ "@phenomnomnominal/tsquery": "~5.0.1", "detect-port": "^1.5.1", "semver": "^7.6.3", + "tree-kill": "1.2.2", "tslib": "^2.3.0" }, "peerDependencies": { diff --git a/packages/cypress/plugins/cypress-preset.ts b/packages/cypress/plugins/cypress-preset.ts index 7ec404862a..c279e1aa89 100644 --- a/packages/cypress/plugins/cypress-preset.ts +++ b/packages/cypress/plugins/cypress-preset.ts @@ -8,6 +8,7 @@ import { dirname, join, relative } from 'path'; import type { InlineConfig } from 'vite'; import vitePreprocessor from '../src/plugins/preprocessor-vite'; import { NX_PLUGIN_OPTIONS } from '../src/utils/constants'; +import * as treeKill from 'tree-kill'; // Importing the cypress type here causes the angular and next unit // tests to fail when transpiling, it seems like the cypress types are @@ -79,7 +80,7 @@ function startWebServer(webServerCommand: string) { windowsHide: false, }); - return () => { + return async () => { if (process.platform === 'win32') { try { execSync('taskkill /pid ' + serverProcess.pid + ' /T /F', { @@ -91,9 +92,14 @@ function startWebServer(webServerCommand: string) { } } } else { - // child.kill() does not work on linux - // process.kill will kill the whole process group on unix - process.kill(-serverProcess.pid, 'SIGKILL'); + return new Promise((res, rej) => { + treeKill(serverProcess.pid, (err) => { + if (err) { + rej(err); + } + res(); + }); + }); } }; } @@ -172,7 +178,7 @@ export function nxE2EPreset( const killWebServer = startWebServer(webServerCommand); on('after:run', () => { - killWebServer(); + return killWebServer(); }); await waitForServer(config.baseUrl, options.webServerConfig); } diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index 660732c5c8..63a9a9ee8c 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -62,6 +62,13 @@ export declare class NxTaskHistory { getEstimatedTaskTimings(targets: Array): Record } +export declare class RunningTasksService { + constructor(db: ExternalObject) + getRunningTasks(ids: Array): Array + addRunningTask(taskId: string): void + removeRunningTask(taskId: string): void +} + export declare class RustPseudoTerminal { constructor() runCommand(command: string, commandDir?: string | undefined | null, jsEnv?: Record | undefined | null, execArgv?: Array | undefined | null, quiet?: boolean | undefined | null, tty?: boolean | undefined | null): ChildProcess diff --git a/packages/nx/src/native/native-bindings.js b/packages/nx/src/native/native-bindings.js index 6db0d71959..76a912791a 100644 --- a/packages/nx/src/native/native-bindings.js +++ b/packages/nx/src/native/native-bindings.js @@ -368,6 +368,7 @@ module.exports.HttpRemoteCache = nativeBinding.HttpRemoteCache module.exports.ImportResult = nativeBinding.ImportResult module.exports.NxCache = nativeBinding.NxCache module.exports.NxTaskHistory = nativeBinding.NxTaskHistory +module.exports.RunningTasksService = nativeBinding.RunningTasksService module.exports.RustPseudoTerminal = nativeBinding.RustPseudoTerminal module.exports.TaskDetails = nativeBinding.TaskDetails module.exports.TaskHasher = nativeBinding.TaskHasher diff --git a/packages/nx/src/native/tasks/mod.rs b/packages/nx/src/native/tasks/mod.rs index a6db3991b0..7150b8eb8f 100644 --- a/packages/nx/src/native/tasks/mod.rs +++ b/packages/nx/src/native/tasks/mod.rs @@ -10,3 +10,5 @@ mod utils; pub mod details; #[cfg(not(target_arch = "wasm32"))] pub mod task_history; +#[cfg(not(target_arch = "wasm32"))] +pub mod running_tasks_service; diff --git a/packages/nx/src/native/tasks/running_tasks_service.rs b/packages/nx/src/native/tasks/running_tasks_service.rs new file mode 100644 index 0000000000..50722d5267 --- /dev/null +++ b/packages/nx/src/native/tasks/running_tasks_service.rs @@ -0,0 +1,167 @@ +use crate::native::db::connection::NxDbConnection; +use crate::native::utils::Normalize; +use hashbrown::HashSet; +use napi::bindgen_prelude::External; +use std::env::args_os; +use std::ffi::OsString; +use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System}; +use tracing::debug; + +#[napi] +struct RunningTasksService { + db: External, + added_tasks: HashSet, +} + +#[napi] +impl RunningTasksService { + #[napi(constructor)] + pub fn new(db: External) -> anyhow::Result { + let s = Self { + db, + added_tasks: Default::default(), + }; + + s.setup()?; + + Ok(s) + } + + #[napi] + pub fn get_running_tasks(&mut self, ids: Vec) -> anyhow::Result> { + let mut results = Vec::::with_capacity(ids.len()); + for id in ids.into_iter() { + if self.is_task_running(&id)? { + results.push(id); + } + } + Ok(results) + } + + fn is_task_running(&self, task_id: &String) -> anyhow::Result { + let mut stmt = self + .db + .prepare("SELECT pid, command, cwd FROM running_tasks WHERE task_id = ?")?; + if let Ok((pid, db_process_command, db_process_cwd)) = stmt.query_row([task_id], |row| { + let pid: u32 = row.get(0)?; + let command: String = row.get(1)?; + let cwd: String = row.get(2)?; + + Ok((pid, command, cwd)) + }) { + debug!("Checking if {} exists", pid); + + let mut sys = System::new(); + sys.refresh_processes_specifics( + ProcessesToUpdate::Some(&[Pid::from(pid as usize)]), + true, + ProcessRefreshKind::everything(), + ); + + match sys.process(sysinfo::Pid::from(pid as usize)) { + Some(process_info) => { + let cmd = process_info.cmd().to_vec(); + let cmd_str = cmd + .iter() + .map(|s| s.to_string_lossy().to_string()) + .collect::>() + .join(" "); + + if let Some(cwd_path) = process_info.cwd() { + let cwd_str = cwd_path.to_normalized_string(); + Ok(cmd_str == db_process_command && cwd_str == db_process_cwd) + } else { + Ok(cmd_str == db_process_command) + } + } + None => Ok(false), + } + } else { + Ok(false) + } + } + + #[napi] + pub fn add_running_task(&mut self, task_id: String) -> anyhow::Result<()> { + let pid = std::process::id(); + let command = args_os().collect::>(); + // Convert command vector to a string representation + let command_str = command + .iter() + .map(|s| s.to_string_lossy().to_string()) + .collect::>() + .join(" "); + + let cwd = std::env::current_dir() + .expect("The current working directory does not exist") + .to_normalized_string(); + let mut stmt = self.db.prepare( + "INSERT OR REPLACE INTO running_tasks (task_id, pid, command, cwd) VALUES (?, ?, ?, ?)", + )?; + stmt.execute([&task_id, &pid.to_string(), &command_str, &cwd])?; + self.added_tasks.insert(task_id); + Ok(()) + } + + #[napi] + pub fn remove_running_task(&self, task_id: String) -> anyhow::Result<()> { + let mut stmt = self + .db + .prepare("DELETE FROM running_tasks WHERE task_id = ?")?; + stmt.execute([task_id])?; + Ok(()) + } + + fn setup(&self) -> anyhow::Result<()> { + self.db.execute_batch( + " + CREATE TABLE IF NOT EXISTS running_tasks ( + task_id TEXT PRIMARY KEY NOT NULL, + pid INTEGER NOT NULL, + command TEXT NOT NULL, + cwd TEXT NOT NULL + ); + ", + )?; + Ok(()) + } +} + +impl Drop for RunningTasksService { + fn drop(&mut self) { + // Remove tasks added by this service. This might happen if process exits because of SIGKILL + for task_id in self.added_tasks.iter() { + self.remove_running_task(task_id.clone()).ok(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env::args_os; + use std::ffi::OsString; + + #[test] + fn test_add_task() { + let pid = std::process::id(); + + let mut sys = System::new(); + sys.refresh_processes_specifics( + ProcessesToUpdate::Some(&[Pid::from(pid as usize)]), + true, + ProcessRefreshKind::everything(), + ); + if let Some(process_info) = sys.process(sysinfo::Pid::from(pid as usize)) { + // Check if the process name contains "nx" or is related to nx + // TODO: check is the process is actually the same process + dbg!(process_info); + dbg!("Process {} is running", pid); + let cmd = process_info.cmd().to_vec(); + let command = args_os().collect::>(); + assert_eq!(cmd, command); + } else { + dbg!("Process {} is not running", pid); + } + } +} diff --git a/packages/nx/src/native/tests/running_tasks_service.spec.ts b/packages/nx/src/native/tests/running_tasks_service.spec.ts new file mode 100644 index 0000000000..7d8033ba17 --- /dev/null +++ b/packages/nx/src/native/tests/running_tasks_service.spec.ts @@ -0,0 +1,42 @@ +import { RunningTasksService, TaskDetails } from '../index'; +import { join } from 'path'; +import { TempFs } from '../../internal-testing-utils/temp-fs'; +import { rmSync } from 'fs'; +import { getDbConnection } from '../../utils/db-connection'; +import { randomBytes } from 'crypto'; + +const dbOutputFolder = 'temp-db-task'; +describe('RunningTasksService', () => { + let runningTasksService: RunningTasksService; + let tempFs: TempFs; + + beforeEach(() => { + tempFs = new TempFs('running-tasks-service'); + + const dbConnection = getDbConnection({ + directory: join(__dirname, dbOutputFolder), + dbName: `temp-db-${randomBytes(4).toString('hex')}`, + }); + runningTasksService = new RunningTasksService(dbConnection); + }); + + afterAll(() => { + rmSync(join(__dirname, dbOutputFolder), { + recursive: true, + force: true, + }); + }); + + it('should record a task as running', () => { + runningTasksService.addRunningTask('app:build'); + expect(runningTasksService.getRunningTasks(['app:build'])).toEqual([ + 'app:build', + ]); + }); + + it('should remove a task from running tasks', () => { + runningTasksService.addRunningTask('app:build'); + runningTasksService.removeRunningTask('app:build'); + expect(runningTasksService.getRunningTasks(['app:build'])).toEqual([]); + }); +}); diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index de8fca6ae7..dbbe79e648 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -32,16 +32,18 @@ import { workspaceRoot } from '../utils/workspace-root'; import { output } from '../utils/output'; import { combineOptionsForExecutor } from '../utils/params'; import { NxJsonConfiguration } from '../config/nx-json'; -import type { TaskDetails } from '../native'; +import { RunningTasksService, type TaskDetails } from '../native'; import { NoopChildProcess } from './running-tasks/noop-child-process'; import { RunningTask } from './running-tasks/running-task'; import { NxArgs } from '../utils/command-line-utils'; +import { getDbConnection } from '../utils/db-connection'; export class TaskOrchestrator { private taskDetails: TaskDetails | null = getTaskDetails(); private cache: DbCache | Cache = getCache(this.options); private forkedProcessTaskRunner = new ForkedProcessTaskRunner(this.options); + private runningTasksService = new RunningTasksService(getDbConnection()); private tasksSchedule = new TasksSchedule( this.projectGraph, this.taskGraph, @@ -428,6 +430,7 @@ export class TaskOrchestrator { ); const { code, terminalOutput } = await childProcess.getResults(); + results.push({ task, status: code === 0 ? 'success' : 'failure', @@ -574,6 +577,15 @@ export class TaskOrchestrator { } private async startContinuousTask(task: Task, groupId: number) { + if (this.runningTasksService.getRunningTasks([task.id]).length) { + // task is already running, we need to poll and wait for the running task to finish + do { + console.log(`Waiting for ${task.id} in another nx process`); + await new Promise((resolve) => setTimeout(resolve, 100)); + } while (this.runningTasksService.getRunningTasks([task.id]).length); + return; + } + const taskSpecificEnv = await this.processedTasks.get(task.id); await this.preRunSteps([task], { groupId }); @@ -613,9 +625,11 @@ export class TaskOrchestrator { temporaryOutputPath, pipeOutput ); + this.runningTasksService.addRunningTask(task.id); this.runningContinuousTasks.set(task.id, childProcess); childProcess.onExit((code) => { + this.runningTasksService.removeRunningTask(task.id); if (!this.cleaningUp) { console.error( `Task "${task.id}" is continuous but exited with code ${code}` @@ -836,6 +850,8 @@ export class TaskOrchestrator { return t.kill(); } catch (e) { console.error(`Unable to terminate ${taskId}\nError:`, e); + } finally { + this.runningTasksService.removeRunningTask(taskId); } }) );