@@ -6,21 +6,25 @@ use futures::Future;
6
6
use hyper:: { header, Body , Method , Request , Response } ;
7
7
use serde:: de:: DeserializeOwned ;
8
8
use serde:: Serialize ;
9
+ use tokio:: time:: { Duration , Instant } ;
9
10
10
11
use crate :: headers:: { CONTENT_TYPE_JSON , CONTENT_TYPE_PROTOBUF } ;
11
12
use crate :: { error, to_proto_body, GenericError , TwirpErrorResponse } ;
12
13
14
+ /// A function that handles a request and returns a response.
15
+ type HandlerFn = Box < dyn Fn ( Request < Body > ) -> HandlerResponse + Send + Sync > ;
16
+
17
+ /// Type alias for a handler response.
13
18
type HandlerResponse =
14
19
Box < dyn Future < Output = Result < Response < Body > , GenericError > > + Unpin + Send > ;
15
20
16
- type HandlerFn = Box < dyn Fn ( Request < Body > ) -> HandlerResponse + Send + Sync > ;
17
-
18
- /// A Router maps a request to a handler.
21
+ /// A Router maps a request (method, path) tuple to a handler.
19
22
pub struct Router {
20
23
routes : HashMap < ( Method , String ) , HandlerFn > ,
21
24
prefix : & ' static str ,
22
25
}
23
26
27
+ /// The canonical twirp path prefix. You don't have to use this, but it's the default.
24
28
pub const DEFAULT_TWIRP_PATH_PREFIX : & str = "/twirp" ;
25
29
26
30
impl Default for Router {
@@ -48,7 +52,7 @@ impl Router {
48
52
}
49
53
}
50
54
51
- /// Adds a handler to the router for the given method and path.
55
+ /// Adds a sync handler to the router for the given method and path.
52
56
pub fn add_sync_handler < F > ( & mut self , method : Method , path : & str , f : F )
53
57
where
54
58
F : Fn ( Request < Body > ) -> Result < Response < Body > , GenericError >
@@ -96,8 +100,16 @@ impl Router {
96
100
> {
97
101
let f = f. clone ( ) ;
98
102
Box :: new ( Box :: pin ( async move {
99
- match parse_request ( req) . await {
100
- Ok ( ( req, resp_fmt) ) => write_response ( f ( req) . await , resp_fmt) ,
103
+ let mut timings = * req
104
+ . extensions ( )
105
+ . get :: < Timings > ( )
106
+ . expect ( "invariant violated: timing info not present in request" ) ;
107
+ match parse_request ( req, & mut timings) . await {
108
+ Ok ( ( req, resp_fmt) ) => {
109
+ let res = f ( req) . await ;
110
+ timings. set_response_handled ( ) ;
111
+ write_response ( res, resp_fmt)
112
+ }
101
113
Err ( err) => {
102
114
// This is the only place we use tracing (would be nice to remove)
103
115
// tracing::error!(?err, "failed to parse request");
@@ -109,17 +121,27 @@ impl Router {
109
121
twirp_err. to_response ( )
110
122
}
111
123
}
124
+ . map ( |mut resp| {
125
+ timings. set_response_written ( ) ;
126
+ resp. extensions_mut ( ) . insert ( timings) ;
127
+ resp
128
+ } )
112
129
} ) )
113
130
} ;
114
131
let key = ( Method :: POST , [ self . prefix , path] . join ( "/" ) ) ;
115
132
self . routes . insert ( key, Box :: new ( g) ) ;
116
133
}
117
134
}
118
135
136
+ /// Serve a request using the given router.
119
137
pub async fn serve (
120
138
router : Arc < Router > ,
121
- req : Request < Body > ,
139
+ mut req : Request < Body > ,
122
140
) -> Result < Response < Body > , GenericError > {
141
+ if req. extensions ( ) . get :: < Timings > ( ) . is_none ( ) {
142
+ let start = tokio:: time:: Instant :: now ( ) ;
143
+ req. extensions_mut ( ) . insert ( Timings :: new ( start) ) ;
144
+ }
123
145
let key = ( req. method ( ) . clone ( ) , req. uri ( ) . path ( ) . to_string ( ) ) ;
124
146
if let Some ( handler) = router. routes . get ( & key) {
125
147
handler ( req) . await
@@ -150,16 +172,21 @@ impl BodyFormat {
150
172
}
151
173
}
152
174
153
- async fn parse_request < T > ( req : Request < Body > ) -> Result < ( T , BodyFormat ) , GenericError >
175
+ async fn parse_request < T > (
176
+ req : Request < Body > ,
177
+ timings : & mut Timings ,
178
+ ) -> Result < ( T , BodyFormat ) , GenericError >
154
179
where
155
180
T : prost:: Message + Default + DeserializeOwned ,
156
181
{
157
182
let format = BodyFormat :: from_content_type ( & req) ;
158
183
let bytes = hyper:: body:: to_bytes ( req. into_body ( ) ) . await ?;
184
+ timings. set_received ( ) ;
159
185
let request = match format {
160
186
BodyFormat :: Pb => T :: decode ( bytes) ?,
161
187
BodyFormat :: JsonPb => serde_json:: from_slice ( & bytes) ?,
162
188
} ;
189
+ timings. set_parsed ( ) ;
163
190
Ok ( ( request, format) )
164
191
}
165
192
@@ -191,6 +218,84 @@ where
191
218
Ok ( res)
192
219
}
193
220
221
+ /// Contains timing information associated with a request.
222
+ /// To access the timings in a given request, use the [extensions](Request::extensions)
223
+ /// method and specialize to `Timings` appropriately.
224
+ #[ derive( Debug , Clone , Copy ) ]
225
+ pub struct Timings {
226
+ // When the request started.
227
+ pub start : Instant ,
228
+ // When the request was received (headers and body).
229
+ pub request_received : Option < Instant > ,
230
+ // When the request body was parsed.
231
+ pub request_parsed : Option < Instant > ,
232
+ // When the response handler returned.
233
+ pub response_handled : Option < Instant > ,
234
+ // When the response was written.
235
+ pub response_written : Option < Instant > ,
236
+ }
237
+
238
+ impl Timings {
239
+ #[ allow( clippy:: new_without_default) ]
240
+ pub fn new ( start : Instant ) -> Self {
241
+ Self {
242
+ start,
243
+ request_received : None ,
244
+ request_parsed : None ,
245
+ response_handled : None ,
246
+ response_written : None ,
247
+ }
248
+ }
249
+
250
+ fn set_received ( & mut self ) {
251
+ self . request_received = Some ( Instant :: now ( ) ) ;
252
+ }
253
+
254
+ fn set_parsed ( & mut self ) {
255
+ self . request_parsed = Some ( Instant :: now ( ) ) ;
256
+ }
257
+
258
+ fn set_response_handled ( & mut self ) {
259
+ self . response_handled = Some ( Instant :: now ( ) ) ;
260
+ }
261
+
262
+ fn set_response_written ( & mut self ) {
263
+ self . response_written = Some ( Instant :: now ( ) ) ;
264
+ }
265
+
266
+ pub fn received ( & self ) -> Option < Duration > {
267
+ self . request_received . map ( |x| x - self . start )
268
+ }
269
+
270
+ pub fn parsed ( & self ) -> Option < Duration > {
271
+ match ( self . request_parsed , self . request_received ) {
272
+ ( Some ( parsed) , Some ( received) ) => Some ( parsed - received) ,
273
+ _ => None ,
274
+ }
275
+ }
276
+
277
+ pub fn response_handled ( & self ) -> Option < Duration > {
278
+ match ( self . response_handled , self . request_parsed ) {
279
+ ( Some ( handled) , Some ( parsed) ) => Some ( handled - parsed) ,
280
+ _ => None ,
281
+ }
282
+ }
283
+
284
+ pub fn response_written ( & self ) -> Option < Duration > {
285
+ match ( self . response_written , self . response_handled ) {
286
+ ( Some ( written) , Some ( handled) ) => Some ( written - handled) ,
287
+ ( Some ( written) , None ) => {
288
+ if let Some ( parsed) = self . request_parsed {
289
+ Some ( written - parsed)
290
+ } else {
291
+ self . request_received . map ( |received| written - received)
292
+ }
293
+ }
294
+ _ => None ,
295
+ }
296
+ }
297
+ }
298
+
194
299
#[ cfg( test) ]
195
300
mod tests {
196
301
0 commit comments