@@ -19,7 +19,7 @@ use omicron_common::address::CLICKHOUSE_TCP_PORT;
19
19
use omicron_common:: api:: external:: Generation ;
20
20
use oximeter_db:: Client as OximeterClient ;
21
21
use oximeter_db:: OXIMETER_VERSION ;
22
- use slog:: info;
22
+ use slog:: { info, error } ;
23
23
use slog:: Logger ;
24
24
use std:: fs:: File ;
25
25
use std:: io:: { BufRead , BufReader } ;
@@ -109,7 +109,7 @@ impl ServerContext {
109
109
// We only want to handle one in flight request at a time. Reconfigurator execution will retry
110
110
// again later anyway. We use a flume bounded channel with a size of 0 to act as a rendezvous channel.
111
111
let ( inner_tx, inner_rx) = flume:: bounded ( 0 ) ;
112
- tokio:: spawn ( long_running_ch_server_task ( inner_rx) ) ;
112
+ tokio:: spawn ( long_running_ch_admin_server_task ( inner_rx) ) ;
113
113
114
114
Ok ( Self {
115
115
clickhouse_cli,
@@ -158,7 +158,7 @@ pub enum ClickhouseAdminServerRequest {
158
158
} ,
159
159
}
160
160
161
- async fn long_running_ch_server_task (
161
+ async fn long_running_ch_admin_server_task (
162
162
incoming : Receiver < ClickhouseAdminServerRequest > ,
163
163
) {
164
164
while let Ok ( request) = incoming. recv_async ( ) . await {
@@ -168,23 +168,35 @@ async fn long_running_ch_server_task(
168
168
replica_settings,
169
169
response,
170
170
} => {
171
+ // TODO: Remove clone
171
172
let result =
172
- generate_config_and_enable_svc ( ctx, replica_settings) ;
173
- response. send ( result) . expect ( "failed to send value from configuration generation to channel" ) ;
173
+ generate_config_and_enable_svc ( ctx. clone ( ) , replica_settings) ;
174
+ if let Err ( e) = response. send ( result) {
175
+ error ! (
176
+ & ctx. log,
177
+ "failed to send value from configuration generation to channel: {e:?}"
178
+ ) ;
179
+ } ;
174
180
}
175
181
ClickhouseAdminServerRequest :: DbInit {
176
182
ctx,
177
183
replicated,
178
184
response,
179
185
} => {
180
- let result = init_db ( ctx, replicated) . await ;
181
- response. send ( result) . expect ( "failed to send value from database initialization to channel" ) ;
186
+ let result = init_db ( ctx. clone ( ) , replicated) . await ;
187
+ if let Err ( e) = response. send ( result) {
188
+ error ! (
189
+ & ctx. log,
190
+ "failed to send value from database initialization to channel: {e:?}"
191
+ ) ;
192
+ } ;
182
193
}
183
194
}
184
195
}
185
196
}
186
197
187
198
pub fn generate_config_and_enable_svc (
199
+ // TODO: Expand ctx into separate items
188
200
ctx : Arc < ServerContext > ,
189
201
replica_settings : ServerConfigurableSettings ,
190
202
) -> Result < ReplicaConfig , HttpError > {
@@ -222,6 +234,7 @@ pub fn generate_config_and_enable_svc(
222
234
}
223
235
224
236
pub async fn init_db (
237
+ // TODO: Expand ctx into separate items
225
238
ctx : Arc < ServerContext > ,
226
239
replicated : bool ,
227
240
) -> Result < ( ) , HttpError > {
0 commit comments