feat(core): add maxCacheSize option to limit local artifact size (#29654)
<!-- Please make sure you have read the submission guidelines before posting an PR --> <!-- https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr --> <!-- Please make sure that your commit message follows our format --> <!-- Example: `fix(nx): must begin with lowercase` --> <!-- If this is a particularly complex change or feature addition, you can request a dedicated Nx release for this pull request branch. Mention someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they will confirm if the PR warrants its own release for testing purposes, and generate it for you if appropriate. --> ## Current Behavior Cache artifacts are removed based on age at a random interval. There is not a way to set a max size for the cache, so it can grow quite large in certain repos ## Expected Behavior Cache size can be controlled via `maxCacheSize` in `nx.json`. Cache artifacts are removed based on usage until the limit has been reached. ## Related Issue(s) <!-- Please link the issue being fixed so it gets closed when this is merged. --> Fixes # --------- Co-authored-by: FrozenPandaz <jasonjean1993@gmail.com>
This commit is contained in:
parent
c57932ef66
commit
e4f5224e9e
103
Cargo.lock
generated
103
Cargo.lock
generated
@ -1,6 +1,6 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "Inflector"
|
||||
@ -33,7 +33,7 @@ version = "0.7.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"getrandom 0.2.12",
|
||||
"once_cell",
|
||||
"version_check",
|
||||
]
|
||||
@ -47,7 +47,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"once_cell",
|
||||
"version_check",
|
||||
"zerocopy",
|
||||
"zerocopy 0.7.32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -673,7 +673,19 @@ checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi 0.13.3+wasi-0.2.2",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1268,7 +1280,7 @@ checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"wasi",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@ -1498,6 +1510,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"portable-pty",
|
||||
"rand 0.9.0",
|
||||
"rayon",
|
||||
"regex",
|
||||
"rkyv",
|
||||
@ -1578,7 +1591,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6"
|
||||
dependencies = [
|
||||
"phf_shared",
|
||||
"rand",
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1770,8 +1783,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
"rand_chacha 0.3.1",
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
|
||||
dependencies = [
|
||||
"rand_chacha 0.9.0",
|
||||
"rand_core 0.9.3",
|
||||
"zerocopy 0.8.23",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1781,7 +1805,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core",
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.9.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1790,7 +1824,16 @@ version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"getrandom 0.2.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
|
||||
dependencies = [
|
||||
"getrandom 0.3.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2700,6 +2743,15 @@ version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.13.3+wasi-0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
|
||||
dependencies = [
|
||||
"wit-bindgen-rt",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "watchexec"
|
||||
version = "3.0.1"
|
||||
@ -2998,6 +3050,15 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wit-bindgen-rt"
|
||||
version = "0.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wyz"
|
||||
version = "0.5.1"
|
||||
@ -3019,7 +3080,16 @@ version = "0.7.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
|
||||
dependencies = [
|
||||
"zerocopy-derive",
|
||||
"zerocopy-derive 0.7.32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.8.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
|
||||
dependencies = [
|
||||
"zerocopy-derive 0.8.23",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3032,3 +3102,14 @@ dependencies = [
|
||||
"quote",
|
||||
"syn 2.0.53",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.8.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.53",
|
||||
]
|
||||
|
||||
@ -28,6 +28,7 @@ Nx.json configuration
|
||||
- [generators](../../devkit/documents/NxJsonConfiguration#generators): Object
|
||||
- [implicitDependencies](../../devkit/documents/NxJsonConfiguration#implicitdependencies): ImplicitDependencyEntry<T>
|
||||
- [installation](../../devkit/documents/NxJsonConfiguration#installation): NxInstallationConfiguration
|
||||
- [maxCacheSize](../../devkit/documents/NxJsonConfiguration#maxcachesize): string
|
||||
- [namedInputs](../../devkit/documents/NxJsonConfiguration#namedinputs): Object
|
||||
- [neverConnectToCloud](../../devkit/documents/NxJsonConfiguration#neverconnecttocloud): boolean
|
||||
- [nxCloudAccessToken](../../devkit/documents/NxJsonConfiguration#nxcloudaccesstoken): string
|
||||
@ -162,6 +163,14 @@ useful for workspaces that don't have a root package.json + node_modules.
|
||||
|
||||
---
|
||||
|
||||
### maxCacheSize
|
||||
|
||||
• `Optional` **maxCacheSize**: `string`
|
||||
|
||||
Sets the maximum size of the local cache. Accepts a number followed by a unit (e.g. 100MB). Accepted units are B, KB, MB, and GB.
|
||||
|
||||
---
|
||||
|
||||
### namedInputs
|
||||
|
||||
• `Optional` **namedInputs**: `Object`
|
||||
|
||||
@ -26,6 +26,7 @@ use ProjectsConfigurations or NxJsonConfiguration
|
||||
- [generators](../../devkit/documents/Workspace#generators): Object
|
||||
- [implicitDependencies](../../devkit/documents/Workspace#implicitdependencies): ImplicitDependencyEntry<string[] | "\*">
|
||||
- [installation](../../devkit/documents/Workspace#installation): NxInstallationConfiguration
|
||||
- [maxCacheSize](../../devkit/documents/Workspace#maxcachesize): string
|
||||
- [namedInputs](../../devkit/documents/Workspace#namedinputs): Object
|
||||
- [neverConnectToCloud](../../devkit/documents/Workspace#neverconnecttocloud): boolean
|
||||
- [nxCloudAccessToken](../../devkit/documents/Workspace#nxcloudaccesstoken): string
|
||||
@ -202,6 +203,18 @@ useful for workspaces that don't have a root package.json + node_modules.
|
||||
|
||||
---
|
||||
|
||||
### maxCacheSize
|
||||
|
||||
• `Optional` **maxCacheSize**: `string`
|
||||
|
||||
Sets the maximum size of the local cache. Accepts a number followed by a unit (e.g. 100MB). Accepted units are B, KB, MB, and GB.
|
||||
|
||||
#### Inherited from
|
||||
|
||||
[NxJsonConfiguration](../../devkit/documents/NxJsonConfiguration).[maxCacheSize](../../devkit/documents/NxJsonConfiguration#maxcachesize)
|
||||
|
||||
---
|
||||
|
||||
### namedInputs
|
||||
|
||||
• `Optional` **namedInputs**: `Object`
|
||||
|
||||
@ -11,6 +11,9 @@ import {
|
||||
updateFile,
|
||||
updateJson,
|
||||
} from '@nx/e2e/utils';
|
||||
|
||||
import { readdir, stat } from 'fs/promises';
|
||||
|
||||
import { join } from 'path';
|
||||
|
||||
describe('cache', () => {
|
||||
@ -361,6 +364,66 @@ describe('cache', () => {
|
||||
expect(fourthRun).toContain('read the output from the cache');
|
||||
}, 120000);
|
||||
|
||||
it('should evict cache if larger than max cache size', async () => {
|
||||
runCLI('reset');
|
||||
updateJson(`nx.json`, (c) => {
|
||||
c.maxCacheSize = '500KB';
|
||||
return c;
|
||||
});
|
||||
|
||||
const lib = uniq('cache-size');
|
||||
|
||||
updateFile(
|
||||
`tools/copy.js`,
|
||||
'require("fs").cpSync(process.argv[2], process.argv[3], { recursive: true, force: true });'
|
||||
);
|
||||
updateFile(
|
||||
`libs/${lib}/project.json`,
|
||||
JSON.stringify({
|
||||
name: lib,
|
||||
targets: {
|
||||
write: {
|
||||
cache: true,
|
||||
command: 'node tools/copy.js {projectRoot}/src dist/{projectRoot}',
|
||||
inputs: ['{projectRoot}/hash.txt'],
|
||||
outputs: ['{workspaceRoot}/dist/{projectRoot}'],
|
||||
},
|
||||
},
|
||||
})
|
||||
);
|
||||
// 100KB file
|
||||
updateFile(`libs/${lib}/src/data.txt`, 'a'.repeat(100 * 1024));
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
updateFile(`libs/${lib}/hash.txt`, i.toString());
|
||||
runCLI(`write ${lib}`);
|
||||
}
|
||||
|
||||
// Expect that size of cache artifacts in cacheDir is less than 1MB
|
||||
const cacheDir = tmpProjPath('.nx/cache');
|
||||
const cacheFiles = listFiles('.nx/cache');
|
||||
let cacheEntries = 0;
|
||||
let cacheEntriesSize = 0;
|
||||
for (const file of cacheFiles) {
|
||||
if (file.match(/^[0-9]+$/)) {
|
||||
cacheEntries += 1;
|
||||
cacheEntriesSize += await dirSize(join(cacheDir, file));
|
||||
console.log(
|
||||
'Checked cache entry',
|
||||
file,
|
||||
'size',
|
||||
cacheEntriesSize,
|
||||
'total entries',
|
||||
cacheEntries
|
||||
);
|
||||
}
|
||||
}
|
||||
console.log('Cache entries:', cacheEntries);
|
||||
console.log('Cache size:', cacheEntriesSize);
|
||||
expect(cacheEntries).toBeGreaterThan(1);
|
||||
expect(cacheEntries).toBeLessThan(10);
|
||||
expect(cacheEntriesSize).toBeLessThanOrEqual(500 * 1024);
|
||||
});
|
||||
|
||||
function expectCached(
|
||||
actualOutput: string,
|
||||
expectedCachedProjects: string[]
|
||||
@ -404,3 +467,27 @@ describe('cache', () => {
|
||||
expect(matchingProjects).toEqual(expectedProjects);
|
||||
}
|
||||
});
|
||||
|
||||
const dirSize = async (dir) => {
|
||||
const files = await readdir(dir, { withFileTypes: true });
|
||||
|
||||
const paths = files.map(async (file) => {
|
||||
const path = join(dir, file.name);
|
||||
|
||||
if (file.isDirectory()) return await dirSize(path);
|
||||
|
||||
if (file.isFile()) {
|
||||
const { size } = await stat(path);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
console.log('Unknown file type', path);
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
return (await Promise.all(paths))
|
||||
.flat(Infinity)
|
||||
.reduce((i, size) => i + size, 0);
|
||||
};
|
||||
|
||||
@ -43,7 +43,7 @@ swc_common = "0.31.16"
|
||||
swc_ecma_parser = { version = "0.137.1", features = ["typescript"] }
|
||||
swc_ecma_visit = "0.93.0"
|
||||
swc_ecma_ast = "0.107.0"
|
||||
|
||||
rand = "0.9.0"
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = { version = "0.3", features = ["fileapi"] }
|
||||
|
||||
|
||||
@ -82,6 +82,7 @@ export const allowedWorkspaceExtensions = [
|
||||
'neverConnectToCloud',
|
||||
'sync',
|
||||
'useLegacyCache',
|
||||
'maxCacheSize',
|
||||
] as const;
|
||||
|
||||
if (!patched) {
|
||||
|
||||
@ -544,6 +544,11 @@ export interface NxJsonConfiguration<T = '*' | string[]> {
|
||||
* Use the legacy file system cache instead of the db cache
|
||||
*/
|
||||
useLegacyCache?: boolean;
|
||||
|
||||
/**
|
||||
* Sets the maximum size of the local cache. Accepts a number followed by a unit (e.g. 100MB). Accepted units are B, KB, MB, and GB.
|
||||
*/
|
||||
maxCacheSize?: string;
|
||||
}
|
||||
|
||||
export type PluginConfiguration = string | ExpandedPluginConfiguration;
|
||||
|
||||
110
packages/nx/src/native/cache/cache.rs
vendored
110
packages/nx/src/native/cache/cache.rs
vendored
@ -19,6 +19,7 @@ pub struct CachedResult {
|
||||
pub code: i16,
|
||||
pub terminal_output: String,
|
||||
pub outputs_path: String,
|
||||
pub size: Option<i64>,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@ -28,6 +29,7 @@ pub struct NxCache {
|
||||
cache_path: PathBuf,
|
||||
db: External<NxDbConnection>,
|
||||
link_task_details: bool,
|
||||
max_cache_size: Option<i64>,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@ -38,6 +40,7 @@ impl NxCache {
|
||||
cache_path: String,
|
||||
db_connection: External<NxDbConnection>,
|
||||
link_task_details: Option<bool>,
|
||||
max_cache_size: Option<i64>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let cache_path = PathBuf::from(&cache_path);
|
||||
|
||||
@ -50,6 +53,7 @@ impl NxCache {
|
||||
cache_directory: cache_path.to_normalized_string(),
|
||||
cache_path,
|
||||
link_task_details: link_task_details.unwrap_or(true),
|
||||
max_cache_size,
|
||||
};
|
||||
|
||||
r.setup()?;
|
||||
@ -62,6 +66,7 @@ impl NxCache {
|
||||
"CREATE TABLE IF NOT EXISTS cache_outputs (
|
||||
hash TEXT PRIMARY KEY NOT NULL,
|
||||
code INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (hash) REFERENCES task_details (hash)
|
||||
@ -71,6 +76,7 @@ impl NxCache {
|
||||
"CREATE TABLE IF NOT EXISTS cache_outputs (
|
||||
hash TEXT PRIMARY KEY NOT NULL,
|
||||
code INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
@ -95,10 +101,11 @@ impl NxCache {
|
||||
"UPDATE cache_outputs
|
||||
SET accessed_at = CURRENT_TIMESTAMP
|
||||
WHERE hash = ?1
|
||||
RETURNING code",
|
||||
RETURNING code, size",
|
||||
params![hash],
|
||||
|row| {
|
||||
let code: i16 = row.get(0)?;
|
||||
let size: i64 = row.get(1)?;
|
||||
|
||||
let start = Instant::now();
|
||||
let terminal_output =
|
||||
@ -109,6 +116,7 @@ impl NxCache {
|
||||
code,
|
||||
terminal_output,
|
||||
outputs_path: task_dir.to_normalized_string(),
|
||||
size: Some(size),
|
||||
})
|
||||
},
|
||||
)
|
||||
@ -139,6 +147,7 @@ impl NxCache {
|
||||
// Write the terminal outputs into a file
|
||||
let task_outputs_path = self.get_task_outputs_path_internal(&hash);
|
||||
trace!("Writing terminal outputs to: {:?}", &task_outputs_path);
|
||||
let mut total_size: i64 = terminal_output.len() as i64;
|
||||
write(task_outputs_path, terminal_output)?;
|
||||
|
||||
// Expand the outputs
|
||||
@ -150,11 +159,11 @@ impl NxCache {
|
||||
if p.exists() {
|
||||
let cached_outputs_dir = task_dir.join(expanded_output);
|
||||
trace!("Copying {:?} -> {:?}", &p, &cached_outputs_dir);
|
||||
_copy(p, cached_outputs_dir)?;
|
||||
total_size += _copy(p, cached_outputs_dir)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.record_to_cache(hash, code)?;
|
||||
self.record_to_cache(hash, code, total_size)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -163,17 +172,22 @@ impl NxCache {
|
||||
&self,
|
||||
hash: String,
|
||||
result: CachedResult,
|
||||
outputs: Vec<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
trace!(
|
||||
"applying remote cache results: {:?} ({})",
|
||||
&hash,
|
||||
&result.outputs_path
|
||||
);
|
||||
let terminal_output = result.terminal_output;
|
||||
let terminal_output = result.terminal_output.clone();
|
||||
let mut size = terminal_output.len() as i64;
|
||||
if outputs.len() > 0 && result.code == 0 {
|
||||
size += try_and_retry(|| self.copy_files_from_cache(result.clone(), outputs.clone()))?;
|
||||
};
|
||||
write(self.get_task_outputs_path(hash.clone()), terminal_output)?;
|
||||
|
||||
let code: i16 = result.code;
|
||||
self.record_to_cache(hash, code)?;
|
||||
self.record_to_cache(hash, code, size)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -187,12 +201,58 @@ impl NxCache {
|
||||
.to_normalized_string()
|
||||
}
|
||||
|
||||
fn record_to_cache(&self, hash: String, code: i16) -> anyhow::Result<()> {
|
||||
trace!("Recording to cache: {}, {}", &hash, code);
|
||||
fn record_to_cache(&self, hash: String, code: i16, size: i64) -> anyhow::Result<()> {
|
||||
trace!("Recording to cache: {}, {}, {}", &hash, code, size);
|
||||
self.db.execute(
|
||||
"INSERT OR REPLACE INTO cache_outputs (hash, code) VALUES (?1, ?2)",
|
||||
params![hash, code],
|
||||
"INSERT OR REPLACE INTO cache_outputs (hash, code, size) VALUES (?1, ?2, ?3)",
|
||||
params![hash, code, size],
|
||||
)?;
|
||||
if self.max_cache_size.is_some() {
|
||||
self.ensure_cache_size_within_limit()?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_cache_size_within_limit(&self) -> anyhow::Result<()> {
|
||||
if let Some(user_specified_max_cache_size) = self.max_cache_size {
|
||||
let buffer_amount = (0.1 * user_specified_max_cache_size as f64) as i64;
|
||||
let target_cache_size = user_specified_max_cache_size - buffer_amount;
|
||||
|
||||
let full_cache_size = self
|
||||
.db
|
||||
.query_row("SELECT SUM(size) FROM cache_outputs", [], |row| {
|
||||
row.get::<_, i64>(0)
|
||||
})?
|
||||
.unwrap_or(0);
|
||||
if user_specified_max_cache_size < full_cache_size {
|
||||
let mut cache_size = full_cache_size;
|
||||
let mut stmt = self.db.prepare(
|
||||
"SELECT hash, size FROM cache_outputs ORDER BY accessed_at ASC LIMIT 100",
|
||||
)?;
|
||||
'outer: while cache_size > target_cache_size {
|
||||
let rows = stmt.query_map([], |r| {
|
||||
let hash: String = r.get(0)?;
|
||||
let size: i64 = r.get(1)?;
|
||||
Ok((hash, size))
|
||||
})?;
|
||||
for row in rows {
|
||||
if let Ok((hash, size)) = row {
|
||||
cache_size -= size;
|
||||
self.db.execute(
|
||||
"DELETE FROM cache_outputs WHERE hash = ?1",
|
||||
params![hash],
|
||||
)?;
|
||||
remove_items(&[self.cache_path.join(&hash)])?;
|
||||
}
|
||||
// We've deleted enough cache entries to be under the
|
||||
// target cache size, stop looking for more.
|
||||
if cache_size < target_cache_size {
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -201,7 +261,7 @@ impl NxCache {
|
||||
&self,
|
||||
cached_result: CachedResult,
|
||||
outputs: Vec<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<i64> {
|
||||
let outputs_path = Path::new(&cached_result.outputs_path);
|
||||
|
||||
let expanded_outputs = _expand_outputs(outputs_path, outputs)?;
|
||||
@ -220,9 +280,9 @@ impl NxCache {
|
||||
&outputs_path,
|
||||
&self.workspace_root
|
||||
);
|
||||
_copy(outputs_path, &self.workspace_root)?;
|
||||
let sz = _copy(outputs_path, &self.workspace_root)?;
|
||||
|
||||
Ok(())
|
||||
Ok(sz)
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@ -285,3 +345,29 @@ impl NxCache {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_and_retry<T, F>(mut f: F) -> anyhow::Result<T>
|
||||
where
|
||||
F: FnMut() -> anyhow::Result<T>,
|
||||
{
|
||||
let mut attempts = 0;
|
||||
// Generate a random number between 2 and 4 to raise to the power of attempts
|
||||
let base_exponent = rand::random::<f64>() * 2.0 + 2.0;
|
||||
let base_timeout = 15;
|
||||
|
||||
loop {
|
||||
attempts += 1;
|
||||
match f() {
|
||||
Ok(result) => return Ok(result),
|
||||
Err(e) => {
|
||||
// Max time is 15 * (4 + 4² + 4³ + 4⁴ + 4⁵) = 20460ms
|
||||
if attempts == 6 {
|
||||
// After enough attempts, throw the error
|
||||
return Err(e);
|
||||
}
|
||||
let timeout = base_timeout as f64 * base_exponent.powi(attempts);
|
||||
std::thread::sleep(std::time::Duration::from_millis(timeout as u64));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
35
packages/nx/src/native/cache/file_ops.rs
vendored
35
packages/nx/src/native/cache/file_ops.rs
vendored
@ -13,11 +13,14 @@ pub fn remove(src: String) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn copy(src: String, dest: String) -> anyhow::Result<()> {
|
||||
pub fn copy(src: String, dest: String) -> anyhow::Result<i64> {
|
||||
_copy(src, dest)
|
||||
}
|
||||
|
||||
pub fn _copy<P>(src: P, dest: P) -> anyhow::Result<()> where P: AsRef<Path> {
|
||||
pub fn _copy<P>(src: P, dest: P) -> anyhow::Result<i64>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let dest: PathBuf = remove_trailing_single_dot(dest);
|
||||
|
||||
let dest_parent = dest.parent().unwrap_or(&dest);
|
||||
@ -28,15 +31,16 @@ pub fn _copy<P>(src: P, dest: P) -> anyhow::Result<()> where P: AsRef<Path> {
|
||||
fs::create_dir_all(dest_parent)?;
|
||||
}
|
||||
|
||||
if src.is_dir() {
|
||||
copy_dir_all(&src, dest).map_err(anyhow::Error::new)?;
|
||||
let size = if src.is_dir() {
|
||||
copy_dir_all(&src, dest).map_err(anyhow::Error::new)?
|
||||
} else if src.is_symlink() {
|
||||
symlink(fs::read_link(src)?, dest)?;
|
||||
0
|
||||
} else {
|
||||
fs::copy(src, dest)?;
|
||||
}
|
||||
fs::copy(src, dest)?
|
||||
};
|
||||
|
||||
Ok(())
|
||||
Ok(size as i64)
|
||||
}
|
||||
|
||||
fn remove_trailing_single_dot(path: impl AsRef<Path>) -> PathBuf {
|
||||
@ -66,26 +70,29 @@ fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(original: P, link: Q) -> io::Result<(
|
||||
std::os::wasi::fs::symlink_path(original, link)
|
||||
}
|
||||
|
||||
fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> {
|
||||
|
||||
fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<u64> {
|
||||
trace!("creating directory: {:?}", dst.as_ref());
|
||||
fs::create_dir_all(&dst)?;
|
||||
trace!("reading source directory: {:?}", src.as_ref());
|
||||
let mut total_size = 0;
|
||||
for entry in fs::read_dir(src)? {
|
||||
let entry = entry?;
|
||||
let ty = entry.file_type()?;
|
||||
if ty.is_dir() {
|
||||
copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
|
||||
let size: u64 = if ty.is_dir() {
|
||||
copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?
|
||||
} else if ty.is_symlink() {
|
||||
symlink(
|
||||
fs::read_link(entry.path())?,
|
||||
dst.as_ref().join(entry.file_name()),
|
||||
)?;
|
||||
// Handle this
|
||||
0
|
||||
} else {
|
||||
fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
|
||||
}
|
||||
fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?
|
||||
};
|
||||
total_size += size;
|
||||
}
|
||||
Ok(())
|
||||
Ok(total_size)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
9
packages/nx/src/native/index.d.ts
vendored
9
packages/nx/src/native/index.d.ts
vendored
@ -37,12 +37,12 @@ export declare class ImportResult {
|
||||
|
||||
export declare class NxCache {
|
||||
cacheDirectory: string
|
||||
constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject<NxDbConnection>, linkTaskDetails?: boolean | undefined | null)
|
||||
constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject<NxDbConnection>, linkTaskDetails?: boolean | undefined | null, maxCacheSize?: number | undefined | null)
|
||||
get(hash: string): CachedResult | null
|
||||
put(hash: string, terminalOutput: string, outputs: Array<string>, code: number): void
|
||||
applyRemoteCacheResults(hash: string, result: CachedResult): void
|
||||
applyRemoteCacheResults(hash: string, result: CachedResult, outputs: Array<string>): void
|
||||
getTaskOutputsPath(hash: string): string
|
||||
copyFilesFromCache(cachedResult: CachedResult, outputs: Array<string>): void
|
||||
copyFilesFromCache(cachedResult: CachedResult, outputs: Array<string>): number
|
||||
removeOldCacheRecords(): void
|
||||
checkCacheFsInSync(): boolean
|
||||
}
|
||||
@ -112,13 +112,14 @@ export interface CachedResult {
|
||||
code: number
|
||||
terminalOutput: string
|
||||
outputsPath: string
|
||||
size?: number
|
||||
}
|
||||
|
||||
export declare export function closeDbConnection(connection: ExternalObject<NxDbConnection>): void
|
||||
|
||||
export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject<NxDbConnection>
|
||||
|
||||
export declare export function copy(src: string, dest: string): void
|
||||
export declare export function copy(src: string, dest: string): number
|
||||
|
||||
export interface DepsOutputsInput {
|
||||
dependentTasksOutputFiles: string
|
||||
|
||||
41
packages/nx/src/tasks-runner/cache.spec.ts
Normal file
41
packages/nx/src/tasks-runner/cache.spec.ts
Normal file
@ -0,0 +1,41 @@
|
||||
import { parseMaxCacheSize } from './cache';
|
||||
|
||||
describe('cache', () => {
|
||||
describe('parseMaxCacheSize', () => {
|
||||
it('should parse KB', () => {
|
||||
expect(parseMaxCacheSize('1KB')).toEqual(1024);
|
||||
});
|
||||
|
||||
it('should parse MB', () => {
|
||||
expect(parseMaxCacheSize('1MB')).toEqual(1024 * 1024);
|
||||
});
|
||||
|
||||
it('should parse GB', () => {
|
||||
expect(parseMaxCacheSize('1GB')).toEqual(1024 * 1024 * 1024);
|
||||
});
|
||||
|
||||
it('should parse B', () => {
|
||||
expect(parseMaxCacheSize('1B')).toEqual(1);
|
||||
});
|
||||
|
||||
it('should parse as bytes by default', () => {
|
||||
expect(parseMaxCacheSize('1')).toEqual(1);
|
||||
});
|
||||
|
||||
it('should handle decimals', () => {
|
||||
expect(parseMaxCacheSize('1.5KB')).toEqual(1024 * 1.5);
|
||||
});
|
||||
|
||||
it('should error if invalid unit', () => {
|
||||
expect(() => parseMaxCacheSize('1ZB')).toThrow();
|
||||
});
|
||||
|
||||
it('should error if invalid number', () => {
|
||||
expect(() => parseMaxCacheSize('abc')).toThrow();
|
||||
});
|
||||
|
||||
it('should error if multiple decimal points', () => {
|
||||
expect(() => parseMaxCacheSize('1.5.5KB')).toThrow;
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -89,7 +89,14 @@ export function getCache(options: DefaultTasksRunnerOptions): DbCache | Cache {
|
||||
}
|
||||
|
||||
export class DbCache {
|
||||
private cache = new NxCache(workspaceRoot, cacheDir, getDbConnection());
|
||||
private nxJson = readNxJson();
|
||||
private cache = new NxCache(
|
||||
workspaceRoot,
|
||||
cacheDir,
|
||||
getDbConnection(),
|
||||
undefined,
|
||||
parseMaxCacheSize(this.nxJson.maxCacheSize)
|
||||
);
|
||||
|
||||
private remoteCache: RemoteCacheV2 | null;
|
||||
private remoteCachePromise: Promise<RemoteCacheV2>;
|
||||
@ -129,7 +136,7 @@ export class DbCache {
|
||||
);
|
||||
|
||||
if (res) {
|
||||
this.applyRemoteCacheResults(task.hash, res);
|
||||
this.applyRemoteCacheResults(task.hash, res, task.outputs);
|
||||
|
||||
return {
|
||||
...res,
|
||||
@ -143,8 +150,12 @@ export class DbCache {
|
||||
}
|
||||
}
|
||||
|
||||
private applyRemoteCacheResults(hash: string, res: NativeCacheResult) {
|
||||
return this.cache.applyRemoteCacheResults(hash, res);
|
||||
private applyRemoteCacheResults(
|
||||
hash: string,
|
||||
res: NativeCacheResult,
|
||||
outputs: string[]
|
||||
) {
|
||||
return this.cache.applyRemoteCacheResults(hash, res, outputs);
|
||||
}
|
||||
|
||||
async put(
|
||||
@ -574,3 +585,49 @@ function tryAndRetry<T>(fn: () => Promise<T>): Promise<T> {
|
||||
};
|
||||
return _try();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a string representation of a max cache size to a number.
|
||||
*
|
||||
* e.g. '1GB' -> 1024 * 1024 * 1024
|
||||
* '1MB' -> 1024 * 1024
|
||||
* '1KB' -> 1024
|
||||
*
|
||||
* @param maxCacheSize Max cache size as specified in nx.json
|
||||
*/
|
||||
export function parseMaxCacheSize(maxCacheSize: string): number | undefined {
|
||||
if (!maxCacheSize) {
|
||||
return undefined;
|
||||
}
|
||||
let regexResult = maxCacheSize.match(
|
||||
/^(?<size>[\d|.]+)\s?((?<unit>[KMG]?B)?)$/
|
||||
);
|
||||
if (!regexResult) {
|
||||
throw new Error(
|
||||
`Invalid max cache size specified in nx.json: ${maxCacheSize}. Must be a number followed by an optional unit (KB, MB, GB)`
|
||||
);
|
||||
}
|
||||
let sizeString = regexResult.groups.size;
|
||||
let unit = regexResult.groups.unit;
|
||||
if ([...sizeString].filter((c) => c === '.').length > 1) {
|
||||
throw new Error(
|
||||
`Invalid max cache size specified in nx.json: ${maxCacheSize} (multiple decimal points in size)`
|
||||
);
|
||||
}
|
||||
let size = parseFloat(sizeString);
|
||||
if (isNaN(size)) {
|
||||
throw new Error(
|
||||
`Invalid max cache size specified in nx.json: ${maxCacheSize} (${sizeString} is not a number)`
|
||||
);
|
||||
}
|
||||
switch (unit) {
|
||||
case 'KB':
|
||||
return size * 1024;
|
||||
case 'MB':
|
||||
return size * 1024 * 1024;
|
||||
case 'GB':
|
||||
return size * 1024 * 1024 * 1024;
|
||||
default:
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
@ -235,7 +235,11 @@ export class TaskOrchestrator {
|
||||
|
||||
const outputs = task.outputs;
|
||||
const shouldCopyOutputsFromCache =
|
||||
// No output files to restore
|
||||
!!outputs.length &&
|
||||
// Remote caches are restored to output dirs when applied
|
||||
!cachedResult.remote &&
|
||||
// Output files have not been touched since last run
|
||||
(await this.shouldCopyOutputsFromCache(outputs, task.hash));
|
||||
if (shouldCopyOutputsFromCache) {
|
||||
await this.cache.copyFilesFromCache(task.hash, cachedResult, outputs);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user