Skip to content

Commit

Permalink
[compute logs] fix subscription (#28110)
Browse files Browse the repository at this point in the history
* we were exiting with stuff still in the queue since we were just
checking the subscription before exiting
* we were blocking main thread with downloads since we do a `fetch` on
`subscribe`

## How I Tested These Changes

existing coverage, look compute logs in `dagster dev` and dagster+
  • Loading branch information
alangenfeld authored Feb 27, 2025
1 parent 50218dd commit 062837b
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,15 @@ async def gen_captured_log_data(
def _enqueue(new_event):
loop.call_soon_threadsafe(queue.put_nowait, new_event)

subscription(_enqueue)
# subscription object will attempt to fetch when started, so move off main thread
await run_in_threadpool(subscription, _enqueue)

is_complete = False
try:
while not is_complete:
update = await queue.get()
yield from_captured_log_data(update)
is_complete = subscription.is_complete
is_complete = subscription.is_complete and queue.empty()
finally:
subscription.dispose()

Expand Down

0 comments on commit 062837b

Please sign in to comment.