@@ -205,23 +205,18 @@ def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
                 self.batch_queue.put_nowait(
                     (future, scheduler_output))  # type: ignore

-        # If all requests are scheduled or the job queue is full,
+        scheduled_batch = (scheduler_output is not None
+                           and scheduler_output.total_num_scheduled_tokens > 0)
+
+        # If no more requests can be scheduled and the job queue is not empty,
         # block until the first batch in the job queue is finished.
-        if (scheduler_output is None
-                or scheduler_output.total_num_scheduled_tokens == 0):
-            try:
-                future, scheduler_output = self.batch_queue.get(
-                    timeout=POLLING_TIMEOUT_S)
-                # Blocking until the first result is available.
-                model_output = future.result()
-                self.batch_queue.task_done()
-                engine_core_outputs = self.scheduler.update_from_output(
-                    scheduler_output, model_output)
-            except queue.Empty:
-                # If the queue is empty (timeout at .get), return
-                # an empty EngineCoreOutputs for logging.
-                engine_core_outputs = EngineCoreOutputs(
-                    outputs=[], scheduler_stats=self.scheduler.make_stats())
+        if not scheduled_batch and not self.batch_queue.empty():
+            future, scheduler_output = self.batch_queue.get_nowait()
+            # Blocking until the first result is available.
+            model_output = future.result()
+            self.batch_queue.task_done()
+            engine_core_outputs = self.scheduler.update_from_output(
+                scheduler_output, model_output)

         return engine_core_outputs
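For context, here is a minimal standalone sketch of the pipelining pattern the new code follows: keep a bounded queue of (future, metadata) pairs, keep scheduling while there is room and work remains, and only block on the oldest in-flight batch when nothing new could be scheduled. Because the step now checks `not self.batch_queue.empty()` before popping, the timeout-based `get()` and the `queue.Empty` fallback in the old code are no longer needed. The `fake_execute`, `pending_batches`, and `ThreadPoolExecutor` pieces below are hypothetical stand-ins for illustration, not vLLM APIs.

# Sketch of the batch-queue pipelining pattern shown in the diff above.
# fake_execute / pending_batches are hypothetical stand-ins, not vLLM APIs.
import queue
import time
from concurrent.futures import Future, ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
batch_queue: "queue.Queue[tuple[Future, int]]" = queue.Queue(maxsize=2)
pending_batches = list(range(4))  # fake work items


def fake_execute(batch_id: int) -> str:
    time.sleep(0.1)  # stand-in for model execution
    return f"output-{batch_id}"


def step() -> None:
    scheduled = False
    # Schedule a new batch if there is work and the job queue has room.
    # The submit is non-blocking, so execution overlaps with later steps.
    if pending_batches and not batch_queue.full():
        batch_id = pending_batches.pop(0)
        future = executor.submit(fake_execute, batch_id)
        batch_queue.put_nowait((future, batch_id))
        scheduled = True

    # If nothing new could be scheduled and batches are in flight,
    # pop the oldest entry (get_nowait is safe: we just checked empty())
    # and block on its result, mirroring the change above.
    if not scheduled and not batch_queue.empty():
        future, batch_id = batch_queue.get_nowait()
        result = future.result()  # blocks until that batch finishes
        batch_queue.task_done()
        print(f"batch {batch_id} -> {result}")


while pending_batches or not batch_queue.empty():
    step()
executor.shutdown()

In this sketch, at most `maxsize` batches are ever in flight, and each step either enqueues one new batch or drains exactly one finished batch, which is the behavior the rewritten `step_with_batch_queue` aims for.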