Skip to content

Commit a178189

Browse files
committed
Added service client authentication and improved modularity
1 parent 0b456d3 commit a178189

File tree

12 files changed

+1028
-731
lines changed

12 files changed

+1028
-731
lines changed

Cargo.lock

+41
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/examples/gcs_backend.json5

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
},
1919
"slow": {
2020
"experimental_gcs_store": {
21-
"project_id": "inbound-entity-447014-k2",
21+
"service_email": "[email protected]",
2222
"bucket": "test-bucket-aman-nativelink",
2323
"key_prefix": "test-prefix-index/",
2424
"retry": {
@@ -49,7 +49,7 @@
4949
},
5050
"slow": {
5151
"experimental_gcs_store": {
52-
"project_id": "inbound-entity-447014-k2",
52+
"service_email": "[email protected]",
5353
"bucket": "test-bucket-aman-nativelink",
5454
"key_prefix": "test-prefix-dedup-cas/",
5555
"retry": {
@@ -88,7 +88,7 @@
8888
},
8989
"slow": {
9090
"experimental_gcs_store": {
91-
"project_id": "inbound-entity-447014-k2",
91+
"service_email": "[email protected]",
9292
// Name of the bucket to upload to.
9393
"bucket": "test-bucket-aman-nativelink",
9494
"key_prefix": "test-prefix-ac/",
@@ -166,3 +166,4 @@
166166
}
167167
}]
168168
}
169+

nativelink-config/src/stores.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ pub struct S3Spec {
813813
pub struct GcsSpec {
814814
/// Project ID for the GCS service
815815
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
816-
pub project_id: String,
816+
pub service_email: String,
817817

818818
/// Bucket name to use as the backend
819819
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]

nativelink-store/BUILD.bazel

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ rust_library(
1818
"src/existence_cache_store.rs",
1919
"src/fast_slow_store.rs",
2020
"src/filesystem_store.rs",
21+
"src/gcs_client/auth.rs",
22+
"src/gcs_client/client.rs",
23+
"src/gcs_client/grpc_client.rs",
24+
"src/gcs_client/mod.rs",
2125
"src/gcs_store.rs",
2226
"src/grpc_store.rs",
2327
"src/lib.rs",
@@ -58,6 +62,7 @@ rust_library(
5862
"@crates//:fred",
5963
"@crates//:futures",
6064
"@crates//:googleapis-tonic-google-storage-v2",
65+
"@crates//:jsonwebtoken",
6166
"@crates//:hex",
6267
"@crates//:http-body",
6368
"@crates//:hyper-0.14.31",

nativelink-store/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ fred = { version = "10.0.1", default-features = false, features = [
4040
"subscriber-client",
4141
] }
4242
googleapis-tonic-google-storage-v2 = "0.17.0"
43+
jsonwebtoken = "9.3.0"
4344
patricia_tree = { version = "0.8.0", default-features = false }
4445
futures = { version = "0.3.31", default-features = false }
4546
hex = { version = "0.4.3", default-features = false }
+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Copyright 2024 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
16+
17+
use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
18+
use nativelink_config::stores::GcsSpec;
19+
use nativelink_error::{make_err, Code, Error};
20+
use rand::Rng;
21+
use serde::Serialize;
22+
use tokio::sync::{Mutex, RwLock};
23+
24+
const SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform";
25+
const AUDIENCE: &str = "https://storage.googleapis.com/";
26+
const TOKEN_LIFETIME: Duration = Duration::from_secs(3600); // 1 hour
27+
const REFRESH_WINDOW: Duration = Duration::from_secs(300); // 5 minutes
28+
const MAX_REFRESH_ATTEMPTS: u32 = 3;
29+
const RETRY_DELAY_BASE: Duration = Duration::from_secs(1);
30+
31+
const ENV_PRIVATE_KEY: &str = "GCS_PRIVATE_KEY";
32+
const ENV_AUTH_TOKEN: &str = "GOOGLE_AUTH_TOKEN";
33+
34+
#[derive(Debug, Serialize)]
35+
struct JwtClaims {
36+
iss: String,
37+
sub: String,
38+
aud: String,
39+
iat: u64,
40+
exp: u64,
41+
scope: String,
42+
}
43+
44+
#[derive(Clone)]
45+
struct TokenInfo {
46+
token: String,
47+
refresh_at: u64, // Timestamp when token should be refreshed
48+
}
49+
50+
pub struct GcsAuth {
51+
token_cache: RwLock<Option<TokenInfo>>,
52+
refresh_lock: Mutex<()>,
53+
service_email: String,
54+
private_key: String,
55+
}
56+
57+
impl GcsAuth {
58+
pub async fn new(spec: &GcsSpec) -> Result<Self, Error> {
59+
// First try to get direct token from environment
60+
if let Ok(token) = std::env::var(ENV_AUTH_TOKEN) {
61+
let now = SystemTime::now()
62+
.duration_since(UNIX_EPOCH)
63+
.map_err(|e| make_err!(Code::Internal, "Failed to get system time: {}", e))?
64+
.as_secs();
65+
66+
return Ok(Self {
67+
token_cache: RwLock::new(Some(TokenInfo {
68+
token,
69+
refresh_at: now + TOKEN_LIFETIME.as_secs() - REFRESH_WINDOW.as_secs(),
70+
})),
71+
refresh_lock: Mutex::new(()),
72+
service_email: String::new(),
73+
private_key: String::new(),
74+
});
75+
}
76+
77+
let service_email = spec.service_email.clone();
78+
79+
// Get private key from environment
80+
let private_key = std::env::var(ENV_PRIVATE_KEY).map_err(|_| {
81+
make_err!(
82+
Code::NotFound,
83+
"Environment variable {} not found",
84+
ENV_PRIVATE_KEY
85+
)
86+
})?;
87+
88+
Ok(Self {
89+
token_cache: RwLock::new(None),
90+
refresh_lock: Mutex::new(()),
91+
service_email,
92+
private_key,
93+
})
94+
}
95+
96+
fn add_jitter(duration: Duration) -> Duration {
97+
let jitter = rand::thread_rng().gen_range(-5..=5);
98+
duration.saturating_add(Duration::from_secs_f64(f64::from(jitter) * 0.1))
99+
}
100+
101+
async fn generate_token(&self) -> Result<TokenInfo, Error> {
102+
let now = SystemTime::now()
103+
.duration_since(UNIX_EPOCH)
104+
.map_err(|e| make_err!(Code::Internal, "Failed to get system time: {}", e))?
105+
.as_secs();
106+
107+
let expiry = now + TOKEN_LIFETIME.as_secs();
108+
let refresh_at = expiry - REFRESH_WINDOW.as_secs();
109+
110+
let claims = JwtClaims {
111+
iss: self.service_email.clone(),
112+
sub: self.service_email.clone(),
113+
aud: AUDIENCE.to_string(),
114+
iat: now,
115+
exp: expiry,
116+
scope: SCOPE.to_string(),
117+
};
118+
119+
let header = Header::new(Algorithm::RS256);
120+
let key = EncodingKey::from_rsa_pem(self.private_key.as_bytes())
121+
.map_err(|e| make_err!(Code::Internal, "Failed to create encoding key: {}", e))?;
122+
123+
let token = encode(&header, &claims, &key)
124+
.map_err(|e| make_err!(Code::Internal, "Failed to encode JWT: {}", e))?;
125+
126+
Ok(TokenInfo { token, refresh_at })
127+
}
128+
129+
async fn refresh_token(&self) -> Result<TokenInfo, Error> {
130+
let mut attempt = 0;
131+
loop {
132+
match self.generate_token().await {
133+
Ok(token_info) => return Ok(token_info),
134+
Err(e) => {
135+
attempt += 1;
136+
if attempt >= MAX_REFRESH_ATTEMPTS {
137+
return Err(make_err!(
138+
Code::Internal,
139+
"Failed to refresh token after {} attempts: {}",
140+
MAX_REFRESH_ATTEMPTS,
141+
e
142+
));
143+
}
144+
let delay = Self::add_jitter(RETRY_DELAY_BASE * (2_u32.pow(attempt - 1)));
145+
tokio::time::sleep(delay).await;
146+
}
147+
}
148+
}
149+
}
150+
151+
pub async fn get_valid_token(&self) -> Result<String, Error> {
152+
if let Some(token_info) = self.token_cache.read().await.as_ref() {
153+
let now = SystemTime::now()
154+
.duration_since(UNIX_EPOCH)
155+
.map_err(|e| make_err!(Code::Internal, "Failed to get system time: {}", e))?
156+
.as_secs();
157+
158+
if now < token_info.refresh_at {
159+
return Ok(token_info.token.clone());
160+
}
161+
}
162+
163+
let _refresh_guard = self.refresh_lock.lock().await;
164+
165+
if let Some(token_info) = self.token_cache.read().await.as_ref() {
166+
let now = SystemTime::now()
167+
.duration_since(UNIX_EPOCH)
168+
.map_err(|e| make_err!(Code::Internal, "Failed to get system time: {}", e))?
169+
.as_secs();
170+
171+
if now < token_info.refresh_at {
172+
return Ok(token_info.token.clone());
173+
}
174+
}
175+
176+
let token_info = if self.private_key.is_empty() {
177+
if let Ok(token) = std::env::var(ENV_AUTH_TOKEN) {
178+
let now = SystemTime::now()
179+
.duration_since(UNIX_EPOCH)
180+
.map_err(|e| make_err!(Code::Internal, "Failed to get system time: {}", e))?
181+
.as_secs();
182+
183+
TokenInfo {
184+
token,
185+
refresh_at: now + TOKEN_LIFETIME.as_secs() - REFRESH_WINDOW.as_secs(),
186+
}
187+
} else {
188+
return Err(make_err!(
189+
Code::Unauthenticated,
190+
"No valid authentication method available"
191+
));
192+
}
193+
} else {
194+
self.refresh_token().await?
195+
};
196+
197+
*self.token_cache.write().await = Some(token_info.clone());
198+
199+
Ok(token_info.token)
200+
}
201+
}

0 commit comments

Comments
 (0)