feat(core): add support for http based caches (#30593)
Implements http based remote caches per the RFC here: https://github.com/nrwl/nx/discussions/30548
This commit is contained in:
parent
494f150a69
commit
0525426a51
881
Cargo.lock
generated
881
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -44,7 +44,160 @@ To learn more about migrating from custom task runners, [please refer to this de
|
||||
|
||||
## Build Your Own Caching Server
|
||||
|
||||
We have [published a new RFC](https://github.com/nrwl/nx/discussions/30548) detailing a custom self-hosted cache based on an OpenAPI specification. This will be available before Nx 21, ensuring a smooth migration path for those who are looking for full control.
|
||||
Starting in Nx version 20.8, you can build your own caching server using the OpenAPI specification provided below. This allows you to create a custom remote cache server that fits your specific needs. The server is responsible for managing all aspects of the remote cache, including storage, retrieval, and authentication.
|
||||
|
||||
Implementation is left up to you, but the server must adhere to the OpenAPI specification provided below to ensure compatibility with Nx's caching mechanism. The endpoints described below involve the transfer of tar archives which are sent as binary data. It is important to note that the underlying format of that data is subject to change in future versions of Nx, but the OpenAPI specification should remain stable.
|
||||
|
||||
As long as your server adheres to the OpenAPI spec, you can implement it in any programming language or framework of your choice.
|
||||
|
||||
### Open API Specification
|
||||
|
||||
```json {% fileName="Nx 20.8+" %}
|
||||
{
|
||||
"openapi": "3.0.0",
|
||||
"info": {
|
||||
"title": "Nx custom remote cache specification.",
|
||||
"description": "Nx is a build system, optimized for monorepos, with AI-powered architectural awareness and advanced CI capabilities.",
|
||||
"version": "1.0.0"
|
||||
},
|
||||
"paths": {
|
||||
"/v1/cache/{hash}": {
|
||||
"put": {
|
||||
"description": "Upload a task output",
|
||||
"operationId": "put",
|
||||
"security": [
|
||||
{
|
||||
"bearerToken": []
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"202": {
|
||||
"description": "Successfully uploaded the output"
|
||||
},
|
||||
"401": {
|
||||
"description": "Missing or invalid authentication token.",
|
||||
"content": {
|
||||
"text/plain": {
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"description": "Error message provided to the Nx CLI user"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Access forbidden. (e.g. read-only token used to write)",
|
||||
"content": {
|
||||
"text/plain": {
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"description": "Error message provided to the Nx CLI user"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"409": {
|
||||
"description": "Cannot override an existing record"
|
||||
}
|
||||
},
|
||||
"parameters": [
|
||||
{
|
||||
"in": "header",
|
||||
"description": "The file size in bytes",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"type": "number"
|
||||
},
|
||||
"name": "Content-Length"
|
||||
},
|
||||
{
|
||||
"name": "hash",
|
||||
"description": "The task hash corresponding to the uploaded task output",
|
||||
"in": "path",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
],
|
||||
"requestBody": {
|
||||
"content": {
|
||||
"application/octet-stream": {
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"format": "binary"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"get": {
|
||||
"description": "Download a task output",
|
||||
"operationId": "get",
|
||||
"security": [
|
||||
{
|
||||
"bearerToken": []
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successfully retrieved cache artifact",
|
||||
"content": {
|
||||
"application/octet-stream": {
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"format": "binary",
|
||||
"description": "An octet stream with the content."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Access forbidden",
|
||||
"content": {
|
||||
"text/plain": {
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"description": "Error message provided to the Nx CLI user"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "The record was not found"
|
||||
}
|
||||
},
|
||||
"parameters": [
|
||||
{
|
||||
"name": "hash",
|
||||
"in": "path",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"securitySchemes": {
|
||||
"bearerToken": {
|
||||
"type": "http",
|
||||
"description": "Auth mechanism",
|
||||
"scheme": "bearer"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Usage Notes
|
||||
|
||||
To use your custom caching server, you must set the `NX_SELF_HOSTED_REMOTE_CACHE_SERVER` environment variable. The following environment variables also affect the behavior:
|
||||
|
||||
- `NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN`: The authentication token to access the cache server.
|
||||
- `NODE_TLS_REJECT_UNAUTHORIZED`: Set to `0` to disable TLS certificate validation. This is insecure and should only be used for local testing, never in production.
|
||||
|
||||
## Why Switch to Nx Cloud
|
||||
|
||||
|
||||
69
e2e/nx/src/__fixtures__/remote-cache.js
Normal file
69
e2e/nx/src/__fixtures__/remote-cache.js
Normal file
@ -0,0 +1,69 @@
|
||||
//@ts-check

const http = require('http');

/**
 * In-memory store mapping a task hash to the Buffer of its uploaded tarball.
 * @type {Record<string, Buffer>}
 */
const inMemoryCache = {};

// IMPORTANT: This implementation serves only as a test fixture
// and is not intended for production use. It is a simple in-memory cache server.
// If one were to wish to use something like this in production, the following
// items should be considered:
// - Persistence: Use a database or a file system for storage
// - Security:
//   - Implement proper authentication and authorization
//   - Ensure existing data is not overwritten without checks
const server = http.createServer((req, res) => {
  const url = req.url;
  const parts = url?.split('/');
  // The hash is the final path segment of /v1/cache/{hash}
  const hash = parts?.[parts.length - 1];

  const auth = req.headers.authorization;
  if (auth !== 'Bearer test-token') {
    res.statusCode = 401;
    res.setHeader('Content-Type', 'text/plain');
    res.end(
      'Unauthorized: Missing or invalid token. Set NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN to proceed.'
    );
    return;
  }

  if (req.method === 'GET') {
    console.log('Checking for hash:', hash);
    console.log('In memory cache:', !!inMemoryCache[hash]);
    if (inMemoryCache[hash]) {
      res.statusCode = 200;
      res.setHeader('Content-Type', 'application/octet-stream');
      res.end(inMemoryCache[hash]);
      return;
    }
    console.log('Not found:', hash);
    res.statusCode = 404;
    res.end('Not found');
    return;
  }

  if (req.method === 'PUT') {
    // Collect the chunks and concatenate once on 'end'. The previous
    // implementation appended each chunk to whatever was already stored
    // under the hash, so a second PUT for the same hash corrupted the
    // artifact by appending instead of replacing it.
    /** @type {Buffer[]} */
    const chunks = [];
    req.on('data', (chunk) => {
      chunks.push(chunk);
    });
    req.on('end', () => {
      inMemoryCache[hash] = Buffer.concat(chunks);
      console.log('Stored in memory cache:', hash);
      res.statusCode = 200;
      res.end('OK');
    });
    return;
  }

  // Any other method is unsupported; respond so clients do not hang waiting.
  res.statusCode = 405;
  res.end('Method Not Allowed');
});

const PORT = 3000;
server.listen(PORT, () => {
  console.log(`Server running at http://localhost:${PORT}/`);
});
|
||||
@ -1,9 +1,11 @@
|
||||
import {
|
||||
cleanupProject,
|
||||
directoryExists,
|
||||
fileExists,
|
||||
listFiles,
|
||||
newProject,
|
||||
readFile,
|
||||
removeFile,
|
||||
rmDist,
|
||||
runCLI,
|
||||
tmpProjPath,
|
||||
@ -11,6 +13,7 @@ import {
|
||||
updateFile,
|
||||
updateJson,
|
||||
} from '@nx/e2e/utils';
|
||||
import { fork } from 'child_process';
|
||||
|
||||
import { readdir, stat } from 'fs/promises';
|
||||
|
||||
@ -424,6 +427,111 @@ describe('cache', () => {
|
||||
expect(cacheEntriesSize).toBeLessThanOrEqual(500 * 1024);
|
||||
});
|
||||
|
||||
describe('http remote cache', () => {
|
||||
let cacheServer: any;
|
||||
beforeAll(() => {
|
||||
cacheServer = fork(join(__dirname, '__fixtures__', 'remote-cache.js'), {
|
||||
stdio: 'inherit',
|
||||
});
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
cacheServer.kill();
|
||||
});
|
||||
|
||||
it('should PUT and GET cache from remote cache', async () => {
|
||||
const projectName = uniq('myapp');
|
||||
const outputFilePath = `dist/${projectName}/output.txt`;
|
||||
updateFile(
|
||||
`projects/${projectName}/project.json`,
|
||||
JSON.stringify({
|
||||
name: projectName,
|
||||
targets: {
|
||||
build: {
|
||||
command: `node -e 'const {mkdirSync, writeFileSync} = require("fs"); mkdirSync("dist/${projectName}", {recursive: true}); writeFileSync("${outputFilePath}", "Hello World")'`,
|
||||
outputs: ['{workspaceRoot}/dist/{projectName}'],
|
||||
cache: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
);
|
||||
runCLI(`build ${projectName}`, {
|
||||
env: {
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_SERVER: 'http://localhost:3000',
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN: 'test-token',
|
||||
},
|
||||
});
|
||||
// removing the file should not affect the cache retrieval,
|
||||
// but we can check that the file exists to ensure the cache is
|
||||
// being used.
|
||||
removeFile(outputFilePath);
|
||||
runCLI(`reset`);
|
||||
const output = runCLI(`build ${projectName}`, {
|
||||
env: {
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_SERVER: 'http://localhost:3000',
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN: 'test-token',
|
||||
},
|
||||
});
|
||||
expectProjectMatchTaskCacheStatus(output, [projectName], 'remote cache');
|
||||
expect(fileExists(tmpProjPath(outputFilePath))).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle 401 without ACCESS_TOKEN appropriately', async () => {
|
||||
const projectName = uniq('myapp');
|
||||
const outputFilePath = `dist/${projectName}/output.txt`;
|
||||
updateFile(
|
||||
`projects/${projectName}/project.json`,
|
||||
JSON.stringify({
|
||||
name: projectName,
|
||||
targets: {
|
||||
build: {
|
||||
command: `node -e 'const {mkdirSync, writeFileSync} = require("fs"); mkdirSync("dist/${projectName}", {recursive: true}); writeFileSync("${outputFilePath}", "Hello World")'`,
|
||||
outputs: ['{workspaceRoot}/dist/{projectName}'],
|
||||
cache: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
);
|
||||
const output = runCLI(`build ${projectName}`, {
|
||||
env: {
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_SERVER: 'http://localhost:3000',
|
||||
},
|
||||
silenceError: true,
|
||||
});
|
||||
|
||||
expect(output).toContain(
|
||||
'Unauthorized: Missing or invalid token. Set NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN to proceed.'
|
||||
);
|
||||
});
|
||||
|
||||
it('should error if server is not running', async () => {
|
||||
const projectName = uniq('myapp');
|
||||
const outputFilePath = `dist/${projectName}/output.txt`;
|
||||
updateFile(
|
||||
`projects/${projectName}/project.json`,
|
||||
JSON.stringify({
|
||||
name: projectName,
|
||||
targets: {
|
||||
build: {
|
||||
command: `node -e 'const {mkdirSync, writeFileSync} = require("fs"); mkdirSync("dist/${projectName}", {recursive: true}); writeFileSync("${outputFilePath}", "Hello World")'`,
|
||||
outputs: ['{workspaceRoot}/dist/{projectName}'],
|
||||
cache: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
);
|
||||
const output = runCLI(`build ${projectName}`, {
|
||||
env: {
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_SERVER: 'http://localhost:3001',
|
||||
NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN: 'test-token',
|
||||
},
|
||||
silenceError: true,
|
||||
});
|
||||
|
||||
expect(output).toContain('http://localhost:3001');
|
||||
});
|
||||
});
|
||||
|
||||
function expectCached(
|
||||
actualOutput: string,
|
||||
expectedCachedProjects: string[]
|
||||
|
||||
@ -17,6 +17,7 @@ colored = "2"
|
||||
crossbeam-channel = '0.5'
|
||||
dashmap = { version = "5.5.3", features = ["rayon"] }
|
||||
dunce = "1"
|
||||
flate2 = "1.1.1"
|
||||
fs_extra = "1.3.0"
|
||||
globset = "0.4.10"
|
||||
hashbrown = { version = "0.14.5", features = ["rayon", "rkyv"] }
|
||||
@ -34,6 +35,7 @@ nom = '7.1.3'
|
||||
regex = "1.9.1"
|
||||
rayon = "1.7.0"
|
||||
rkyv = { version = "0.7", features = ["validation"] }
|
||||
tar = "0.4.44"
|
||||
thiserror = "1.0.40"
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||
@ -57,6 +59,7 @@ portable-pty = { git = "https://github.com/cammisuli/wezterm", rev = "b538ee29e1
|
||||
crossterm = "0.27.0"
|
||||
ignore-files = "2.1.0"
|
||||
fs4 = "0.12.0"
|
||||
reqwest = "0.12.15"
|
||||
rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab"] }
|
||||
watchexec = "3.0.1"
|
||||
watchexec-events = "2.0.1"
|
||||
|
||||
84
packages/nx/src/native/cache/errors.rs
vendored
Normal file
84
packages/nx/src/native/cache/errors.rs
vendored
Normal file
@ -0,0 +1,84 @@
|
||||
use std::{error::Error, fmt::Write, future::Future, pin::Pin};
|
||||
|
||||
use napi::Status;
|
||||
use reqwest::Response;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error, Clone, Eq, PartialEq)]
|
||||
pub enum HttpRemoteCacheErrors {
|
||||
#[error("Unauthorized: {0}")]
|
||||
Unauthorized(String),
|
||||
#[error("Misconfigured remote cache endpoint: {0}")]
|
||||
Misconfigured(String),
|
||||
#[error("Failed to send request: {0}")]
|
||||
RequestError(String),
|
||||
}
|
||||
|
||||
pub type AsyncHttpRemoteCacheErrors = Pin<Box<dyn Future<Output = HttpRemoteCacheErrors>>>;
|
||||
|
||||
pub fn report_request_error(mut err: &dyn Error) -> String {
|
||||
let mut s = format!("{}", err);
|
||||
while let Some(src) = err.source() {
|
||||
let _ = write!(s, "\n\nCaused by: {}", src);
|
||||
err = src;
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
pub async fn convert_response_to_error(response: Response) -> HttpRemoteCacheErrors {
|
||||
match response.status() {
|
||||
reqwest::StatusCode::UNAUTHORIZED => {
|
||||
if response
|
||||
.headers()
|
||||
.get(reqwest::header::CONTENT_TYPE)
|
||||
.map(|v| v == "text/plain")
|
||||
.unwrap_or(false)
|
||||
{
|
||||
HttpRemoteCacheErrors::Unauthorized(
|
||||
response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Unauthorized".to_string()),
|
||||
)
|
||||
} else {
|
||||
HttpRemoteCacheErrors::Misconfigured(
|
||||
"Requests should respond with text/plain on 401s".to_string(),
|
||||
)
|
||||
}
|
||||
}
|
||||
_ => HttpRemoteCacheErrors::Misconfigured(format!(
|
||||
"Unexpected response status: {}",
|
||||
response.status()
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<str> for HttpRemoteCacheErrors {
|
||||
fn as_ref(&self) -> &str {
|
||||
match self {
|
||||
HttpRemoteCacheErrors::Unauthorized(_) => "Unauthorized",
|
||||
HttpRemoteCacheErrors::Misconfigured(_) => "Misconfigured",
|
||||
HttpRemoteCacheErrors::RequestError(_) => "RequestError",
|
||||
// _ => "Error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HttpRemoteCacheErrors> for napi::Error<HttpRemoteCacheErrors> {
|
||||
fn from(err: HttpRemoteCacheErrors) -> Self {
|
||||
napi::Error::new(err.clone(), err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
// we need to implement this conversion to Status because napi::Error only accepts Status
|
||||
// waiting for this to close https://github.com/napi-rs/napi-rs/issues/2178#issuecomment-2401184010
|
||||
impl From<HttpRemoteCacheErrors> for napi::Error {
|
||||
fn from(err: HttpRemoteCacheErrors) -> Self {
|
||||
let status = match err {
|
||||
HttpRemoteCacheErrors::Unauthorized(_) => Status::GenericFailure,
|
||||
HttpRemoteCacheErrors::Misconfigured(_) => Status::InvalidArg,
|
||||
_ => Status::GenericFailure,
|
||||
};
|
||||
napi::Error::new(status, err.to_string())
|
||||
}
|
||||
}
|
||||
246
packages/nx/src/native/cache/http_remote_cache.rs
vendored
Normal file
246
packages/nx/src/native/cache/http_remote_cache.rs
vendored
Normal file
@ -0,0 +1,246 @@
|
||||
use std::{
|
||||
env,
|
||||
error::Error,
|
||||
fs::{self},
|
||||
io::Read,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use super::{
|
||||
cache::CachedResult,
|
||||
errors::{convert_response_to_error, report_request_error, HttpRemoteCacheErrors},
|
||||
};
|
||||
use flate2::Compression;
|
||||
use reqwest::{header, Client, ClientBuilder, StatusCode};
|
||||
use tar::{Archive, Builder};
|
||||
use tracing::trace;
|
||||
|
||||
#[napi]
|
||||
pub struct HttpRemoteCache {
|
||||
client: Client,
|
||||
url: String,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
impl HttpRemoteCache {
|
||||
#[napi(constructor)]
|
||||
pub fn new() -> Self {
|
||||
let mut headers = header::HeaderMap::new();
|
||||
let auth_token = env::var("NX_SELF_HOSTED_REMOTE_CACHE_ACCESS_TOKEN");
|
||||
if let Ok(token) = auth_token {
|
||||
headers.insert(
|
||||
header::AUTHORIZATION,
|
||||
header::HeaderValue::from_str(&format!("Bearer {}", token))
|
||||
.expect("from_str should not throw here."),
|
||||
);
|
||||
}
|
||||
|
||||
headers.insert(
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/octet-stream"),
|
||||
);
|
||||
|
||||
let mut client_builder = ClientBuilder::new().default_headers(headers);
|
||||
|
||||
let env_accept_unauthorized = env::var("NODE_TLS_REJECT_UNAUTHORIZED");
|
||||
if let Ok(env_accept_unauthorized) = env_accept_unauthorized {
|
||||
if env_accept_unauthorized == "0" {
|
||||
client_builder = client_builder.danger_accept_invalid_certs(true);
|
||||
}
|
||||
}
|
||||
|
||||
HttpRemoteCache {
|
||||
client: client_builder
|
||||
.build()
|
||||
.expect("Failed to create HTTP client"),
|
||||
url: env::var("NX_SELF_HOSTED_REMOTE_CACHE_SERVER")
|
||||
.expect("NX_REMOTE_CACHE_URL must be set"),
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn retrieve(
|
||||
&self,
|
||||
hash: String,
|
||||
cache_directory: String,
|
||||
) -> napi::Result<Option<CachedResult>> {
|
||||
let span = tracing::trace_span!("retrieve", hash = %hash);
|
||||
let _guard = span.enter();
|
||||
|
||||
let url: String = format!("{}/v1/cache/{}", self.url, hash);
|
||||
let response = self.client.get(&url).send().await;
|
||||
if let Ok(resp) = response {
|
||||
trace!("HTTP response status: {}", resp.status());
|
||||
let status = resp.status();
|
||||
|
||||
match status {
|
||||
StatusCode::OK => {
|
||||
Ok(Some(
|
||||
// response is an application/octet-stream containing a tarball
|
||||
// we need to extract the tarball and return the path to the extracted files
|
||||
Self::download_and_extract_from_result(resp, cache_directory, hash).await?,
|
||||
))
|
||||
}
|
||||
StatusCode::NOT_FOUND => Ok(None),
|
||||
_ => Err(convert_response_to_error(resp).await.into()),
|
||||
}
|
||||
} else {
|
||||
Err(HttpRemoteCacheErrors::RequestError(response.unwrap_err().to_string()).into())
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn store(
|
||||
&self,
|
||||
hash: String,
|
||||
cache_directory: String,
|
||||
terminal_output: String,
|
||||
code: u32,
|
||||
) -> napi::Result<bool> {
|
||||
let span = tracing::trace_span!("store", hash = %hash);
|
||||
let _guard = span.enter();
|
||||
|
||||
// We can change the creation of the tar in a future version without
|
||||
// worrying about breaking existing user cache's, because when the
|
||||
// user updates their task's hashes will be changed... so users
|
||||
// retrieving old hashes will not be affected, and new entries
|
||||
// will have distinct hashes.
|
||||
|
||||
// create a tarball in memory from the cache dir
|
||||
let tar_gz: Vec<u8> = Vec::new();
|
||||
let enc = flate2::write::GzEncoder::new(tar_gz, Compression::default());
|
||||
let mut archive = Builder::new(enc);
|
||||
trace!("Created tar file for writing");
|
||||
|
||||
let cache_path = Path::new(&cache_directory);
|
||||
let outputs_path = cache_path.join(&hash);
|
||||
|
||||
trace!("Adding cache artifacts to tarball");
|
||||
archive.append_dir_all("", &outputs_path)?;
|
||||
trace!("Added cache directory to tarball");
|
||||
|
||||
trace!("Adding terminal output to tarball");
|
||||
let mut terminal_output_header = tar::Header::new_old();
|
||||
let terminal_output_bytes = terminal_output.as_bytes();
|
||||
terminal_output_header.set_size(terminal_output_bytes.len() as u64);
|
||||
terminal_output_header.set_cksum(); // Ensure the checksum is set correctly
|
||||
archive.append_data(
|
||||
&mut terminal_output_header,
|
||||
"terminalOutput",
|
||||
terminal_output_bytes,
|
||||
)?;
|
||||
trace!("Added terminal output to tarball");
|
||||
|
||||
trace!("Adding code to tarball");
|
||||
let mut code_header = tar::Header::new_old();
|
||||
code_header.set_size(4);
|
||||
code_header.set_cksum(); // Ensure the checksum is set correctly
|
||||
archive.append_data(&mut code_header, "code", &code.to_be_bytes()[..])?;
|
||||
trace!("Added code to tarball");
|
||||
|
||||
trace!("Finishing tarball");
|
||||
archive
|
||||
.finish() // Finish the archive to get the inner bytes
|
||||
.map_err(|e| anyhow::anyhow!(format!("Failed to finish tarball: {}", e)))?;
|
||||
trace!("Finished tarball");
|
||||
|
||||
trace!("Reading tarball into memory");
|
||||
let archive_bytes = archive.into_inner()?;
|
||||
let buffer = archive_bytes.finish()?;
|
||||
trace!("read tarball into memory");
|
||||
|
||||
let url: String = format!("{}/v1/cache/{}", self.url, hash);
|
||||
let response = self
|
||||
.client
|
||||
.put(&url)
|
||||
.body(buffer) // Convert the bytes to a Vec<u8> for the request body
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
napi::Error::from(HttpRemoteCacheErrors::RequestError(report_request_error(
|
||||
&e,
|
||||
)))
|
||||
})?;
|
||||
|
||||
match response.status() {
|
||||
StatusCode::OK => Ok(true),
|
||||
// Cache entry already exists, silently do not store new data
|
||||
StatusCode::CONFLICT => Ok(false),
|
||||
// User is authorized but server does not allow
|
||||
// cache storage for whatever reason (e.g. read-only token.)
|
||||
StatusCode::FORBIDDEN => Ok(false),
|
||||
_ => Err(convert_response_to_error(response).await.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_and_extract_from_result(
|
||||
response: reqwest::Response,
|
||||
cache_directory: String,
|
||||
hash: String,
|
||||
) -> anyhow::Result<CachedResult> {
|
||||
let content = response.bytes().await.unwrap();
|
||||
trace!("Downloaded {} bytes from remote cache", content.len());
|
||||
let tar = flate2::read::GzDecoder::new(content.as_ref());
|
||||
let mut archive = Archive::new(tar);
|
||||
let entries = archive
|
||||
.entries() // Get the entries in the archive
|
||||
.map_err(|_| anyhow::anyhow!("Failed to read entries from tarball"))?;
|
||||
|
||||
let mut code: Option<i16> = None;
|
||||
let mut terminal_output: Option<String> = None;
|
||||
let mut size: i64 = 0;
|
||||
|
||||
let output_dir = Path::new(&cache_directory).join(&hash);
|
||||
|
||||
// Extract the archive to the specified cache directory
|
||||
for entry in entries {
|
||||
let mut entry =
|
||||
entry.map_err(|_| anyhow::anyhow!("Failed to read entry from tarball"))?;
|
||||
|
||||
let entry_path = entry
|
||||
.path()
|
||||
.map(|p| p.to_string_lossy().into_owned())
|
||||
.unwrap();
|
||||
|
||||
if entry_path == "code" {
|
||||
let code_file_bytes = entry.bytes().collect::<Result<Vec<u8>, _>>()?;
|
||||
code = Some(i16::from_be_bytes([code_file_bytes[0], code_file_bytes[1]]));
|
||||
trace!("Retrieved exit code from cache: {}", code.unwrap());
|
||||
} else if entry_path == "terminalOutput" {
|
||||
let terminal_output_bytes = entry.bytes().collect::<Result<Vec<u8>, _>>()?;
|
||||
let terminal_output_size = terminal_output_bytes.len();
|
||||
|
||||
terminal_output = Some(String::from_utf8(terminal_output_bytes)?);
|
||||
size += terminal_output_size as i64;
|
||||
|
||||
trace!(
|
||||
"Retrieved terminal output from cache: {} bytes",
|
||||
terminal_output_size
|
||||
);
|
||||
} else {
|
||||
let path_on_disk = output_dir.join(entry_path);
|
||||
trace!("Extracting entry to {}", path_on_disk.display());
|
||||
fs::create_dir_all(path_on_disk.parent().expect("This will have a parent, we just joined it above so there is at least one dir."))?;
|
||||
// Ensure the directory exists before extracting
|
||||
match entry.unpack(&path_on_disk) {
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!("Failed to unpack entry: {}", e));
|
||||
}
|
||||
Ok(f) => match f {
|
||||
tar::Unpacked::File(f) => size += f.metadata()?.len() as i64,
|
||||
_ => (),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Extracted tarball to {}", output_dir.display());
|
||||
|
||||
Ok(CachedResult {
|
||||
terminal_output,
|
||||
code: code.expect("Exit code not found in cache"),
|
||||
outputs_path: output_dir.to_string_lossy().into_owned(),
|
||||
size: Some(size),
|
||||
})
|
||||
}
|
||||
}
|
||||
5
packages/nx/src/native/cache/mod.rs
vendored
5
packages/nx/src/native/cache/mod.rs
vendored
@ -1,6 +1,9 @@
|
||||
pub mod errors;
|
||||
pub mod expand_outputs;
|
||||
pub mod file_ops;
|
||||
pub mod validate_outputs;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub mod cache;
|
||||
pub mod cache;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub mod http_remote_cache;
|
||||
|
||||
6
packages/nx/src/native/index.d.ts
vendored
6
packages/nx/src/native/index.d.ts
vendored
@ -28,6 +28,12 @@ export declare class HashPlanner {
|
||||
getPlansReference(taskIds: Array<string>, taskGraph: TaskGraph): ExternalObject<Record<string, Array<HashInstruction>>>
|
||||
}
|
||||
|
||||
/**
 * Remote cache backed by a self-hosted HTTP cache server.
 * NOTE(review): this file appears to be napi-rs generated; comments may be
 * overwritten on regeneration.
 */
export declare class HttpRemoteCache {
  constructor()
  /** Resolves with the cached result for `hash`, or `null` on a cache miss. */
  retrieve(hash: string, cacheDirectory: string): Promise<CachedResult | null>
  /** Uploads a task's outputs; resolves `true` when the server stored them. */
  store(hash: string, cacheDirectory: string, terminalOutput: string, code: number): Promise<boolean>
}
|
||||
|
||||
export declare class ImportResult {
|
||||
file: string
|
||||
sourceProject: string
|
||||
|
||||
@ -364,6 +364,7 @@ if (!nativeBinding) {
|
||||
module.exports.ChildProcess = nativeBinding.ChildProcess
|
||||
module.exports.FileLock = nativeBinding.FileLock
|
||||
module.exports.HashPlanner = nativeBinding.HashPlanner
|
||||
module.exports.HttpRemoteCache = nativeBinding.HttpRemoteCache
|
||||
module.exports.ImportResult = nativeBinding.ImportResult
|
||||
module.exports.NxCache = nativeBinding.NxCache
|
||||
module.exports.NxTaskHistory = nativeBinding.NxTaskHistory
|
||||
|
||||
@ -88,13 +88,13 @@ function __napi_rs_initialize_modules(__napiInstance) {
|
||||
__napiInstance.exports['__napi_register__FileLock_struct_39']?.()
|
||||
__napiInstance.exports['__napi_register__FileLock_impl_41']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_struct_42']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_impl_51']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceErrors_52']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFiles_struct_53']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFilesExternals_struct_54']?.()
|
||||
__napiInstance.exports['__napi_register__UpdatedWorkspaceFiles_struct_55']?.()
|
||||
__napiInstance.exports['__napi_register__FileMap_struct_56']?.()
|
||||
__napiInstance.exports['__napi_register____test_only_transfer_file_map_57']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_impl_53']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceErrors_54']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFiles_struct_55']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFilesExternals_struct_56']?.()
|
||||
__napiInstance.exports['__napi_register__UpdatedWorkspaceFiles_struct_57']?.()
|
||||
__napiInstance.exports['__napi_register__FileMap_struct_58']?.()
|
||||
__napiInstance.exports['__napi_register____test_only_transfer_file_map_59']?.()
|
||||
}
|
||||
export const FileLock = __napiModule.exports.FileLock
|
||||
export const HashPlanner = __napiModule.exports.HashPlanner
|
||||
|
||||
@ -119,13 +119,13 @@ function __napi_rs_initialize_modules(__napiInstance) {
|
||||
__napiInstance.exports['__napi_register__FileLock_struct_39']?.()
|
||||
__napiInstance.exports['__napi_register__FileLock_impl_41']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_struct_42']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_impl_51']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceErrors_52']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFiles_struct_53']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFilesExternals_struct_54']?.()
|
||||
__napiInstance.exports['__napi_register__UpdatedWorkspaceFiles_struct_55']?.()
|
||||
__napiInstance.exports['__napi_register__FileMap_struct_56']?.()
|
||||
__napiInstance.exports['__napi_register____test_only_transfer_file_map_57']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceContext_impl_53']?.()
|
||||
__napiInstance.exports['__napi_register__WorkspaceErrors_54']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFiles_struct_55']?.()
|
||||
__napiInstance.exports['__napi_register__NxWorkspaceFilesExternals_struct_56']?.()
|
||||
__napiInstance.exports['__napi_register__UpdatedWorkspaceFiles_struct_57']?.()
|
||||
__napiInstance.exports['__napi_register__FileMap_struct_58']?.()
|
||||
__napiInstance.exports['__napi_register____test_only_transfer_file_map_59']?.()
|
||||
}
|
||||
module.exports.FileLock = __napiModule.exports.FileLock
|
||||
module.exports.HashPlanner = __napiModule.exports.HashPlanner
|
||||
|
||||
@ -17,6 +17,7 @@ import {
|
||||
CachedResult as NativeCacheResult,
|
||||
IS_WASM,
|
||||
getDefaultMaxCacheSize,
|
||||
HttpRemoteCache,
|
||||
} from '../native';
|
||||
import { getDbConnection } from '../utils/db-connection';
|
||||
import { isNxCloudUsed } from '../utils/nx-cloud-utils';
|
||||
@ -241,6 +242,7 @@ export class DbCache {
|
||||
(await this.getSharedCache()) ??
|
||||
(await this.getGcsCache()) ??
|
||||
(await this.getAzureCache()) ??
|
||||
this.getHttpCache() ??
|
||||
null
|
||||
);
|
||||
}
|
||||
@ -270,6 +272,19 @@ export class DbCache {
|
||||
return this.resolveRemoteCache('@nx/powerpack-azure-cache');
|
||||
}
|
||||
|
||||
/**
 * Returns an HTTP-based remote cache when the user has configured
 * `NX_SELF_HOSTED_REMOTE_CACHE_SERVER`; otherwise `null`. Not available
 * in the wasm build, where a warning is logged instead.
 */
private getHttpCache(): RemoteCacheV2 | null {
  const serverUrl = process.env.NX_SELF_HOSTED_REMOTE_CACHE_SERVER;
  if (!serverUrl) {
    return null;
  }
  if (IS_WASM) {
    logger.warn(
      'The HTTP remote cache is not yet supported in the wasm build of Nx.'
    );
    return null;
  }
  return new HttpRemoteCache();
}
|
||||
|
||||
private async resolveRemoteCache(pkg: string): Promise<RemoteCacheV2 | null> {
|
||||
let getRemoteCache = null;
|
||||
try {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user