Skip to content

Commit dc1383d

Browse files
authored
fix: auth flow for query and permission assignment for ListStream (#1048)
--------- Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 6fc4a5b commit dc1383d

File tree

10 files changed

+131
-150
lines changed

10 files changed

+131
-150
lines changed

src/correlation/correlation_utils.rs

-62
This file was deleted.

src/correlation/mod.rs

+27-11
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
use std::collections::HashSet;
2020

21-
use actix_web::http::header::ContentType;
21+
use actix_web::{http::header::ContentType, Error};
2222
use chrono::Utc;
23-
use correlation_utils::user_auth_for_query;
2423
use datafusion::error::DataFusionError;
2524
use http::StatusCode;
2625
use itertools::Itertools;
@@ -31,12 +30,15 @@ use tokio::sync::RwLock;
3130
use tracing::{error, trace, warn};
3231

3332
use crate::{
34-
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
35-
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
33+
handlers::http::rbac::RBACError,
34+
option::CONFIG,
35+
query::QUERY_SESSION,
36+
rbac::{map::SessionKey, Users},
37+
storage::ObjectStorageError,
38+
users::filters::FilterQuery,
39+
utils::{get_hash, user_auth_for_query},
3640
};
3741

38-
pub mod correlation_utils;
39-
4042
pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
4143

4244
#[derive(Debug, Default)]
@@ -77,11 +79,15 @@ impl Correlation {
7779
let correlations = self.0.read().await.iter().cloned().collect_vec();
7880

7981
let mut user_correlations = vec![];
82+
let permissions = Users.get_permissions(session_key);
83+
8084
for c in correlations {
81-
if user_auth_for_query(session_key, &c.table_configs)
82-
.await
83-
.is_ok()
84-
{
85+
let tables = &c
86+
.table_configs
87+
.iter()
88+
.map(|t| t.table_name.clone())
89+
.collect_vec();
90+
if user_auth_for_query(&permissions, tables).is_ok() {
8591
user_correlations.push(c);
8692
}
8793
}
@@ -220,7 +226,14 @@ impl CorrelationRequest {
220226
}
221227

222228
// check if user has access to table
223-
user_auth_for_query(session_key, &self.table_configs).await?;
229+
let permissions = Users.get_permissions(session_key);
230+
let tables = &self
231+
.table_configs
232+
.iter()
233+
.map(|t| t.table_name.clone())
234+
.collect_vec();
235+
236+
user_auth_for_query(&permissions, tables)?;
224237

225238
// to validate table config, we need to check whether the mentioned fields
226239
// are present in the table or not
@@ -271,6 +284,8 @@ pub enum CorrelationError {
271284
Unauthorized,
272285
#[error("DataFusion Error: {0}")]
273286
DataFusion(#[from] DataFusionError),
287+
#[error("{0}")]
288+
ActixError(#[from] Error),
274289
}
275290

276291
impl actix_web::ResponseError for CorrelationError {
@@ -283,6 +298,7 @@ impl actix_web::ResponseError for CorrelationError {
283298
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
284299
Self::Unauthorized => StatusCode::BAD_REQUEST,
285300
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
301+
Self::ActixError(_) => StatusCode::BAD_REQUEST,
286302
}
287303
}
288304

src/handlers/airplane.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

3636
use crate::handlers::http::cluster::get_ingestor_info;
37-
use crate::handlers::http::query::{
38-
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
39-
};
37+
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
4038
use crate::handlers::livetail::cross_origin_config;
4139
use crate::metrics::QUERY_EXECUTE_TIME;
4240
use crate::option::CONFIG;
@@ -46,6 +44,7 @@ use crate::utils::arrow::flight::{
4644
send_to_ingester,
4745
};
4846
use crate::utils::time::TimeRange;
47+
use crate::utils::user_auth_for_query;
4948
use arrow_flight::{
5049
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
5150
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -157,12 +156,12 @@ impl FlightService for AirServiceImpl {
157156
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
158157
.to_owned();
159158

160-
update_schema_when_distributed(streams)
159+
update_schema_when_distributed(&streams)
161160
.await
162161
.map_err(|err| Status::internal(err.to_string()))?;
163162

164163
// map payload to query
165-
let mut query = into_query(&ticket, &session_state, time_range)
164+
let query = into_query(&ticket, &session_state, time_range)
166165
.await
167166
.map_err(|_| Status::internal("Failed to parse query"))?;
168167

@@ -202,7 +201,7 @@ impl FlightService for AirServiceImpl {
202201
rbac::Response::Authorized => (),
203202
rbac::Response::UnAuthorized => {
204203
return Err(Status::permission_denied(
205-
"user is not authenticated to access this resource",
204+
"user is not authorized to access this resource",
206205
))
207206
}
208207
rbac::Response::ReloadRequired => {
@@ -212,7 +211,7 @@ impl FlightService for AirServiceImpl {
212211

213212
let permissions = Users.get_permissions(&key);
214213

215-
authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
214+
user_auth_for_query(&permissions, &streams).map_err(|_| {
216215
Status::permission_denied("User Does not have permission to access this")
217216
})?;
218217
let time = Instant::now();

src/handlers/http/correlation.rs

+31-14
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818

1919
use actix_web::{web, HttpRequest, Responder};
2020
use bytes::Bytes;
21+
use itertools::Itertools;
2122
use relative_path::RelativePathBuf;
2223

24+
use crate::rbac::Users;
25+
use crate::utils::user_auth_for_query;
2326
use crate::{
2427
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
2528
utils::actix::extract_session_key_from_req,
2629
};
2730

28-
use crate::correlation::{
29-
correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
30-
CorrelationRequest, CORRELATIONS,
31-
};
31+
use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};
3232

3333
pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
3434
let session_key = extract_session_key_from_req(&req)
@@ -52,14 +52,17 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
5252

5353
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
5454

55-
if user_auth_for_query(&session_key, &correlation.table_configs)
56-
.await
57-
.is_ok()
58-
{
59-
Ok(web::Json(correlation))
60-
} else {
61-
Err(CorrelationError::Unauthorized)
62-
}
55+
let permissions = Users.get_permissions(&session_key);
56+
57+
let tables = &correlation
58+
.table_configs
59+
.iter()
60+
.map(|t| t.table_name.clone())
61+
.collect_vec();
62+
63+
user_auth_for_query(&permissions, tables)?;
64+
65+
Ok(web::Json(correlation))
6366
}
6467

6568
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
@@ -93,7 +96,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
9396

9497
// validate whether user has access to this correlation object or not
9598
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
96-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
99+
let permissions = Users.get_permissions(&session_key);
100+
let tables = &correlation
101+
.table_configs
102+
.iter()
103+
.map(|t| t.table_name.clone())
104+
.collect_vec();
105+
106+
user_auth_for_query(&permissions, tables)?;
97107

98108
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
99109
correlation_request.validate(&session_key).await?;
@@ -122,7 +132,14 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
122132
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
123133

124134
// validate user's query auth
125-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
135+
let permissions = Users.get_permissions(&session_key);
136+
let tables = &correlation
137+
.table_configs
138+
.iter()
139+
.map(|t| t.table_name.clone())
140+
.collect_vec();
141+
142+
user_auth_for_query(&permissions, tables)?;
126143

127144
// Delete from disk
128145
let store = CONFIG.storage().get_object_store();

src/handlers/http/logstream.rs

+20-5
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3232
use crate::metadata::{SchemaVersion, STREAM_INFO};
3333
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
3434
use crate::option::{Mode, CONFIG};
35+
use crate::rbac::role::Action;
36+
use crate::rbac::Users;
3537
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3638
use crate::storage::StreamType;
3739
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
40+
use crate::utils::actix::extract_session_key_from_req;
3841
use crate::{event, stats};
3942

4043
use crate::{metadata, validator};
@@ -46,6 +49,7 @@ use arrow_schema::{Field, Schema};
4649
use bytes::Bytes;
4750
use chrono::Utc;
4851
use http::{HeaderName, HeaderValue};
52+
use itertools::Itertools;
4953
use serde_json::Value;
5054
use std::collections::HashMap;
5155
use std::fs;
@@ -86,16 +90,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
8690
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
8791
}
8892

89-
pub async fn list(_: HttpRequest) -> impl Responder {
90-
//list all streams from storage
93+
pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
94+
let key = extract_session_key_from_req(&req)
95+
.map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;
96+
97+
// list all streams from storage
9198
let res = CONFIG
9299
.storage()
93100
.get_object_store()
94101
.list_streams()
95102
.await
96-
.unwrap();
103+
.unwrap()
104+
.into_iter()
105+
.filter(|logstream| {
106+
warn!("logstream-\n{logstream:?}");
107+
108+
Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
109+
== crate::rbac::Response::Authorized
110+
})
111+
.collect_vec();
97112

98-
web::Json(res)
113+
Ok(web::Json(res))
99114
}
100115

101116
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
@@ -130,7 +145,7 @@ pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
130145
}
131146
Err(err) => return Err(StreamError::from(err)),
132147
};
133-
match update_schema_when_distributed(vec![stream_name.clone()]).await {
148+
match update_schema_when_distributed(&vec![stream_name.clone()]).await {
134149
Ok(_) => {
135150
let schema = STREAM_INFO.schema(&stream_name)?;
136151
Ok((web::Json(schema), StatusCode::OK))

0 commit comments

Comments
 (0)