Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit ccb9ff0

Browse files
fix: run on_upstream_http_request hook in federation source (#440)
Co-authored-by: Dotan Simha <[email protected]>
1 parent 585c1ce commit ccb9ff0

File tree

10 files changed

+339
-249
lines changed

10 files changed

+339
-249
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/common/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod graphql;
33
pub mod http;
44
pub mod json;
55
pub mod plugin;
6+
pub mod plugin_manager;
67
pub mod serde_utils;
78
pub mod vrl_functions;
89
pub mod vrl_utils;

libs/common/src/plugin_manager.rs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use crate::{
2+
execute::RequestExecutionContext,
3+
graphql::GraphQLRequest,
4+
http::{ConductorHttpRequest, ConductorHttpResponse},
5+
};
6+
use reqwest::Response;
7+
8+
#[async_trait::async_trait(?Send)]
9+
pub trait PluginManager: std::fmt::Debug + Send + Sync {
10+
async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext);
11+
fn on_downstream_http_response(
12+
&self,
13+
context: &mut RequestExecutionContext,
14+
response: &mut ConductorHttpResponse,
15+
);
16+
async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext);
17+
async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest);
18+
async fn on_upstream_http_request<'a>(
19+
&self,
20+
ctx: &mut RequestExecutionContext,
21+
request: &mut ConductorHttpRequest,
22+
);
23+
async fn on_upstream_http_response<'a>(
24+
&self,
25+
ctx: &mut RequestExecutionContext,
26+
response: &Result<Response, reqwest_middleware::Error>,
27+
);
28+
}

libs/engine/src/gateway.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use conductor_common::{
55
graphql::{ExtractGraphQLOperationError, GraphQLRequest, GraphQLResponse, ParsedGraphQLRequest},
66
http::{ConductorHttpRequest, ConductorHttpResponse, Url},
77
plugin::PluginError,
8+
plugin_manager::PluginManager,
89
};
910
use conductor_config::{ConductorConfig, EndpointDefinition, SourceDefinition};
1011
use conductor_tracing::{
@@ -17,7 +18,7 @@ use reqwest::{Method, StatusCode};
1718
use tracing::error;
1819

1920
use crate::{
20-
plugin_manager::PluginManager,
21+
plugin_manager::PluginManagerImpl,
2122
source::{
2223
federation_source::FederationSourceRuntime,
2324
graphql_source::GraphQLSourceRuntime,
@@ -30,7 +31,7 @@ use crate::{
3031
pub struct ConductorGatewayRouteData {
3132
pub endpoint: String,
3233
pub tenant_id: u32,
33-
pub plugin_manager: Arc<PluginManager>,
34+
pub plugin_manager: Arc<Box<dyn PluginManager>>,
3435
pub to: Arc<Box<dyn SourceRuntime>>,
3536
}
3637

@@ -102,9 +103,10 @@ impl ConductorGateway {
102103
.cloned()
103104
.collect::<Vec<_>>();
104105

105-
let plugin_manager = PluginManager::new(&Some(combined_plugins), tracing_manager, tenant_id)
106-
.await
107-
.map_err(GatewayError::PluginManagerInitError)?;
106+
let plugin_manager =
107+
PluginManagerImpl::new(&Some(combined_plugins), tracing_manager, tenant_id)
108+
.await
109+
.map_err(GatewayError::PluginManagerInitError)?;
108110

109111
let upstream_source: Box<dyn SourceRuntime> = config_object
110112
.sources
@@ -115,7 +117,7 @@ impl ConductorGateway {
115117
let route_data = ConductorGatewayRouteData {
116118
endpoint: endpoint_config.path.clone(),
117119
to: Arc::new(upstream_source),
118-
plugin_manager: Arc::new(plugin_manager),
120+
plugin_manager: Arc::new(Box::new(plugin_manager)),
119121
tenant_id,
120122
};
121123

@@ -159,10 +161,10 @@ impl ConductorGateway {
159161
plugins: Vec<Box<dyn conductor_common::plugin::Plugin>>,
160162
request: ConductorHttpRequest,
161163
) -> ConductorHttpResponse {
162-
let plugin_manager = PluginManager::new_from_vec(plugins);
164+
let plugin_manager = PluginManagerImpl::new_from_vec(plugins);
163165
let route_data = ConductorGatewayRouteData {
164166
endpoint: "/".to_string(),
165-
plugin_manager: Arc::new(plugin_manager),
167+
plugin_manager: Arc::new(Box::new(plugin_manager)),
166168
to: source,
167169
tenant_id: 0,
168170
};

libs/engine/src/plugin_manager.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,23 @@ use conductor_common::{
33
graphql::GraphQLRequest,
44
http::{ConductorHttpRequest, ConductorHttpResponse},
55
plugin::{CreatablePlugin, Plugin, PluginError},
6+
plugin_manager::PluginManager,
67
};
78
use conductor_config::PluginDefinition;
89
use conductor_tracing::minitrace_mgr::MinitraceManager;
910
use reqwest::Response;
1011

1112
#[derive(Debug, Default)]
12-
pub struct PluginManager {
13+
pub struct PluginManagerImpl {
1314
plugins: Vec<Box<dyn Plugin>>,
1415
}
1516

16-
impl PluginManager {
17+
impl PluginManagerImpl {
1718
pub fn new_from_vec(plugins: Vec<Box<dyn Plugin>>) -> Self {
1819
let mut pm = Self { plugins };
1920

2021
// We want to make sure to register default plugins last, in order to ensure it's setting the value correctly
21-
for p in PluginManager::default_plugins() {
22+
for p in PluginManagerImpl::default_plugins() {
2223
pm.register_boxed_plugin(p);
2324
}
2425

@@ -34,7 +35,7 @@ impl PluginManager {
3435
tracing_manager: &mut MinitraceManager,
3536
tenant_id: u32,
3637
) -> Result<Self, PluginError> {
37-
let mut instance = PluginManager::default();
38+
let mut instance = PluginManagerImpl::default();
3839

3940
if let Some(config_defs) = plugins_config {
4041
for plugin_def in config_defs.iter() {
@@ -98,7 +99,7 @@ impl PluginManager {
9899
};
99100

100101
// We want to make sure to register these last, in order to ensure it's setting the value correctly
101-
for p in PluginManager::default_plugins() {
102+
for p in PluginManagerImpl::default_plugins() {
102103
instance.register_boxed_plugin(p);
103104
}
104105

@@ -116,14 +117,17 @@ impl PluginManager {
116117
pub fn register_plugin(&mut self, plugin: impl Plugin + 'static) {
117118
self.plugins.push(Box::new(plugin));
118119
}
120+
}
119121

122+
#[async_trait::async_trait(?Send)]
123+
impl PluginManager for PluginManagerImpl {
120124
#[tracing::instrument(
121125
level = "debug",
122126
skip(self, context),
123127
name = "on_downstream_http_request"
124128
)]
125129
#[inline]
126-
pub async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext) {
130+
async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext) {
127131
let p = &self.plugins;
128132

129133
for plugin in p.iter() {
@@ -141,7 +145,7 @@ impl PluginManager {
141145
name = "on_downstream_http_response"
142146
)]
143147
#[inline]
144-
pub fn on_downstream_http_response(
148+
fn on_downstream_http_response(
145149
&self,
146150
context: &mut RequestExecutionContext,
147151
response: &mut ConductorHttpResponse,
@@ -163,7 +167,7 @@ impl PluginManager {
163167
name = "on_downstream_graphql_request"
164168
)]
165169
#[inline]
166-
pub async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext) {
170+
async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext) {
167171
let p = &self.plugins;
168172

169173
for plugin in p.iter() {
@@ -177,7 +181,7 @@ impl PluginManager {
177181

178182
#[tracing::instrument(level = "debug", skip(self, req), name = "on_upstream_graphql_request")]
179183
#[inline]
180-
pub async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest) {
184+
async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest) {
181185
let p = &self.plugins;
182186

183187
for plugin in p.iter() {
@@ -191,7 +195,7 @@ impl PluginManager {
191195
name = "on_upstream_http_request"
192196
)]
193197
#[inline]
194-
pub async fn on_upstream_http_request<'a>(
198+
async fn on_upstream_http_request<'a>(
195199
&self,
196200
ctx: &mut RequestExecutionContext,
197201
request: &mut ConductorHttpRequest,
@@ -213,7 +217,7 @@ impl PluginManager {
213217
name = "on_upstream_http_response"
214218
)]
215219
#[inline]
216-
pub async fn on_upstream_http_response<'a>(
220+
async fn on_upstream_http_response<'a>(
217221
&self,
218222
ctx: &mut RequestExecutionContext,
219223
response: &Result<Response, reqwest_middleware::Error>,

libs/engine/src/source/federation_source.rs

+15-13
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use base64::{engine, Engine};
44
use conductor_common::execute::RequestExecutionContext;
55
use conductor_common::graphql::GraphQLResponse;
66
use conductor_config::{FederationSourceConfig, SupergraphSourceConfig};
7-
use federation_query_planner::execute_federation;
87
use federation_query_planner::supergraph::{parse_supergraph, Supergraph};
8+
use federation_query_planner::FederationExecutor;
9+
use futures::lock::Mutex;
910
use minitrace_reqwest::{traced_reqwest, TracedHttpClient};
1011
use std::collections::HashMap;
12+
use std::sync::Arc;
1113
use std::{future::Future, pin::Pin};
1214

1315
#[derive(Debug)]
@@ -156,7 +158,7 @@ impl FederationSourceRuntime {
156158
}
157159

158160
pub async fn update_supergraph(&mut self, new_schema: String) {
159-
let new_supergraph = parse_supergraph(&new_schema).unwrap();
161+
let new_supergraph: Supergraph = parse_supergraph(&new_schema).unwrap();
160162
self.supergraph = new_supergraph;
161163
}
162164

@@ -190,7 +192,7 @@ impl SourceRuntime for FederationSourceRuntime {
190192

191193
fn execute<'a>(
192194
&'a self,
193-
_route_data: &'a ConductorGatewayRouteData,
195+
route_data: &'a ConductorGatewayRouteData,
194196
request_context: &'a mut RequestExecutionContext,
195197
) -> Pin<Box<(dyn Future<Output = Result<GraphQLResponse, SourceError>> + 'a)>> {
196198
Box::pin(wasm_polyfills::call_async(async move {
@@ -199,17 +201,17 @@ impl SourceRuntime for FederationSourceRuntime {
199201
.take()
200202
.expect("GraphQL request isn't available at the time of execution");
201203

202-
// let source_req = &mut downstream_request.request;
203-
204-
// TODO: this needs to be called by conductor execution when fetching subgarphs
205-
// route_data
206-
// .plugin_manager
207-
// .on_upstream_graphql_request(source_req)
208-
// .await;
209-
210204
let operation = downstream_request.parsed_operation;
211-
212-
match execute_federation(&self.client, &self.supergraph, operation).await {
205+
let executor = FederationExecutor {
206+
client: &self.client,
207+
plugin_manager: route_data.plugin_manager.clone(),
208+
supergraph: &self.supergraph,
209+
};
210+
211+
match executor
212+
.execute_federation(Arc::new(Mutex::new(request_context)), operation)
213+
.await
214+
{
213215
Ok((response_data, query_plan)) => {
214216
let mut response = serde_json::from_str::<GraphQLResponse>(&response_data).unwrap();
215217

libs/federation_query_planner/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ bench = false
1010
serde = { workspace = true }
1111
wasm_polyfills = { path = "../wasm_polyfills" }
1212
conductor_tracing = { path = "../tracing" }
13+
conductor_common = { path = "../common" }
1314
serde_json = { workspace = true }
1415
async-trait = { workspace = true }
1516
anyhow = { workspace = true }

0 commit comments

Comments
 (0)