diff --git a/Cargo.lock b/Cargo.lock index caacd13414..dd6b5bcce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,9 +194,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "bitvec" @@ -289,7 +289,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409" dependencies = [ "async-trait", - "nix", + "nix 0.27.1", "tokio", "winapi", ] @@ -333,7 +333,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.8.0", "scopeguard", ] @@ -391,6 +391,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "downcast-rs" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" + [[package]] name = "dunce" version = "1.0.4" @@ -411,23 +417,12 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "errno" -version = "0.3.1" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ - "errno-dragonfly", - "libc", - "windows-sys 0.48.0", -] - -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", "libc", + "windows-sys 0.52.0", ] [[package]] @@ -460,6 +455,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "filedescriptor" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7199d965852c3bac31f779ef99cbb4537f80e952e2d6aa0ffeb30cce00f4f46e" +dependencies = [ + "libc", + "thiserror", + "winapi", +] + [[package]] name = "filetime" version = "0.2.23" @@ -667,7 +673,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52e0be46f4cf1f8f9e88d0e3eb7b29718aff23889563249f379119bd1ab6910e" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "bstr", "gix-path", "libc", @@ -715,7 +721,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5db19298c5eeea2961e5b3bf190767a2d1f09b8802aeb5f258e42276350aff19" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "bstr", "gix-features", "gix-path", @@ -801,7 +807,7 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78f6dce0c6683e2219e8169aac4b1c29e89540a8262fef7056b31d80d969408c" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "gix-path", "libc", "windows", @@ -1026,6 +1032,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ioctl-rs" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d" +dependencies = [ + "libc", +] + [[package]] name = "is-macro" version = "0.3.0" @@ -1218,6 +1233,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -1284,7 +1308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49ac8112fe5998579b22e29903c7b277fc7f91c7860c0236f35792caf8156e18" dependencies = [ "anyhow", - "bitflags 2.3.3", + "bitflags 2.4.1", "ctor", "napi-derive", "napi-sys", @@ -1351,13 +1375,27 @@ dependencies = [ "smallvec", ] +[[package]] +name = "nix" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset 0.6.5", + "pin-utils", +] + [[package]] name = "nix" version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "cfg-if", "libc", ] @@ -1384,7 +1422,7 @@ version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "crossbeam-channel", "filetime", "fsevent-sys", @@ -1479,6 +1517,7 @@ dependencies = [ "nom", "once_cell", "parking_lot", + "portable-pty", "rayon", "regex", "rkyv", @@ -1487,6 +1526,7 @@ dependencies = [ "swc_ecma_dep_graph", "swc_ecma_parser", "swc_ecma_visit", + "term_size", "thiserror", "tokio", "tracing", @@ -1591,6 +1631,27 @@ dependencies = [ "syn 2.0.46", ] +[[package]] +name = "portable-pty" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806ee80c2a03dbe1a9fb9534f8d19e4c0546b790cde8fd1fea9d6390644cb0be" +dependencies = [ + "anyhow", + "bitflags 1.3.2", + "downcast-rs", + "filedescriptor", + "lazy_static", + "libc", + "log", + "nix 0.25.1", + "serial", + "shared_library", + "shell-words", + "winapi", + "winreg", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1946,6 +2007,48 @@ dependencies = [ "syn 2.0.46", ] +[[package]] +name = "serial" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86" +dependencies = [ + "serial-core", + "serial-unix", + "serial-windows", +] + +[[package]] +name = "serial-core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581" +dependencies = [ + "libc", +] + +[[package]] +name = "serial-unix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7" +dependencies = [ + "ioctl-rs", + "libc", + "serial-core", + "termios", +] + +[[package]] +name = "serial-windows" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162" +dependencies = [ + "libc", + "serial-core", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -1961,6 +2064,22 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared_library" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9e7e0f2bfae24d8a5b5a66c5b257a83c7412304311512a0c054cd5e619da11" +dependencies = [ + "lazy_static", + "libc", +] + +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2119,7 +2238,7 @@ version = "0.107.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5693558188efdd5b664e517b69ba8056a7f64c214ca8cd034e3ae8314566b866" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.1", "is-macro", "num-bigint", "scoped-tls", @@ -2264,6 +2383,25 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "term_size" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e4129646ca0ed8f45d09b929036bafad5377103edd06e50bf574b353d2b08d9" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "termios" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a" +dependencies = [ + "libc", +] + [[package]] name = "termtree" version = "0.4.0" @@ -2565,7 +2703,7 @@ dependencies = [ "futures", "ignore-files 1.3.2", "miette", - "nix", + "nix 0.27.1", "normalize-path", "notify", "once_cell", @@ -2584,7 +2722,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fa905a7f327bfdda78b9c06831d3180a419b7b722bd1ef779ac13ff2ab69df0" dependencies = [ - "nix", + "nix 0.27.1", "notify", "watchexec-signals", ] @@ -2611,7 +2749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af0a778522cf0fc2fa8a8f1380e32893208cb2e7fd33e64de8bd81a00a2a7838" dependencies = [ "miette", - "nix", + "nix 0.27.1", "thiserror", ] @@ -2623,7 +2761,7 @@ checksum = "6214815382a9cadf1f0e521e3c28ae4e02541b96622d0e78053f03b730a1437f" dependencies = [ "command-group", "futures", - "nix", + "nix 0.27.1", "tokio", "tracing", "watchexec-events", @@ -2887,6 +3025,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/e2e/eslint/src/linter.test.ts b/e2e/eslint/src/linter.test.ts index ddf293a378..c6a5b664a6 100644 --- a/e2e/eslint/src/linter.test.ts +++ b/e2e/eslint/src/linter.test.ts @@ -583,7 +583,7 @@ describe('Linter', () => { const outFlat = runCLI(`affected -t lint`, { silenceError: true, }); - expect(outFlat).toContain('All files pass linting'); + expect(outFlat).toContain('ran target lint'); }, 1000000); it('should convert standalone to flat config', () => { @@ -616,7 +616,7 @@ describe('Linter', () => { const outFlat = runCLI(`affected -t lint`, { silenceError: true, }); - expect(outFlat).toContain('All files pass linting'); + expect(outFlat).toContain('ran target lint'); }, 1000000); }); diff --git a/packages/nx/Cargo.toml b/packages/nx/Cargo.toml index 86dcb1da38..bfaee51f85 100644 --- a/packages/nx/Cargo.toml +++ b/packages/nx/Cargo.toml @@ -17,6 +17,7 @@ ignore-files = "2.0.0" itertools = "0.10.5" once_cell = "1.18.0" parking_lot = { version = "0.12.1", features = ["send_guard"] } +portable-pty = "0.8.1" napi = { version = '2.12.6', default-features = false, features = [ 'anyhow', 'napi4', @@ -41,6 +42,7 @@ swc_common = "0.31.16" swc_ecma_parser = { version = "0.137.1", features = ["typescript"] } swc_ecma_visit = "0.93.0" swc_ecma_ast = "0.107.0" +term_size = "0.3.2" [lib] crate-type = ['cdylib'] diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index 19bfa8d534..59c43d3268 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -20,7 +20,7 @@ import { NxJsonConfiguration } from '../../config/nx-json'; import { readNxJson } from '../../config/configuration'; import { PromisedBasedQueue } from '../../utils/promised-based-queue'; import { hasNxJson } from '../../config/nx-json'; -import { Message, SocketMessenger } from './socket-messenger'; +import { Message, DaemonSocketMessenger } from './daemon-socket-messenger'; import { safelyCleanUpExistingProcess } from '../cache'; import { Hash } from '../../hasher/task-hasher'; import { Task, TaskGraph } from '../../config/task-graph'; @@ -50,7 +50,7 @@ export class DaemonClient { } private queue: PromisedBasedQueue; - private socketMessenger: SocketMessenger; + private socketMessenger: DaemonSocketMessenger; private currentMessage; private currentResolve; @@ -172,10 +172,12 @@ export class DaemonClient { ) => void ): Promise { await this.getProjectGraphAndSourceMaps(); - let messenger: SocketMessenger | undefined; + let messenger: DaemonSocketMessenger | undefined; await this.queue.sendToQueue(() => { - messenger = new SocketMessenger(connect(FULL_OS_SOCKET_PATH)).listen( + messenger = new DaemonSocketMessenger( + connect(FULL_OS_SOCKET_PATH) + ).listen( (message) => { try { const parsedMessage = JSON.parse(message); @@ -248,7 +250,7 @@ export class DaemonClient { } private setUpConnection() { - this.socketMessenger = new SocketMessenger( + this.socketMessenger = new DaemonSocketMessenger( connect(FULL_OS_SOCKET_PATH) ).listen( (message) => this.handleMessage(message), diff --git a/packages/nx/src/daemon/client/socket-messenger.ts b/packages/nx/src/daemon/client/daemon-socket-messenger.ts similarity index 96% rename from packages/nx/src/daemon/client/socket-messenger.ts rename to packages/nx/src/daemon/client/daemon-socket-messenger.ts index 22acef5c4e..34c00ff12e 100644 --- a/packages/nx/src/daemon/client/socket-messenger.ts +++ b/packages/nx/src/daemon/client/daemon-socket-messenger.ts @@ -8,7 +8,7 @@ export interface Message extends Record { data?: any; } -export class SocketMessenger { +export class DaemonSocketMessenger { constructor(private socket: Socket) {} async sendMessage(messageToDaemon: Message) { diff --git a/packages/nx/src/daemon/socket-utils.ts b/packages/nx/src/daemon/socket-utils.ts index 0ceb5b1084..49bd03903e 100644 --- a/packages/nx/src/daemon/socket-utils.ts +++ b/packages/nx/src/daemon/socket-utils.ts @@ -1,7 +1,7 @@ import { unlinkSync } from 'fs'; import { platform } from 'os'; -import { resolve } from 'path'; -import { DAEMON_SOCKET_PATH } from './tmp-dir'; +import { join, resolve } from 'path'; +import { DAEMON_SOCKET_PATH, socketDir } from './tmp-dir'; export const isWindows = platform() === 'win32'; @@ -15,6 +15,11 @@ export const FULL_OS_SOCKET_PATH = isWindows ? '\\\\.\\pipe\\nx\\' + resolve(DAEMON_SOCKET_PATH) : resolve(DAEMON_SOCKET_PATH); +export const FORKED_PROCESS_OS_SOCKET_PATH = (id: string) => { + let path = resolve(join(socketDir, 'fp' + id + '.sock')); + return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path); +}; + export function killSocketOrPath(): void { try { unlinkSync(FULL_OS_SOCKET_PATH); diff --git a/packages/nx/src/daemon/tmp-dir.ts b/packages/nx/src/daemon/tmp-dir.ts index 96fa74215c..6904f83233 100644 --- a/packages/nx/src/daemon/tmp-dir.ts +++ b/packages/nx/src/daemon/tmp-dir.ts @@ -21,7 +21,7 @@ export const DAEMON_OUTPUT_LOG_FILE = join( 'daemon.log' ); -const socketDir = process.env.NX_DAEMON_SOCKET_DIR || createSocketDir(); +export const socketDir = process.env.NX_DAEMON_SOCKET_DIR || createSocketDir(); export const DAEMON_SOCKET_PATH = join( socketDir, diff --git a/packages/nx/src/executors/run-commands/run-commands.impl.ts b/packages/nx/src/executors/run-commands/run-commands.impl.ts index 355001ac11..3c076ead81 100644 --- a/packages/nx/src/executors/run-commands/run-commands.impl.ts +++ b/packages/nx/src/executors/run-commands/run-commands.impl.ts @@ -4,6 +4,7 @@ import * as yargsParser from 'yargs-parser'; import { env as appendLocalEnv } from 'npm-run-path'; import { ExecutorContext } from '../../config/misc-interfaces'; import * as chalk from 'chalk'; +import { runCommand } from '../../native'; export const LARGE_BUFFER = 1024 * 1000000; @@ -121,7 +122,8 @@ async function runInParallel( options.readyWhen, options.color, calculateCwd(options.cwd, context), - options.env ?? {} + options.env ?? {}, + true ).then((result) => ({ result, command: c.command, @@ -187,7 +189,8 @@ async function runSerially( undefined, options.color, calculateCwd(options.cwd, context), - options.env ?? {} + options.env ?? {}, + false ); if (!success) { process.stderr.write( @@ -200,7 +203,7 @@ async function runSerially( return true; } -function createProcess( +async function createProcess( commandConfig: { command: string; color?: string; @@ -210,12 +213,50 @@ function createProcess( readyWhen: string, color: boolean, cwd: string, - env: Record + env: Record, + isParallel: boolean +): Promise { + env = processEnv(color, cwd, env); + // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes + // currently does not work properly in windows + if ( + process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && + process.stdout.isTTY && + !commandConfig.prefix && + !isParallel + ) { + const cp = runCommand(commandConfig.command, cwd, env); + + return new Promise((res) => { + cp.onOutput((output) => { + if (readyWhen && output.indexOf(readyWhen) > -1) { + res(true); + } + }); + + cp.onExit((code) => res(code === 0)); + }); + } + + return nodeProcess(commandConfig, color, cwd, env, readyWhen); +} + +function nodeProcess( + commandConfig: { + command: string; + color?: string; + bgColor?: string; + prefix?: string; + }, + color: boolean, + cwd: string, + env: Record, + readyWhen: string ): Promise { return new Promise((res) => { const childProcess = exec(commandConfig.command, { maxBuffer: LARGE_BUFFER, - env: processEnv(color, cwd, env), + env, cwd, }); /** diff --git a/packages/nx/src/native/command.rs b/packages/nx/src/native/command.rs new file mode 100644 index 0000000000..8e3c167b11 --- /dev/null +++ b/packages/nx/src/native/command.rs @@ -0,0 +1,212 @@ +use std::{ + collections::HashMap, + io::{BufReader, Read, Write}, +}; + +use anyhow::anyhow; +use crossbeam_channel::{bounded, unbounded, Receiver}; +use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::ThreadsafeFunction; +use napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking; +use napi::{Env, JsFunction}; +use portable_pty::{ChildKiller, CommandBuilder, NativePtySystem, PtySize, PtySystem}; + +fn command_builder() -> CommandBuilder { + if cfg!(target_os = "windows") { + let comspec = std::env::var("COMSPEC"); + let shell = comspec + .as_ref() + .map(|v| v.as_str()) + .unwrap_or_else(|_| "cmd.exe"); + let mut command = CommandBuilder::new(shell); + command.arg("/C"); + + command + } else { + let mut command = CommandBuilder::new("sh"); + command.arg("-c"); + command + } +} + +pub enum ChildProcessMessage { + Kill, +} + +#[napi] +pub struct ChildProcess { + process_killer: Box, + message_receiver: Receiver, + wait_receiver: Receiver, +} +#[napi] +impl ChildProcess { + pub fn new( + process_killer: Box, + message_receiver: Receiver, + exit_receiver: Receiver, + ) -> Self { + Self { + process_killer, + message_receiver, + wait_receiver: exit_receiver, + } + } + + #[napi] + pub fn kill(&mut self) -> anyhow::Result<()> { + self.process_killer.kill().map_err(anyhow::Error::from)?; + Ok(()) + } + + #[napi] + pub fn on_exit( + &mut self, + #[napi(ts_arg_type = "(code: number) => void")] callback: JsFunction, + ) -> napi::Result<()> { + let wait = self.wait_receiver.clone(); + let callback_tsfn: ThreadsafeFunction = + callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; + + std::thread::spawn(move || { + // we will only get one exit_code here, so we dont need to do a while loop + if let Ok(exit_code) = wait.recv() { + callback_tsfn.call(exit_code, NonBlocking); + } + }); + + Ok(()) + } + + #[napi] + pub fn on_output( + &mut self, + env: Env, + #[napi(ts_arg_type = "(message: string) => void")] callback: JsFunction, + ) -> napi::Result<()> { + let rx = self.message_receiver.clone(); + + let mut callback_tsfn: ThreadsafeFunction = + callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; + + callback_tsfn.unref(&env)?; + + std::thread::spawn(move || { + while let Ok(content) = rx.recv() { + callback_tsfn.call(content, NonBlocking); + } + }); + + Ok(()) + } +} + +fn get_directory(command_dir: Option) -> anyhow::Result { + if let Some(command_dir) = command_dir { + Ok(command_dir) + } else { + std::env::current_dir() + .map(|v| v.to_string_lossy().to_string()) + .map_err(|_| { + anyhow!("failed to get current directory, please specify command_dir explicitly") + }) + } +} + +#[napi] +pub fn run_command( + command: String, + command_dir: Option, + js_env: Option>, + quiet: Option, +) -> napi::Result { + let command_dir = get_directory(command_dir)?; + + let quiet = quiet.unwrap_or(false); + + let pty_system = NativePtySystem::default(); + + let (w, h) = term_size::dimensions().unwrap_or((80, 24)); + let pair = pty_system.openpty(PtySize { + rows: h as u16, + cols: w as u16, + pixel_width: 0, + pixel_height: 0, + })?; + + let mut cmd = command_builder(); + cmd.arg(command.as_str()); + cmd.cwd(command_dir); + + if let Some(js_env) = js_env { + for (key, value) in js_env { + cmd.env(key, value); + } + } + + let (message_tx, message_rx) = unbounded(); + + let reader = pair.master.try_clone_reader()?; + let mut stdout = std::io::stdout(); + std::thread::spawn(move || { + let mut reader = BufReader::new(reader); + let mut buffer = [0; 8 * 1024]; + + let mut strip_clear_code = cfg!(target_os = "windows"); + + while let Ok(n) = reader.read(&mut buffer) { + if n == 0 { + break; + } + + let mut content = String::from_utf8_lossy(&buffer[..n]).to_string(); + if strip_clear_code { + strip_clear_code = false; + // remove clear screen + content = content.replacen("\x1B[2J", "", 1); + // remove cursor position 1,1 + content = content.replacen("\x1B[H", "", 1); + } + message_tx.send(content.to_string()).ok(); + if !quiet { + stdout.write_all(content.as_bytes()).ok(); + stdout.flush().ok(); + } + } + }); + + let mut child = pair.slave.spawn_command(cmd)?; + // Release any handles owned by the slave + // we don't need it now that we've spawned the child. + drop(pair.slave); + + let process_killer = child.clone_killer(); + let (exit_tx, exit_rx) = bounded(1); + std::thread::spawn(move || { + let exit = child.wait().unwrap(); + // make sure that master is only dropped after we wait on the child. Otherwise windows does not like it + drop(pair.master); + exit_tx.send(exit.exit_code()).ok(); + }); + + Ok(ChildProcess::new(process_killer, message_rx, exit_rx)) +} + +/// This allows us to run a pseudoterminal with a fake node ipc channel +/// this makes it possible to be backwards compatible with the old implementation +#[napi] +pub fn nx_fork( + id: String, + fork_script: String, + psuedo_ipc_path: String, + command_dir: Option, + js_env: Option>, + quiet: bool, +) -> napi::Result { + run_command( + format!("node {} {} {}", fork_script, psuedo_ipc_path, id), + command_dir, + js_env, + Some(quiet), + ) +} diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index 3bf5ac628f..d9abbcf57d 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -21,6 +21,12 @@ export function expandOutputs(directory: string, entries: Array): Array< export function getFilesForOutputs(directory: string, entries: Array): Array export function remove(src: string): void export function copy(src: string, dest: string): void +export function runCommand(command: string, commandDir?: string | undefined | null, jsEnv?: Record | undefined | null, quiet?: boolean | undefined | null): ChildProcess +/** + * This allows us to run a pseudoterminal with a fake node ipc channel + * this makes it possible to be backwards compatible with the old implementation + */ +export function nxFork(id: string, forkScript: string, psuedoIpcPath: string, commandDir: string | undefined | null, jsEnv: Record | undefined | null, quiet: boolean): ChildProcess export function hashArray(input: Array): string export function hashFile(file: string): string | null export function findImports(projectFileMap: Record>): Array @@ -140,6 +146,11 @@ export interface FileMap { nonProjectFiles: Array } export function testOnlyTransferFileMap(projectFiles: Record>, nonProjectFiles: Array): NxWorkspaceFilesExternals +export class ChildProcess { + kill(): void + onExit(callback: (code: number) => void): void + onOutput(callback: (message: string) => void): void +} export class ImportResult { file: string sourceProject: string diff --git a/packages/nx/src/native/index.js b/packages/nx/src/native/index.js index d4ed2c8803..d1e70f2555 100644 --- a/packages/nx/src/native/index.js +++ b/packages/nx/src/native/index.js @@ -246,12 +246,15 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { expandOutputs, getFilesForOutputs, remove, copy, hashArray, hashFile, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors, testOnlyTransferFileMap } = nativeBinding +const { expandOutputs, getFilesForOutputs, remove, copy, ChildProcess, runCommand, nxFork, hashArray, hashFile, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors, testOnlyTransferFileMap } = nativeBinding module.exports.expandOutputs = expandOutputs module.exports.getFilesForOutputs = getFilesForOutputs module.exports.remove = remove module.exports.copy = copy +module.exports.ChildProcess = ChildProcess +module.exports.runCommand = runCommand +module.exports.nxFork = nxFork module.exports.hashArray = hashArray module.exports.hashFile = hashFile module.exports.ImportResult = ImportResult diff --git a/packages/nx/src/native/mod.rs b/packages/nx/src/native/mod.rs index 685119bfb3..2b1fe75593 100644 --- a/packages/nx/src/native/mod.rs +++ b/packages/nx/src/native/mod.rs @@ -1,4 +1,5 @@ pub mod cache; +pub mod command; pub mod glob; pub mod hasher; mod logger; diff --git a/packages/nx/src/native/tests/command.spec.ts b/packages/nx/src/native/tests/command.spec.ts new file mode 100644 index 0000000000..800f86c58b --- /dev/null +++ b/packages/nx/src/native/tests/command.spec.ts @@ -0,0 +1,49 @@ +import { PseudoTtyProcess } from '../../utils/child-process'; +import { runCommand } from '../index'; + +describe('runCommand', () => { + it('should run command', async () => { + const childProcess = runCommand('echo "hello world"', process.cwd()); + expect(() => { + childProcess.onExit((exitCode) => expect(exitCode).toEqual(0)); + }); + }); + it('should kill a running command', () => { + const childProcess = new PseudoTtyProcess( + runCommand( + 'sleep 3 && echo "hello world" > file.txt', + process.cwd() + ) + ); + childProcess.onExit((exit_code) => { + expect(exit_code).not.toEqual(0); + }); + childProcess.kill(); + expect(childProcess.isAlive).toEqual(false); + + }, 1000); + + it('should subscribe to output', (done) => { + const childProcess = runCommand('echo "hello world"', process.cwd()); + + childProcess.onOutput((output) => { + expect(output.trim()).toEqual('hello world'); + }); + + childProcess.onExit(() => { + done(); + }); + }); + + it('should be tty', (done) => { + const childProcess = runCommand('node -p "process.stdout.isTTY"'); + childProcess.onOutput((out) => { + let output = JSON.stringify(out.trim()); + // check to make sure that we have ansi sequence characters only available in tty terminals + expect(output).toMatchInlineSnapshot(`""\\u001b[33mtrue\\u001b[39m""`); + }); + childProcess.onExit((_) => { + done(); + }); + }); +}); diff --git a/packages/nx/src/tasks-runner/fork.ts b/packages/nx/src/tasks-runner/fork.ts new file mode 100644 index 0000000000..e1740c3379 --- /dev/null +++ b/packages/nx/src/tasks-runner/fork.ts @@ -0,0 +1,29 @@ +import { fork, Serializable } from 'child_process'; +import { join } from 'path'; +import { PsuedoIPCClient } from './psuedo-ipc'; + +const psuedoIPCPath = process.argv[2]; +const forkId = process.argv[3]; + +const script = join(__dirname, '../../bin/run-executor.js'); + +const childProcess = fork(script, { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], +}); + +const psuedoIPC = new PsuedoIPCClient(psuedoIPCPath); + +psuedoIPC.onMessageFromParent(forkId, (message) => { + childProcess.send(message); +}); + +psuedoIPC.notifyChildIsReady(forkId); + +process.on('message', (message: Serializable) => { + psuedoIPC.sendMessageToParent(message); +}); + +childProcess.on('exit', (code) => { + psuedoIPC.close(); + process.exit(code); +}); diff --git a/packages/nx/src/tasks-runner/forked-process-task-runner.ts b/packages/nx/src/tasks-runner/forked-process-task-runner.ts index c7db7e7563..2f03fa2b78 100644 --- a/packages/nx/src/tasks-runner/forked-process-task-runner.ts +++ b/packages/nx/src/tasks-runner/forked-process-task-runner.ts @@ -14,7 +14,13 @@ import { } from './batch/batch-messages'; import { stripIndents } from '../utils/strip-indents'; import { Task, TaskGraph } from '../config/task-graph'; -import { Transform } from 'stream'; +import { Readable, Transform } from 'stream'; +import { ChildProcess as NativeChildProcess, nxFork } from '../native'; +import { PsuedoIPCServer } from './psuedo-ipc'; +import { FORKED_PROCESS_OS_SOCKET_PATH } from '../daemon/socket-utils'; +import { PseudoTtyProcess } from '../utils/child-process'; + +const forkScript = join(__dirname, './fork.js'); const workerPath = join(__dirname, './batch/run-batch.js'); @@ -22,9 +28,16 @@ export class ForkedProcessTaskRunner { cliPath = getCliPath(); private readonly verbose = process.env.NX_VERBOSE_LOGGING === 'true'; - private processes = new Set(); + private processes = new Set(); - constructor(private readonly options: DefaultTasksRunnerOptions) { + private psuedoIPCPath = FORKED_PROCESS_OS_SOCKET_PATH(process.pid.toString()); + + private psuedoIPC = new PsuedoIPCServer(this.psuedoIPCPath); + + constructor(private readonly options: DefaultTasksRunnerOptions) {} + + async init() { + await this.psuedoIPC.init(); this.setupProcessEventListeners(); } @@ -107,7 +120,155 @@ export class ForkedProcessTaskRunner { }); } - public forkProcessPipeOutputCapture( + public async forkProcessLegacy( + task: Task, + { + temporaryOutputPath, + streamOutput, + pipeOutput, + taskGraph, + env, + }: { + temporaryOutputPath: string; + streamOutput: boolean; + pipeOutput: boolean; + taskGraph: TaskGraph; + env: NodeJS.ProcessEnv; + } + ): Promise<{ code: number; terminalOutput: string }> { + return pipeOutput + ? await this.forkProcessPipeOutputCapture(task, { + temporaryOutputPath, + streamOutput, + taskGraph, + env, + }) + : await this.forkProcessDirectOutputCapture(task, { + temporaryOutputPath, + streamOutput, + taskGraph, + env, + }); + } + + public async forkProcess( + task: Task, + { + temporaryOutputPath, + streamOutput, + pipeOutput, + taskGraph, + env, + }: { + temporaryOutputPath: string; + streamOutput: boolean; + pipeOutput: boolean; + taskGraph: TaskGraph; + env: NodeJS.ProcessEnv; + } + ): Promise<{ code: number; terminalOutput: string }> { + const shouldPrefix = + streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; + + // streamOutput would be false if we are running multiple targets + // there's no point in running the commands in a pty if we are not streaming the output + if (!streamOutput || shouldPrefix || !process.stdout.isTTY) { + return this.forkProcessWithPrefixAndNotTTY(task, { + temporaryOutputPath, + streamOutput, + taskGraph, + env, + }); + } else { + return this.forkProcessWithPsuedoTerminal(task, { + temporaryOutputPath, + streamOutput, + taskGraph, + env, + }); + } + } + + private async forkProcessWithPsuedoTerminal( + task: Task, + { + temporaryOutputPath, + streamOutput, + taskGraph, + env, + }: { + temporaryOutputPath: string; + streamOutput: boolean; + taskGraph: TaskGraph; + env: NodeJS.ProcessEnv; + } + ): Promise<{ code: number; terminalOutput: string }> { + const args = getPrintableCommandArgsForTask(task); + if (streamOutput) { + output.logCommand(args.join(' ')); + output.addNewline(); + } + + const childId = task.id; + const p = new PseudoTtyProcess( + nxFork( + childId, + forkScript, + this.psuedoIPCPath, + process.cwd(), + env, + !streamOutput + ) + ); + + await this.psuedoIPC.waitForChildReady(childId); + + this.psuedoIPC.sendMessageToChild(childId, { + targetDescription: task.target, + overrides: task.overrides, + taskGraph, + isVerbose: this.verbose, + }); + this.processes.add(p); + + let terminalOutput = ''; + p.onOutput((msg) => { + terminalOutput += msg; + }); + + return new Promise((res) => { + p.onExit((code) => { + res({ + code, + terminalOutput, + }); + }); + }); + } + + private forkProcessPipeOutputCapture( + task: Task, + { + streamOutput, + temporaryOutputPath, + taskGraph, + env, + }: { + streamOutput: boolean; + temporaryOutputPath: string; + taskGraph: TaskGraph; + env: NodeJS.ProcessEnv; + } + ) { + return this.forkProcessWithPrefixAndNotTTY(task, { + streamOutput, + temporaryOutputPath, + taskGraph, + env, + }); + } + + private forkProcessWithPrefixAndNotTTY( task: Task, { streamOutput, @@ -203,7 +364,7 @@ export class ForkedProcessTaskRunner { }); } - public forkProcessDirectOutputCapture( + private forkProcessDirectOutputCapture( task: Task, { streamOutput, @@ -296,10 +457,17 @@ export class ForkedProcessTaskRunner { } private setupProcessEventListeners() { + this.psuedoIPC.onMessageFromChildren((message: Serializable) => { + process.send(message); + }); + // When the nx process gets a message, it will be sent into the task's process process.on('message', (message: Serializable) => { + // this.publisher.publish(message.toString()); + this.psuedoIPC.sendMessageToChildren(message); + this.processes.forEach((p) => { - if (p.connected) { + if ('connected' in p && p.connected) { p.send(message); } }); @@ -308,14 +476,14 @@ export class ForkedProcessTaskRunner { // Terminate any task processes on exit process.on('exit', () => { this.processes.forEach((p) => { - if (p.connected) { + if ('connected' in p ? p.connected : p.isAlive) { p.kill(); } }); }); process.on('SIGINT', () => { this.processes.forEach((p) => { - if (p.connected) { + if ('connected' in p ? p.connected : p.isAlive) { p.kill('SIGTERM'); } }); @@ -324,7 +492,7 @@ export class ForkedProcessTaskRunner { }); process.on('SIGTERM', () => { this.processes.forEach((p) => { - if (p.connected) { + if ('connected' in p ? p.connected : p.isAlive) { p.kill('SIGTERM'); } }); @@ -333,7 +501,7 @@ export class ForkedProcessTaskRunner { }); process.on('SIGHUP', () => { this.processes.forEach((p) => { - if (p.connected) { + if ('connected' in p ? p.connected : p.isAlive) { p.kill('SIGTERM'); } }); @@ -341,6 +509,10 @@ export class ForkedProcessTaskRunner { // will store results to the cache and will terminate this process }); } + + destroy() { + this.psuedoIPC.close(); + } } const colors = [ diff --git a/packages/nx/src/tasks-runner/psuedo-ipc.ts b/packages/nx/src/tasks-runner/psuedo-ipc.ts new file mode 100644 index 0000000000..dbb7f0e876 --- /dev/null +++ b/packages/nx/src/tasks-runner/psuedo-ipc.ts @@ -0,0 +1,182 @@ +/** + * Node IPC is specific to Node, but when spawning child processes in Rust, it won't have IPC. + * + * Thus, this is a wrapper which is spawned by Rust, which will create a Node IPC channel and pipe it to a ZeroMQ Channel + * + * Main Nx Process + * * Calls Rust Fork Function + * * `node fork.js` + * * Create a Rust - Node.js Agnostic Channel aka Psuedo IPC Channel + * * This returns RustChildProcess + * * RustChildProcess.onMessage(msg => ()); + * * psuedo_ipc_channel.on_message() => tx.send(msg); + * * Node.js Fork Wrapper (fork.js) + * * fork(run-command.js) with `inherit` and `ipc` + * * This will create a Node IPC Channel + * * channel = getPsuedoIpcChannel(process.env.NX_IPC_CHANNEL_ID) + * * forkChildProcess.on('message', writeToPsuedoIpcChannel) + */ + +import { connect, Server, Socket } from 'net'; +import { consumeMessagesFromSocket } from '../utils/consume-messages-from-socket'; +import { Serializable } from 'child_process'; + +export interface PsuedoIPCMessage { + type: 'TO_CHILDREN_FROM_PARENT' | 'TO_PARENT_FROM_CHILDREN' | 'CHILD_READY'; + id: string | undefined; + message: Serializable; +} + +export class PsuedoIPCServer { + private sockets = new Set(); + private server: Server | undefined; + + private childMessages: { + onMessage: (message: Serializable) => void; + onClose?: () => void; + onError?: (err: Error) => void; + }[] = []; + + constructor(private path: string) {} + + init(): Promise { + return new Promise((res) => { + this.server = new Server((socket) => { + this.sockets.add(socket); + this.registerChildMessages(socket); + socket.on('close', () => { + this.sockets.delete(socket); + }); + }); + this.server.listen(this.path, () => { + res(); + }); + }); + } + + private childReadyMap = new Map void>(); + + async waitForChildReady(childId: string) { + return new Promise((res) => { + this.childReadyMap.set(childId, res); + }); + } + + private registerChildMessages(socket: Socket) { + socket.on( + 'data', + consumeMessagesFromSocket(async (rawMessage) => { + const { type, message }: PsuedoIPCMessage = JSON.parse(rawMessage); + if (type === 'TO_PARENT_FROM_CHILDREN') { + for (const childMessage of this.childMessages) { + childMessage.onMessage(message); + } + } else if (type === 'CHILD_READY') { + const childId = message as string; + if (this.childReadyMap.has(childId)) { + this.childReadyMap.get(childId)(); + } + } + }) + ); + + socket.on('close', () => { + for (const childMessage of this.childMessages) { + childMessage.onClose?.(); + } + }); + socket.on('error', (err) => { + for (const childMessage of this.childMessages) { + childMessage.onError?.(err); + } + }); + } + + sendMessageToChildren(message: Serializable) { + this.sockets.forEach((socket) => { + socket.write( + JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', message }) + ); + // send EOT to indicate that the message has been fully written + socket.write(String.fromCodePoint(4)); + }); + } + + sendMessageToChild(id: string, message: Serializable) { + this.sockets.forEach((socket) => { + socket.write( + JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', id, message }) + ); + socket.write(String.fromCodePoint(4)); + }); + } + onMessageFromChildren( + onMessage: (message: Serializable) => void, + onClose: () => void = () => {}, + onError: (err: Error) => void = (err) => {} + ) { + this.childMessages.push({ + onMessage, + onClose, + onError, + }); + } + + close() { + this.server?.close(); + this.sockets.forEach((s) => s.destroy()); + } +} + +export class PsuedoIPCClient { + private socket: Socket | undefined = connect(this.path); + + constructor(private path: string) {} + sendMessageToParent(message: Serializable) { + this.socket.write( + JSON.stringify({ type: 'TO_PARENT_FROM_CHILDREN', message }) + ); + // send EOT to indicate that the message has been fully written + this.socket.write(String.fromCodePoint(4)); + } + + notifyChildIsReady(id: string) { + this.socket.write( + JSON.stringify({ + type: 'CHILD_READY', + message: id, + } as PsuedoIPCMessage) + ); + // send EOT to indicate that the message has been fully written + this.socket.write(String.fromCodePoint(4)); + } + + onMessageFromParent( + forkId: string, + onMessage: (message: Serializable) => void, + onClose: () => void = () => {}, + onError: (err: Error) => void = (err) => {} + ) { + this.socket.on( + 'data', + consumeMessagesFromSocket(async (rawMessage) => { + const { id, type, message }: PsuedoIPCMessage = JSON.parse(rawMessage); + if (type === 'TO_CHILDREN_FROM_PARENT') { + if (id && id === forkId) { + onMessage(message); + } else if (id === undefined) { + onMessage(message); + } + } + }) + ); + + this.socket.on('close', onClose); + this.socket.on('error', onError); + + return this; + } + close() { + this.socket?.destroy(); + } +} diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index 08f65e0740..79115464cc 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -66,6 +66,9 @@ export class TaskOrchestrator { ) {} async run() { + // Init the ForkedProcessTaskRunner + await this.forkedProcessTaskRunner.init(); + // initial scheduling await this.scheduleNextTasks(); @@ -88,6 +91,7 @@ export class TaskOrchestrator { 'task-execution:start', 'task-execution:end' ); + this.forkedProcessTaskRunner.destroy(); this.cache.removeOldCacheRecords(); return this.completedTasks; @@ -398,25 +402,22 @@ export class TaskOrchestrator { ) { try { // execution - const { code, terminalOutput } = pipeOutput - ? await this.forkedProcessTaskRunner.forkProcessPipeOutputCapture( - task, - { + const { code, terminalOutput } = + process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' + ? await this.forkedProcessTaskRunner.forkProcess(task, { temporaryOutputPath, streamOutput, + pipeOutput, taskGraph: this.taskGraph, env, - } - ) - : await this.forkedProcessTaskRunner.forkProcessDirectOutputCapture( - task, - { + }) + : await this.forkedProcessTaskRunner.forkProcessLegacy(task, { temporaryOutputPath, streamOutput, + pipeOutput, taskGraph: this.taskGraph, env, - } - ); + }); return { code, @@ -565,6 +566,10 @@ export class TaskOrchestrator { private async pipeOutputCapture(task: Task) { try { + if (process.env.NX_NATIVE_COMMAND_RUNNER !== 'false') { + return true; + } + const { schema } = await getExecutorForTask(task, this.projectGraph); return ( diff --git a/packages/nx/src/utils/child-process.ts b/packages/nx/src/utils/child-process.ts index eb37171c45..ae73ee5a1c 100644 --- a/packages/nx/src/utils/child-process.ts +++ b/packages/nx/src/utils/child-process.ts @@ -3,6 +3,7 @@ import { existsSync } from 'fs'; import { join, relative } from 'path'; import { getPackageManagerCommand } from './package-manager'; import { workspaceRoot, workspaceRootInner } from './workspace-root'; +import { ChildProcess } from '../native'; export function runNxSync( cmd: string, @@ -26,3 +27,37 @@ export function runNxSync( } execSync(`${baseCmd} ${cmd}`, options); } + +export class PseudoTtyProcess { + isAlive = true; + + exitCallbacks = []; + + constructor(private childProcess: ChildProcess) { + childProcess.onExit((exitCode) => { + this.isAlive = false; + this.exitCallbacks.forEach((cb) => cb(exitCode)); + }); + } + + onExit(callback: (code: number) => void): void { + this.exitCallbacks.push(callback); + } + + onOutput(callback: (message: string) => void): void { + this.childProcess.onOutput(callback); + } + + kill(): void { + try { + this.childProcess.kill(); + } catch { + // when the child process completes before we explicitly call kill, this will throw + // do nothing + } finally { + if (this.isAlive == true) { + this.isAlive = false; + } + } + } +}