From 07c46bd7f0492d252d07e9a950f4a72529bb0882 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 10 Feb 2025 14:12:14 -0800 Subject: [PATCH 1/9] Checkpoint --- include/aws/io/event_loop.h | 16 ++++++++++- include/aws/testing/io_testing_channel.h | 9 ++++-- source/bsd/kqueue_event_loop.c | 12 ++++++-- source/darwin/dispatch_queue_event_loop.c | 12 ++++++-- .../dispatch_queue_event_loop_private.h | 11 ++++++++ source/event_loop.c | 28 +++++++++++++++++-- source/linux/epoll_event_loop.c | 12 ++++++-- source/windows/iocp/iocp_event_loop.c | 12 ++++++-- 8 files changed, 95 insertions(+), 17 deletions(-) diff --git a/include/aws/io/event_loop.h b/include/aws/io/event_loop.h index ae332f387..11d28ddae 100644 --- a/include/aws/io/event_loop.h +++ b/include/aws/io/event_loop.h @@ -29,7 +29,8 @@ typedef void(aws_event_loop_on_event_fn)( * @internal */ struct aws_event_loop_vtable { - void (*destroy)(struct aws_event_loop *event_loop); + void (*start_destroy)(struct aws_event_loop *event_loop); + void (*complete_destroy)(struct aws_event_loop *event_loop); int (*run)(struct aws_event_loop *event_loop); int (*stop)(struct aws_event_loop *event_loop); int (*wait_for_stop_completion)(struct aws_event_loop *event_loop); @@ -255,6 +256,19 @@ void aws_event_loop_clean_up_base(struct aws_event_loop *event_loop); AWS_IO_API void aws_event_loop_destroy(struct aws_event_loop *event_loop); +/** + * @internal + */ +AWS_IO_API +void aws_event_loop_start_destroy(struct aws_event_loop *event_loop); + +/** + * @internal + * + */ +AWS_IO_API +void aws_event_loop_complete_destroy(struct aws_event_loop *event_loop); + AWS_EXTERN_C_END AWS_POP_SANE_WARNING_LEVEL diff --git a/include/aws/testing/io_testing_channel.h b/include/aws/testing/io_testing_channel.h index 8fa118ca4..311fbf6ae 100644 --- a/include/aws/testing/io_testing_channel.h +++ b/include/aws/testing/io_testing_channel.h @@ -57,7 +57,11 @@ static bool s_testing_loop_is_on_callers_thread(struct aws_event_loop *event_loo return testing_loop->mock_on_callers_thread; } -static void s_testing_loop_destroy(struct aws_event_loop *event_loop) { +static void s_testing_loop_start_destroy(struct aws_event_loop *event_loop) { + (void)event_loop; +} + +static void s_testing_loop_complete_destroy(struct aws_event_loop *event_loop) { struct testing_loop *testing_loop = (struct testing_loop *)aws_event_loop_get_impl(event_loop); struct aws_allocator *allocator = testing_loop->allocator; aws_task_scheduler_clean_up(&testing_loop->scheduler); @@ -67,7 +71,8 @@ static void s_testing_loop_destroy(struct aws_event_loop *event_loop) { } static struct aws_event_loop_vtable s_testing_loop_vtable = { - .destroy = s_testing_loop_destroy, + .start_destroy = s_testing_loop_start_destroy, + .complete_destroy = s_testing_loop_complete_destroy, .is_on_callers_thread = s_testing_loop_is_on_callers_thread, .run = s_testing_loop_run, .schedule_task_now = s_testing_loop_schedule_task_now, diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c index 29e0e7e08..6fca33059 100644 --- a/source/bsd/kqueue_event_loop.c +++ b/source/bsd/kqueue_event_loop.c @@ -25,7 +25,8 @@ #include #include -static void s_destroy(struct aws_event_loop *event_loop); +static void s_start_destroy(struct aws_event_loop *event_loop); +static void s_complete_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); static int s_stop(struct aws_event_loop *event_loop); static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); @@ -135,7 +136,8 
@@ enum { }; struct aws_event_loop_vtable s_kqueue_vtable = { - .destroy = s_destroy, + .start_destroy = s_start_destroy, + .complete_destroy = s_complete_destroy, .run = s_run, .stop = s_stop, .wait_for_stop_completion = s_wait_for_stop_completion, @@ -313,7 +315,11 @@ struct aws_event_loop *aws_event_loop_new_with_kqueue( } #endif // AWS_ENABLE_KQUEUE -static void s_destroy(struct aws_event_loop *event_loop) { +static void s_start_destroy(struct aws_event_loop *event_loop) { + (void)event_loop; +} + +static void s_complete_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop); struct kqueue_loop *impl = event_loop->impl_data; diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 7866ebe58..c92794cb4 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -19,7 +19,8 @@ #include #include -static void s_destroy(struct aws_event_loop *event_loop); +static void s_start_destroy(struct aws_event_loop *event_loop); +static void s_complete_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); static int s_stop(struct aws_event_loop *event_loop); static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); @@ -60,7 +61,8 @@ static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop); static bool s_is_on_callers_thread(struct aws_event_loop *event_loop); static struct aws_event_loop_vtable s_vtable = { - .destroy = s_destroy, + .start_destroy = s_start_destroy, + .complete_destroy = s_complete_destroy, .run = s_run, .stop = s_stop, .wait_for_stop_completion = s_wait_for_stop_completion, @@ -380,7 +382,11 @@ static void s_dispatch_queue_destroy_task(void *context) { s_dispatch_event_loop_destroy(dispatch_loop->base_loop); } -static void s_destroy(struct aws_event_loop *event_loop) { +static void s_start_destroy(struct aws_event_loop *event_loop) { + (void)event_loop; +} + +static void s_complete_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop); struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; diff --git a/source/darwin/dispatch_queue_event_loop_private.h b/source/darwin/dispatch_queue_event_loop_private.h index 531ef3cb7..c3792037c 100644 --- a/source/darwin/dispatch_queue_event_loop_private.h +++ b/source/darwin/dispatch_queue_event_loop_private.h @@ -11,6 +11,13 @@ #include #include +enum aws_dispatch_loop_execution_state { + AWS_DLES_SUSPENDED, + AWS_DLES_RUNNING, + AWS_DLES_SHUTTING_DOWN, + AWS_DLES_TERMINATED +}; + struct aws_dispatch_loop { struct aws_allocator *allocator; dispatch_queue_t dispatch_queue; @@ -18,6 +25,8 @@ struct aws_dispatch_loop { struct aws_event_loop *base_loop; struct aws_event_loop_group *base_elg; + //struct aws_ref_count ref_count; + /* Synced data handle cross thread tasks and events, and event loop operations*/ struct { /* @@ -43,6 +52,7 @@ struct aws_dispatch_loop { * Calling dispatch_sync() on a suspended dispatch queue will deadlock. */ bool suspended; + //enum aws_dispatch_loop_execution_state execution_state; struct aws_linked_list cross_thread_tasks; @@ -54,6 +64,7 @@ struct aws_dispatch_loop { * redundant. 
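/*
 * [Illustrative sketch, not part of this patch] How an event loop backend is expected to populate
 * the split destroy entries introduced above: for most implementations start_destroy is a no-op
 * and complete_destroy performs the blocking teardown the old destroy callback did; only the Apple
 * dispatch queue loop gives start_destroy real work. Names prefixed s_example_* are hypothetical.
 */
static void s_example_start_destroy(struct aws_event_loop *event_loop) {
    (void)event_loop; /* must be quick and non-blocking: just signal that shutdown has begun */
}

static void s_example_complete_destroy(struct aws_event_loop *event_loop) {
    (void)event_loop; /* blocking teardown: stop the loop, wait for it, cancel tasks, free impl data */
}

static struct aws_event_loop_vtable s_example_vtable = {
    .start_destroy = s_example_start_destroy,
    .complete_destroy = s_example_complete_destroy,
    /* .run, .stop, .wait_for_stop_completion, .schedule_task_now, ... as before */
};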
*/ struct aws_priority_queue scheduled_iterations; + //struct aws_linked_list scheduled_iterations; } synced_data; }; diff --git a/source/event_loop.c b/source/event_loop.c index 0a799e270..151951c3f 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -495,10 +495,34 @@ void aws_event_loop_destroy(struct aws_event_loop *event_loop) { return; } - AWS_ASSERT(event_loop->vtable && event_loop->vtable->destroy); + AWS_ASSERT(event_loop->vtable && event_loop->vtable->start_destroy); + AWS_ASSERT(event_loop->vtable && event_loop->vtable->complete_destroy); AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop)); - event_loop->vtable->destroy(event_loop); + event_loop->vtable->start_destroy(event_loop); + event_loop->vtable->complete_destroy(event_loop); +} + +void aws_event_loop_start_destroy(struct aws_event_loop *event_loop) { + if (!event_loop) { + return; + } + + AWS_ASSERT(event_loop->vtable && event_loop->vtable->start_destroy); + AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop)); + + event_loop->vtable->start_destroy(event_loop); +} + +void aws_event_loop_complete_destroy(struct aws_event_loop *event_loop) { + if (!event_loop) { + return; + } + + AWS_ASSERT(event_loop->vtable && event_loop->vtable->complete_destroy); + AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop)); + + event_loop->vtable->complete_destroy(event_loop); } int aws_event_loop_fetch_local_object( diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c index 823e34c94..ea440ee89 100644 --- a/source/linux/epoll_event_loop.c +++ b/source/linux/epoll_event_loop.c @@ -44,7 +44,8 @@ # define EPOLLRDHUP 0x2000 #endif -static void s_destroy(struct aws_event_loop *event_loop); +static void s_start_destroy(struct aws_event_loop *event_loop); +static void s_complete_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); static int s_stop(struct aws_event_loop *event_loop); static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); @@ -81,7 +82,8 @@ static bool s_is_on_callers_thread(struct aws_event_loop *event_loop); static void aws_event_loop_thread(void *args); static struct aws_event_loop_vtable s_vtable = { - .destroy = s_destroy, + .start_destroy = s_start_destroy, + .complete_destroy = s_complete_destroy, .run = s_run, .stop = s_stop, .wait_for_stop_completion = s_wait_for_stop_completion, @@ -248,7 +250,11 @@ struct aws_event_loop *aws_event_loop_new_with_epoll( return NULL; } -static void s_destroy(struct aws_event_loop *event_loop) { +static void s_start_destroy(struct aws_event_loop *event_loop) { + (void)event_loop; +} + +static void s_complete_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop); struct epoll_loop *epoll_loop = event_loop->impl_data; diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c index 712f64bfe..bd31cfa77 100644 --- a/source/windows/iocp/iocp_event_loop.c +++ b/source/windows/iocp/iocp_event_loop.c @@ -96,7 +96,8 @@ enum { MAX_COMPLETION_PACKETS_PER_LOOP = 100, }; -static void s_destroy(struct aws_event_loop *event_loop); +static void s_start_destroy(struct aws_event_loop *event_loop); +static void s_complete_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); static int s_stop(struct aws_event_loop *event_loop); static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); @@ -156,7 +157,8 @@ struct 
_OVERLAPPED *aws_overlapped_to_windows_overlapped(struct aws_overlapped * } struct aws_event_loop_vtable s_iocp_vtable = { - .destroy = s_destroy, + .start_destroy = s_start_destroy, + .complete_destroy = s_complete_destroy, .run = s_run, .stop = s_stop, .wait_for_stop_completion = s_wait_for_stop_completion, @@ -306,8 +308,12 @@ struct aws_event_loop *aws_event_loop_new_with_iocp( return NULL; } +static void s_start_destroy(struct aws_event_loop *event_loop) { + (void)event_loop; +} + /* Should not be called from event-thread */ -static void s_destroy(struct aws_event_loop *event_loop) { +static void s_complete_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event-loop", (void *)event_loop); struct iocp_loop *impl = event_loop->impl_data; From e258edcc4a06d5f576789e2a05204c338ea73198 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Tue, 11 Feb 2025 08:53:43 -0800 Subject: [PATCH 2/9] Checkpoint before migrating permanently to socket branch --- source/darwin/dispatch_queue_event_loop.c | 25 ++++++++++++++++++- .../dispatch_queue_event_loop_private.h | 5 ++-- source/event_loop.c | 10 +++++++- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index c92794cb4..b7e53e67d 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -127,6 +127,20 @@ static int s_unlock_synced_data(struct aws_dispatch_loop *dispatch_loop) { return aws_mutex_unlock(&dispatch_loop->synced_data.synced_data_lock); } +static struct aws_dispatch_loop *s_dispatch_loop_acquire(struct aws_dispatch_loop *dispatch_loop) { + if (dispatch_loop) { + aws_ref_count_acquire(&dispatch_loop->ref_count); + } + + return dispatch_loop; +} + +static void s_dispatch_loop_release(struct aws_dispatch_loop *dispatch_loop) { + if (dispatch_loop) { + aws_ref_count_release(&dispatch_loop->ref_count); + } +} + /* * This is used to determine the dynamic queue size containing scheduled iteration events. Expectation is for there to * be one scheduled for now, and one or two scheduled for various times in the future. It is unlikely for there to be @@ -151,7 +165,7 @@ static struct scheduled_iteration_entry *s_scheduled_iteration_entry_new( entry->allocator = dispatch_loop->allocator; entry->timestamp = timestamp; - entry->dispatch_loop = dispatch_loop; + entry->dispatch_loop = s_dispatch_loop_acquire(dispatch_loop); aws_priority_queue_node_init(&entry->priority_queue_node); return entry; @@ -161,6 +175,11 @@ static struct scheduled_iteration_entry *s_scheduled_iteration_entry_new( * Cleans up the memory allocated for a `scheduled_iteration_entry`. 
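/*
 * [Illustrative sketch, not part of this patch] The aws_ref_count pattern adopted above: each
 * scheduled iteration entry pins the dispatch loop with an acquire, and releasing the last
 * reference fires the on-zero callback, which is where shutdown can be finalized. The type and
 * function names below are hypothetical.
 */
#include <aws/common/ref_count.h>

struct example_object {
    struct aws_allocator *allocator;
    struct aws_ref_count ref_count;
};

static void s_example_on_zero_refs(void *user_data) {
    struct example_object *object = user_data;
    aws_mem_release(object->allocator, object); /* no acquire can race once the count hits zero */
}

static void s_example_ref_count_usage(struct example_object *object) {
    aws_ref_count_init(&object->ref_count, object, s_example_on_zero_refs); /* count starts at 1 */
    aws_ref_count_acquire(&object->ref_count);  /* one acquire per outstanding scheduled block  */
    aws_ref_count_release(&object->ref_count);  /* block finished, drop its reference           */
    aws_ref_count_release(&object->ref_count);  /* drop the initial reference -> callback fires */
}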
*/ static void s_scheduled_iteration_entry_destroy(struct scheduled_iteration_entry *entry) { + if (!entry) { + return; + } + + s_dispatch_loop_release(entry->dispatch_loop); aws_mem_release(entry->allocator, entry); } @@ -202,6 +221,8 @@ static void s_dispatch_event_loop_destroy(struct aws_event_loop *event_loop) { aws_event_loop_clean_up_base(event_loop); aws_mem_release(event_loop->alloc, event_loop); + aws_thread_decrement_unjoined_count(); + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop); } @@ -249,6 +270,8 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( dispatch_loop->base_loop = loop; dispatch_loop->base_elg = options->parent_elg; + aws_thread_increment_unjoined_count(); + char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0}; s_get_unique_dispatch_queue_id(dispatch_queue_id); diff --git a/source/darwin/dispatch_queue_event_loop_private.h b/source/darwin/dispatch_queue_event_loop_private.h index c3792037c..2ff102ac5 100644 --- a/source/darwin/dispatch_queue_event_loop_private.h +++ b/source/darwin/dispatch_queue_event_loop_private.h @@ -25,7 +25,7 @@ struct aws_dispatch_loop { struct aws_event_loop *base_loop; struct aws_event_loop_group *base_elg; - //struct aws_ref_count ref_count; + struct aws_ref_count ref_count; /* Synced data handle cross thread tasks and events, and event loop operations*/ struct { @@ -52,7 +52,7 @@ struct aws_dispatch_loop { * Calling dispatch_sync() on a suspended dispatch queue will deadlock. */ bool suspended; - //enum aws_dispatch_loop_execution_state execution_state; + enum aws_dispatch_loop_execution_state execution_state; struct aws_linked_list cross_thread_tasks; @@ -64,7 +64,6 @@ struct aws_dispatch_loop { * redundant. */ struct aws_priority_queue scheduled_iterations; - //struct aws_linked_list scheduled_iterations; } synced_data; }; diff --git a/source/event_loop.c b/source/event_loop.c index 151951c3f..d7911bd95 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -187,11 +187,19 @@ static void s_event_loop_group_thread_exit(void *user_data) { } static void s_aws_event_loop_group_shutdown_sync(struct aws_event_loop_group *el_group) { + size_t loop_count = aws_array_list_length(&el_group->event_loops); + for (size_t i = 0; i < loop_count; ++i) { + struct aws_event_loop *loop = NULL; + aws_array_list_get_at(&el_group->event_loops, &loop, i); + + aws_event_loop_start_destroy(loop); + } + while (aws_array_list_length(&el_group->event_loops) > 0) { struct aws_event_loop *loop = NULL; if (!aws_array_list_back(&el_group->event_loops, &loop)) { - aws_event_loop_destroy(loop); + aws_event_loop_complete_destroy(loop); } aws_array_list_pop_back(&el_group->event_loops); From 0ad1471e94436739a2058aa8181979a6069c6e87 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Tue, 11 Feb 2025 14:03:30 -0800 Subject: [PATCH 3/9] Shutdown rework initial impl --- source/darwin/dispatch_queue_event_loop.c | 281 +++++++++++------- .../dispatch_queue_event_loop_private.h | 10 +- 2 files changed, 177 insertions(+), 114 deletions(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index b7e53e67d..8aa72453f 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -19,6 +19,13 @@ #include #include +// Maximum amount of time we schedule event loop service tasks out into the future. 
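/*
 * [Illustrative sketch, not part of this patch] What the reworked group shutdown above buys:
 * phase one signals every loop to begin winding down, phase two then blocks per loop, so the
 * blocking waits overlap with the other loops' shutdown work instead of running strictly back
 * to back. A hypothetical standalone caller of the new internal APIs:
 */
static void s_example_destroy_two_loops(struct aws_event_loop *loop_a, struct aws_event_loop *loop_b) {
    aws_event_loop_start_destroy(loop_a);    /* both loops start shutting down... */
    aws_event_loop_start_destroy(loop_b);
    aws_event_loop_complete_destroy(loop_a); /* ...before we block waiting on either one */
    aws_event_loop_complete_destroy(loop_b);
}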
This bounds the maximum +// amount of time we have to wait for those scheduled tasks to resolve during shutdown, which in turn bounds +// how long shutdown can take. +// +// Start with a second for now. +#define AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL (AWS_TIMESTAMP_NANOS) + static void s_start_destroy(struct aws_event_loop *event_loop); static void s_complete_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); @@ -105,17 +112,6 @@ static struct aws_event_loop_vtable s_vtable = { * `s_run_iteration`: This function represents the block scheduled in `scheduled_iteration_entry`'s */ -/* - * The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with - * an run iteration scheduled on Apple Dispatch Queue. - */ -struct scheduled_iteration_entry { - struct aws_allocator *allocator; - uint64_t timestamp; - struct aws_priority_queue_node priority_queue_node; - struct aws_dispatch_loop *dispatch_loop; -}; - /* Help functions to lock status */ /* The synced_data_lock is held when any member of `aws_dispatch_loop`'s `synced_data` is accessed or modified */ @@ -141,6 +137,17 @@ static void s_dispatch_loop_release(struct aws_dispatch_loop *dispatch_loop) { } } +/* + * The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with + * a block scheduled on Apple Dispatch Queue that runs a service iteration. + */ +struct scheduled_iteration_entry { + struct aws_allocator *allocator; + uint64_t timestamp; + struct aws_priority_queue_node priority_queue_node; + struct aws_dispatch_loop *dispatch_loop; +}; + /* * This is used to determine the dynamic queue size containing scheduled iteration events. Expectation is for there to * be one scheduled for now, and one or two scheduled for various times in the future. It is unlikely for there to be @@ -183,30 +190,8 @@ static void s_scheduled_iteration_entry_destroy(struct scheduled_iteration_entry aws_mem_release(entry->allocator, entry); } -/** - * Helper function to check if another scheduled iteration already exists that will handle our needs. - * - * The function should be wrapped with the synced_data_lock to safely access the scheduled_iterations list - */ -static bool s_should_schedule_iteration( - struct aws_priority_queue *scheduled_iterations, - uint64_t proposed_iteration_time) { - if (aws_priority_queue_size(scheduled_iterations) == 0) { - return true; - } - - struct scheduled_iteration_entry **entry_ptr = NULL; - aws_priority_queue_top(scheduled_iterations, (void **)&entry_ptr); - AWS_FATAL_ASSERT(entry_ptr != NULL); - struct scheduled_iteration_entry *entry = *entry_ptr; - AWS_FATAL_ASSERT(entry != NULL); - - /* is the next scheduled iteration later than what we require? */ - return entry->timestamp > proposed_iteration_time; -} - /* Manually called to destroy an aws_event_loop */ -static void s_dispatch_event_loop_destroy(struct aws_event_loop *event_loop) { +static void s_dispatch_event_loop_final_destroy(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; // The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task. 
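/*
 * [Illustrative walkthrough, not part of this patch] The intended lifetime of one
 * scheduled_iteration_entry, pieced together from the surrounding code. The entry is the context
 * pointer handed to dispatch_async_f()/dispatch_after_f(), so it must outlive the block, and
 * pinning the dispatch loop via the ref count is what lets the block run safely even after
 * external shutdown has started:
 *
 *   1. s_scheduled_iteration_entry_new(): allocate, acquire a dispatch_loop reference,
 *      aws_priority_queue_node_init().
 *   2. Push into synced_data.scheduled_iterations and hand the entry to
 *      dispatch_async_f()/dispatch_after_f().
 *   3. s_run_iteration(): remove the entry from the priority queue and service the loop
 *      (skipped when the state is SHUTTING_DOWN).
 *   4. s_scheduled_iteration_entry_destroy(): release the dispatch_loop reference and free the
 *      entry; the final release (entry or initial reference) is what drives the loop to TERMINATED.
 */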
@@ -216,6 +201,7 @@ static void s_dispatch_event_loop_destroy(struct aws_event_loop *event_loop) { } aws_mutex_clean_up(&dispatch_loop->synced_data.synced_data_lock); + aws_condition_variable_clean_up(&dispatch_loop->synced_data.signal); aws_priority_queue_clean_up(&dispatch_loop->synced_data.scheduled_iterations); aws_mem_release(dispatch_loop->allocator, dispatch_loop); aws_event_loop_clean_up_base(event_loop); @@ -247,6 +233,20 @@ static void s_get_unique_dispatch_queue_id(char result[AWS_IO_APPLE_DISPATCH_QUE memcpy(result + AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH, uuid_buf.buffer, uuid_buf.len); } +static void s_dispatch_event_loop_on_zero_ref_count(void *user_data) { + struct aws_dispatch_loop *dispatch_loop = user_data; + if (dispatch_loop == NULL) { + return; + } + + s_lock_synced_data(dispatch_loop); + AWS_FATAL_ASSERT(dispatch_loop->synced_data.execution_state == AWS_DLES_SHUTTING_DOWN); + dispatch_loop->synced_data.execution_state = AWS_DLES_TERMINATED; + s_unlock_synced_data(dispatch_loop); + + aws_condition_variable_notify_all(&dispatch_loop->synced_data.signal); +} + /* Setup a dispatch_queue with a scheduler. */ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( struct aws_allocator *alloc, @@ -269,6 +269,16 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( loop->impl_data = dispatch_loop; dispatch_loop->base_loop = loop; dispatch_loop->base_elg = options->parent_elg; + dispatch_loop->synced_data.execution_state = AWS_DLES_SUSPENDED; + aws_ref_count_init(&dispatch_loop->ref_count, dispatch_loop, s_dispatch_event_loop_on_zero_ref_count); + + if (aws_condition_variable_init(&dispatch_loop->synced_data.signal)) { + goto clean_up; + } + + if (aws_mutex_init(&dispatch_loop->synced_data.synced_data_lock)) { + goto clean_up; + } aws_thread_increment_unjoined_count(); @@ -296,12 +306,8 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( AWS_LOGF_INFO( AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id); - if (aws_mutex_init(&dispatch_loop->synced_data.synced_data_lock)) { - goto clean_up; - } - /* The dispatch queue is suspended at this point. */ - dispatch_loop->synced_data.suspended = true; + // dispatch_loop->synced_data.suspended = true; dispatch_loop->synced_data.is_executing = false; if (aws_task_scheduler_init(&dispatch_loop->scheduler, alloc)) { @@ -335,7 +341,13 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( */ dispatch_resume(dispatch_loop->dispatch_queue); } - s_dispatch_event_loop_destroy(loop); + + /* + * We intentionally bypass the ref-count-initiated destruction and go directly to the final destroy here. + * The ref-counting mechanism is only for event loops that are successfully created (and thus get destroyed + * by _start_destroy -> _complete_destroy) + */ + s_dispatch_event_loop_final_destroy(loop); } else { aws_mem_release(alloc, loop); } @@ -349,21 +361,6 @@ static void s_dispatch_queue_destroy_task(void *context) { s_lock_synced_data(dispatch_loop); dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id(); dispatch_loop->synced_data.is_executing = true; - - /* - * Because this task was scheudled on the dispatch queue using `dispatch_async_and_wait_f()` we are certain that - * any scheduled iterations will occur AFTER this point and it is safe to NULL the dispatch_queue from all iteration - * blocks scheduled to run in the future. 
- */ - struct aws_array_list *scheduled_iterations_array = &dispatch_loop->synced_data.scheduled_iterations.container; - for (size_t i = 0; i < aws_array_list_length(scheduled_iterations_array); ++i) { - struct scheduled_iteration_entry **entry_ptr = NULL; - aws_array_list_get_at_ptr(scheduled_iterations_array, (void **)&entry_ptr, i); - struct scheduled_iteration_entry *entry = *entry_ptr; - if (entry->dispatch_loop) { - entry->dispatch_loop = NULL; - } - } s_unlock_synced_data(dispatch_loop); AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Cancelling scheduled tasks.", (void *)dispatch_loop->base_loop); @@ -401,37 +398,72 @@ static void s_dispatch_queue_destroy_task(void *context) { dispatch_loop->synced_data.is_executing = false; s_unlock_synced_data(dispatch_loop); - - s_dispatch_event_loop_destroy(dispatch_loop->base_loop); } static void s_start_destroy(struct aws_event_loop *event_loop) { - (void)event_loop; + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting to destroy Dispatch Queue Event Loop", (void *)event_loop); + struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; + + s_lock_synced_data(dispatch_loop); + AWS_FATAL_ASSERT( + dispatch_loop->synced_data.execution_state == AWS_DLES_RUNNING || + dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED); + if (dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED) { + dispatch_resume(dispatch_loop->dispatch_queue); + } + dispatch_loop->synced_data.execution_state = AWS_DLES_SHUTTING_DOWN; + s_unlock_synced_data(dispatch_loop); +} + +static bool s_wait_for_terminated_state(void *user_data) { + struct aws_dispatch_loop *dispatch_loop = user_data; + + return dispatch_loop->synced_data.execution_state == AWS_DLES_TERMINATED; } static void s_complete_destroy(struct aws_event_loop *event_loop) { - AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop); + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: Completing destruction of Dispatch Queue Event Loop", (void *)event_loop); struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; - /* make sure the loop is running so we can schedule a last task. */ - s_run(event_loop); + // This would be deadlock + AWS_FATAL_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop)); /* * `dispatch_async_and_wait_f()` schedules a block to execute in FIFO order on Apple's dispatch queue and waits * for it to complete before moving on. * - * Any block that is currently running or already scheduled on the dispatch queue will be completed before + * Any block that is currently running on the dispatch queue will be completed before * `s_dispatch_queue_destroy_task()` block is executed. * * `s_dispatch_queue_destroy_task()` will cancel outstanding tasks that have already been scheduled to the task - * scheduler and then iterate through cross thread tasks before finally running `s_dispatch_event_loop_destroy()` - * which will clean up both aws_event_loop and aws_dispatch_loop from memory. + * scheduler and then iterate through cross thread tasks. * - * It is possible that there are scheduled_iterations that are be queued to run s_run_iteration() up to 1 second - * AFTER s_dispatch_queue_destroy_task() has executued. Any iteration blocks scheduled to run in the future will - * keep Apple's dispatch queue alive until the blocks complete. + * It is possible that there are scheduled_iterations that are be queued to run s_run_iteration() + * AFTER s_dispatch_queue_destroy_task() has executed. 
Any iteration blocks scheduled to run in the future will + * keep Apple's dispatch queue alive until the blocks complete. Because the dispatch loop is in the SHUTTING_DOWN + * state, these iterations will not do anything. */ dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task); + + /* + * This is the release of the initial ref count of 1 that the event loop was created with. + */ + s_dispatch_loop_release(dispatch_loop); + + s_lock_synced_data(dispatch_loop); + aws_condition_variable_wait_pred( + &dispatch_loop->synced_data.signal, + &dispatch_loop->synced_data.synced_data_lock, + s_wait_for_terminated_state, + dispatch_loop); + s_unlock_synced_data(dispatch_loop); + + /* + * We know that all scheduling entries have cleaned up. We can destroy ourselves now. Upon return, the caller + * is guaranteed that all memory related to the event loop has been released, + */ + s_dispatch_event_loop_final_destroy(event_loop); } static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) { @@ -458,10 +490,10 @@ static int s_run(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; s_lock_synced_data(dispatch_loop); - if (dispatch_loop->synced_data.suspended) { + if (dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED) { AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop); + dispatch_loop->synced_data.execution_state = AWS_DLES_RUNNING; dispatch_resume(dispatch_loop->dispatch_queue); - dispatch_loop->synced_data.suspended = false; s_try_schedule_new_iteration(dispatch_loop, 0); } s_unlock_synced_data(dispatch_loop); @@ -476,8 +508,8 @@ static int s_stop(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; s_lock_synced_data(dispatch_loop); - if (!dispatch_loop->synced_data.suspended) { - dispatch_loop->synced_data.suspended = true; + if (dispatch_loop->synced_data.execution_state == AWS_DLES_RUNNING) { + dispatch_loop->synced_data.execution_state = AWS_DLES_SUSPENDED; AWS_LOGF_INFO( AWS_LS_IO_EVENT_LOOP, "id=%p: Suspending event loop's dispatch queue thread.", (void *)event_loop); @@ -501,32 +533,28 @@ static int s_stop(struct aws_event_loop *event_loop) { static void s_run_iteration(void *service_entry) { struct scheduled_iteration_entry *entry = service_entry; struct aws_dispatch_loop *dispatch_loop = entry->dispatch_loop; + + s_lock_synced_data(dispatch_loop); + + AWS_FATAL_ASSERT(aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)); + aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, &entry, &entry->priority_queue_node); + /* - * A scheduled_iteration_entry can have been enqueued by Apple to run AFTER `s_dispatch_queue_destroy_task()` has - * been executed and the `aws_dispatch_loop` and parent `aws_event_loop` have been cleaned up. During the execution - * of `s_dispatch_queue_destroy_task()`, all scheduled_iteration_entry nodes within the `aws_dispatch_loop`'s - * scheduled_iterations will have had their `dispatch_loop` pointer set to NULL. That value is being checked here to - * determine whether this iteration is executing on an Apple dispatch queue that is no longer associated with an - * `aws_dispatch_loop` or an `aws_event_loop`. + * If we're shutting down, then don't do anything. The destroy task handles purging and canceling tasks. + * + * Note that is possible race-wise to end up with execution_state being SUSPENDED here. 
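/*
 * [Illustrative sketch, not part of this patch] The condition-variable handshake used in the
 * shutdown path above, reduced to its generic shape: the waiter blocks until the signaler flips
 * a flag under the shared mutex, and aws_condition_variable_wait_pred() re-evaluates the
 * predicate on every wakeup, so spurious wakeups are harmless. Names are hypothetical.
 */
#include <aws/common/condition_variable.h>
#include <aws/common/mutex.h>

struct example_shared_state {
    struct aws_mutex lock;
    struct aws_condition_variable signal;
    bool done;
};

static bool s_example_is_done(void *user_data) {
    struct example_shared_state *state = user_data;
    return state->done; /* invoked with the lock held */
}

static void s_example_signal_done(struct example_shared_state *state) {
    aws_mutex_lock(&state->lock);
    state->done = true;
    aws_mutex_unlock(&state->lock);
    aws_condition_variable_notify_all(&state->signal);
}

static void s_example_wait_for_done(struct example_shared_state *state) {
    aws_mutex_lock(&state->lock);
    aws_condition_variable_wait_pred(&state->signal, &state->lock, s_example_is_done, state);
    aws_mutex_unlock(&state->lock);
}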
In that case, just run + * normally. */ - if (entry->dispatch_loop == NULL) { - /* - * If dispatch_loop is NULL both the `aws_dispatch_loop` and `aws_event_loop` have been destroyed and memory - * cleaned up. Destroy the `scheduled_iteration_entry` to not leak memory and end the block to release its - * refcount on Apple's dispatch queue. - */ - s_scheduled_iteration_entry_destroy(entry); - return; + if (entry->dispatch_loop->synced_data.execution_state == AWS_DLES_SHUTTING_DOWN) { + goto done; } - struct aws_linked_list local_cross_thread_tasks; - aws_linked_list_init(&local_cross_thread_tasks); - - s_lock_synced_data(dispatch_loop); dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id(); dispatch_loop->synced_data.is_executing = true; // swap the cross-thread tasks into task-local data + struct aws_linked_list local_cross_thread_tasks; + aws_linked_list_init(&local_cross_thread_tasks); aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks); // run the full iteration here: local cross-thread tasks @@ -534,7 +562,11 @@ static void s_run_iteration(void *service_entry) { struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); - /* Timestamp 0 is used to denote "now" tasks */ + /* + * Timestamp 0 is used to denote "now" tasks + * + * Because is_executing is true, no additional entries will be scheduled by these invocations. + */ if (task->timestamp == 0) { aws_task_scheduler_schedule_now(&dispatch_loop->scheduler, task); } else { @@ -558,11 +590,6 @@ static void s_run_iteration(void *service_entry) { dispatch_loop->synced_data.is_executing = false; - /* Remove the entry that's ending its iteration before further scheduling */ - aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, &entry, &entry->priority_queue_node); - /* destroy the completed service entry. */ - s_scheduled_iteration_entry_destroy(entry); - bool should_schedule = false; uint64_t should_schedule_at_time = 0; /* @@ -585,7 +612,34 @@ static void s_run_iteration(void *service_entry) { s_try_schedule_new_iteration(dispatch_loop, should_schedule_at_time); } +done: + s_unlock_synced_data(dispatch_loop); + + /* destroy the completed service entry. */ + s_scheduled_iteration_entry_destroy(entry); +} + +/** + * Helper function to check if another scheduled iteration already exists that will handle our needs. + * + * The function should be wrapped with the synced_data_lock to safely access the scheduled_iterations list + */ +static bool s_should_schedule_iteration( + struct aws_priority_queue *scheduled_iterations, + uint64_t proposed_iteration_time) { + if (aws_priority_queue_size(scheduled_iterations) == 0) { + return true; + } + + struct scheduled_iteration_entry **entry_ptr = NULL; + aws_priority_queue_top(scheduled_iterations, (void **)&entry_ptr); + AWS_FATAL_ASSERT(entry_ptr != NULL); + struct scheduled_iteration_entry *entry = *entry_ptr; + AWS_FATAL_ASSERT(entry != NULL); + + /* is the next scheduled iteration later than what we require? 
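/*
 * [Illustrative sketch, not part of this patch] The cross-thread task hand-off above relies on
 * aws_linked_list_swap_contents(); shown here as a standalone drain helper. The real iteration
 * performs the swap under synced_data_lock and feeds the drained tasks to the task scheduler
 * rather than running them directly; names below are hypothetical.
 */
#include <aws/common/linked_list.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>

static void s_example_drain_cross_thread_tasks(struct aws_mutex *lock, struct aws_linked_list *shared_tasks) {
    struct aws_linked_list local_tasks;
    aws_linked_list_init(&local_tasks);

    aws_mutex_lock(lock);
    aws_linked_list_swap_contents(shared_tasks, &local_tasks); /* O(1): just exchanges head/tail links */
    aws_mutex_unlock(lock);

    while (!aws_linked_list_empty(&local_tasks)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_tasks);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_RUN_READY);
    }
}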
*/ + return entry->timestamp > proposed_iteration_time; } /** @@ -599,28 +653,30 @@ static void s_run_iteration(void *service_entry) { * aws_dispatch_loop->sycned_data */ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop, uint64_t timestamp) { - if (dispatch_loop->synced_data.suspended || dispatch_loop->synced_data.is_executing) { - return; - } - - if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduled_iterations, timestamp)) { + if (dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED || dispatch_loop->synced_data.is_executing) { return; } - struct scheduled_iteration_entry *entry = s_scheduled_iteration_entry_new(dispatch_loop, timestamp); - aws_priority_queue_push_ref( - &dispatch_loop->synced_data.scheduled_iterations, (void *)&entry, &entry->priority_queue_node); - /** * Apple dispatch queue uses automatic reference counting (ARC). If an iteration is scheduled to run in the future, * the dispatch queue will persist until it is executed. Scheduling a block far into the future will keep the - * dispatch queue alive unnecessarily long, even after aws_event_loop and aws_dispatch_loop have been fully - * destroyed and cleaned up. To mitigate this, we ensure an iteration is scheduled no longer than 1 second in the + * dispatch queue alive unnecessarily long, which blocks event loop group shutdown from completion. + * To mitigate this, we ensure an iteration is scheduled no longer than 1 second in the * future. */ uint64_t now_ns = 0; aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns); uint64_t delta = timestamp > now_ns ? timestamp - now_ns : 0; + delta = aws_min_u64(delta, AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL); + uint64_t clamped_timestamp = now_ns + delta; + + if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduled_iterations, clamped_timestamp)) { + return; + } + + struct scheduled_iteration_entry *entry = s_scheduled_iteration_entry_new(dispatch_loop, clamped_timestamp); + aws_priority_queue_push_ref( + &dispatch_loop->synced_data.scheduled_iterations, (void *)&entry, &entry->priority_queue_node); if (delta == 0) { /* @@ -632,14 +688,13 @@ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop AWS_LS_IO_EVENT_LOOP, "id=%p: Scheduling run iteration on event loop.", (void *)dispatch_loop->base_loop); } else { /* - * If the timestamp is set to execute sometime in the future, we clamp the time to 1 second max, convert the - * time to the format dispatch queue expects, and then schedule `s_run_iteration()` to run in the future using - * `dispatch_after_f()`. `dispatch_after_f()` does not immediately place the block onto the dispatch queue but - * instead obtains a refcount of Apple's dispatch queue and then schedules onto it at the requested time. Any - * blocks scheduled using `dispatch_async_f()` or `dispatch_after_f()` with a closer dispatch time will be - * placed on the dispatch queue and execute in order. + * If the timestamp is set to execute sometime in the future, we clamp the time based on a maximum delta, + * convert the time to the format dispatch queue expects, and then schedule `s_run_iteration()` to run in the + * future using `dispatch_after_f()`. `dispatch_after_f()` does not immediately place the block onto the + * dispatch queue but instead obtains a refcount of Apple's dispatch queue and then schedules onto it at the + * requested time. 
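/*
 * [Illustrative sketch, not part of this patch] What backing scheduled_iterations with a
 * timestamp-ordered priority queue looks like. aws_priority_queue stores items by value, so a
 * queue of entry pointers compares `struct scheduled_iteration_entry **`; with the earliest entry
 * at the top, the dedup check above is a single aws_priority_queue_top() call. The comparator and
 * init helper below are hypothetical.
 */
static int s_example_compare_iteration_entries(const void *a, const void *b) {
    const struct scheduled_iteration_entry *const *entry_a = a;
    const struct scheduled_iteration_entry *const *entry_b = b;

    if ((*entry_a)->timestamp < (*entry_b)->timestamp) {
        return -1;
    }
    return (*entry_a)->timestamp > (*entry_b)->timestamp ? 1 : 0;
}

static int s_example_init_scheduled_iterations(struct aws_priority_queue *queue, struct aws_allocator *allocator) {
    /* small initial capacity: typically one "now" entry plus one or two future entries */
    return aws_priority_queue_init_dynamic(
        queue, allocator, 4, sizeof(struct scheduled_iteration_entry *), s_example_compare_iteration_entries);
}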
Any blocks scheduled using `dispatch_async_f()` or `dispatch_after_f()` with a closer + * dispatch time will be placed on the dispatch queue and execute in order. */ - delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS); dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delta); dispatch_after_f(when, dispatch_loop->dispatch_queue, entry, s_run_iteration); AWS_LOGF_TRACE( diff --git a/source/darwin/dispatch_queue_event_loop_private.h b/source/darwin/dispatch_queue_event_loop_private.h index 2ff102ac5..c1d702bfe 100644 --- a/source/darwin/dispatch_queue_event_loop_private.h +++ b/source/darwin/dispatch_queue_event_loop_private.h @@ -6,6 +6,7 @@ */ #include +#include #include #include #include @@ -35,6 +36,13 @@ struct aws_dispatch_loop { */ struct aws_mutex synced_data_lock; + /* + * Allows blocking waits for changes in synced data state. Currently used by the external destruction process + * to wait for the loop to enter the TERMINATED state. It is acceptable to do a blocking wait because + * event loop group destruction is done in a dedicated thread spawned only for that purpose. + */ + struct aws_condition_variable signal; + /* * `is_executing` flag and `current_thread_id` are used together to identify the thread id of the dispatch queue * running the current block. See dispatch queue's `s_is_on_callers_thread()` implementation for details. @@ -51,7 +59,6 @@ struct aws_dispatch_loop { * * Calling dispatch_sync() on a suspended dispatch queue will deadlock. */ - bool suspended; enum aws_dispatch_loop_execution_state execution_state; struct aws_linked_list cross_thread_tasks; @@ -63,6 +70,7 @@ struct aws_dispatch_loop { * When we schedule a new run iteration, scheduled_iterations is checked to see if the scheduling attempt is * redundant. */ + // TODO: this can be a linked list struct aws_priority_queue scheduled_iterations; } synced_data; }; From 6aa2ee98a24d7e58897bec34c13002e864cfd129 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 12 Feb 2025 10:05:57 -0800 Subject: [PATCH 4/9] Updates --- source/darwin/dispatch_queue_event_loop.c | 64 ++++++++--------------- 1 file changed, 22 insertions(+), 42 deletions(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 8aa72453f..357fafa70 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -194,8 +194,6 @@ static void s_scheduled_iteration_entry_destroy(struct scheduled_iteration_entry static void s_dispatch_event_loop_final_destroy(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; - // The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task. - // Double-check here in case the destroy function is not called or event loop initialization failed. if (aws_task_scheduler_is_valid(&dispatch_loop->scheduler)) { aws_task_scheduler_clean_up(&dispatch_loop->scheduler); } @@ -354,7 +352,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( return NULL; } -static void s_dispatch_queue_destroy_task(void *context) { +static void s_dispatch_queue_purge_cross_thread_tasks(void *context) { struct aws_dispatch_loop *dispatch_loop = context; AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)dispatch_loop->base_loop); @@ -367,35 +365,29 @@ static void s_dispatch_queue_destroy_task(void *context) { /* Cancel all tasks currently scheduled in the task scheduler. 
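/*
 * [Illustrative sketch, not part of this patch] Scheduling a service block with a clamped delay,
 * as the code above does. dispatch_time() takes a nanosecond delta relative to its base, and
 * dispatch_after_f() holds a reference on the queue until the block runs, which is exactly why
 * the delay is capped at AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL. The helper name is
 * hypothetical.
 */
static void s_example_schedule_clamped(
    dispatch_queue_t queue,
    void *context,
    dispatch_function_t fn,
    uint64_t delay_ns) {

    uint64_t capped_ns = aws_min_u64(delay_ns, AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL);
    if (capped_ns == 0) {
        dispatch_async_f(queue, context, fn); /* runs as soon as earlier blocks drain */
    } else {
        dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, (int64_t)capped_ns);
        dispatch_after_f(when, queue, context, fn); /* queue reference is held until `when` */
    }
}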
*/ aws_task_scheduler_clean_up(&dispatch_loop->scheduler); - /* - * Swap tasks from cross_thread_tasks into local_cross_thread_tasks to cancel them as well as the tasks already - * in the scheduler. - */ struct aws_linked_list local_cross_thread_tasks; aws_linked_list_init(&local_cross_thread_tasks); - s_lock_synced_data(dispatch_loop); -populate_local_cross_thread_tasks: - aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks); - s_unlock_synced_data(dispatch_loop); + bool done = false; + while (!done) { + /* Swap tasks from cross_thread_tasks into local_cross_thread_tasks to cancel them. */ + s_lock_synced_data(dispatch_loop); + aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks); + s_unlock_synced_data(dispatch_loop); - /* Cancel all tasks that were in cross_thread_tasks */ - while (!aws_linked_list_empty(&local_cross_thread_tasks)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); - struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); - task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); - } - - s_lock_synced_data(dispatch_loop); + if (aws_linked_list_empty(&local_cross_thread_tasks)) { + done = true; + } - /* - * Check if more cross thread tasks have been added since cancelling existing tasks. If there were, we must run - * them with AWS_TASK_STATUS_CANCELED as well before moving on with cleanup and destruction. - */ - if (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) { - goto populate_local_cross_thread_tasks; + /* Cancel all tasks that were in cross_thread_tasks */ + while (!aws_linked_list_empty(&local_cross_thread_tasks)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + } } + s_lock_synced_data(dispatch_loop); dispatch_loop->synced_data.is_executing = false; s_unlock_synced_data(dispatch_loop); } @@ -429,23 +421,6 @@ static void s_complete_destroy(struct aws_event_loop *event_loop) { // This would be deadlock AWS_FATAL_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop)); - /* - * `dispatch_async_and_wait_f()` schedules a block to execute in FIFO order on Apple's dispatch queue and waits - * for it to complete before moving on. - * - * Any block that is currently running on the dispatch queue will be completed before - * `s_dispatch_queue_destroy_task()` block is executed. - * - * `s_dispatch_queue_destroy_task()` will cancel outstanding tasks that have already been scheduled to the task - * scheduler and then iterate through cross thread tasks. - * - * It is possible that there are scheduled_iterations that are be queued to run s_run_iteration() - * AFTER s_dispatch_queue_destroy_task() has executed. Any iteration blocks scheduled to run in the future will - * keep Apple's dispatch queue alive until the blocks complete. Because the dispatch loop is in the SHUTTING_DOWN - * state, these iterations will not do anything. - */ - dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task); - /* * This is the release of the initial ref count of 1 that the event loop was created with. 
*/ @@ -459,6 +434,11 @@ static void s_complete_destroy(struct aws_event_loop *event_loop) { dispatch_loop); s_unlock_synced_data(dispatch_loop); + /* + * There are no more references to the dispatch loop anywhere. Purge any remaining cross thread tasks. + */ + s_dispatch_queue_purge_cross_thread_tasks(dispatch_loop); + /* * We know that all scheduling entries have cleaned up. We can destroy ourselves now. Upon return, the caller * is guaranteed that all memory related to the event loop has been released, From cd41c8fb84ee1867b605b040ecf84cb2a627b288 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 12 Feb 2025 14:06:30 -0800 Subject: [PATCH 5/9] Doc updates --- include/aws/io/event_loop.h | 10 ++++++++-- source/darwin/dispatch_queue_event_loop.c | 8 +++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/include/aws/io/event_loop.h b/include/aws/io/event_loop.h index 11d28ddae..61421bf4b 100644 --- a/include/aws/io/event_loop.h +++ b/include/aws/io/event_loop.h @@ -247,17 +247,21 @@ void aws_event_loop_clean_up_base(struct aws_event_loop *event_loop); /** * @internal - Don't use outside of testing. * - * Invokes the destroy() fn for the event loop implementation. + * Destroys an event loop implementation. * If the event loop is still in a running state, this function will block waiting on the event loop to shutdown. - * If you do not want this function to block, call aws_event_loop_stop() manually first. * If the event loop is shared by multiple threads then destroy must be called by exactly one thread. All other threads * must ensure their API calls to the event loop happen-before the call to destroy. + * + * Internally, this calls aws_event_loop_start_destroy() followed by aws_event_loop_complete_destroy() */ AWS_IO_API void aws_event_loop_destroy(struct aws_event_loop *event_loop); /** * @internal + * + * Signals an event loop to begin its destruction process. If an event loop's implementation of this API does anything, + * it must be quick and non-blocking. Most event loop implementations have an empty implementation for this function. */ AWS_IO_API void aws_event_loop_start_destroy(struct aws_event_loop *event_loop); @@ -265,6 +269,8 @@ void aws_event_loop_start_destroy(struct aws_event_loop *event_loop); /** * @internal * + * Waits for an event loop to complete its destruction process. aws_event_loop_start_destroy() must have been called + * previously for this function to not deadlock. */ AWS_IO_API void aws_event_loop_complete_destroy(struct aws_event_loop *event_loop); diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 357fafa70..5445285e0 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -305,7 +305,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id); /* The dispatch queue is suspended at this point. 
*/ - // dispatch_loop->synced_data.suspended = true; dispatch_loop->synced_data.is_executing = false; if (aws_task_scheduler_init(&dispatch_loop->scheduler, alloc)) { @@ -397,10 +396,9 @@ static void s_start_destroy(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; s_lock_synced_data(dispatch_loop); - AWS_FATAL_ASSERT( - dispatch_loop->synced_data.execution_state == AWS_DLES_RUNNING || - dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED); - if (dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED) { + enum aws_dispatch_queue_execution_state execution_state = dispatch_loop->synced_data.execution_state; + AWS_FATAL_ASSERT(execution_state == AWS_DLES_RUNNING || execution_state == AWS_DLES_SUSPENDED); + if (execution_state == AWS_DLES_SUSPENDED) { dispatch_resume(dispatch_loop->dispatch_queue); } dispatch_loop->synced_data.execution_state = AWS_DLES_SHUTTING_DOWN; From f81423f84edff5f2ac9d1268ba15aacfdd9d00b4 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 12 Feb 2025 14:14:06 -0800 Subject: [PATCH 6/9] Oops --- source/darwin/dispatch_queue_event_loop.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 5445285e0..29e7c7b3b 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -396,7 +396,7 @@ static void s_start_destroy(struct aws_event_loop *event_loop) { struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; s_lock_synced_data(dispatch_loop); - enum aws_dispatch_queue_execution_state execution_state = dispatch_loop->synced_data.execution_state; + enum aws_dispatch_loop_execution_state execution_state = dispatch_loop->synced_data.execution_state; AWS_FATAL_ASSERT(execution_state == AWS_DLES_RUNNING || execution_state == AWS_DLES_SUSPENDED); if (execution_state == AWS_DLES_SUSPENDED) { dispatch_resume(dispatch_loop->dispatch_queue); From bb254fc3b28511b8b260f3df43cd26cf67b2bf8d Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 13 Feb 2025 12:18:00 -0800 Subject: [PATCH 7/9] More accurate culling check for try_schedule_new_iteration --- source/darwin/dispatch_queue_event_loop.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 29e7c7b3b..37eeeed8c 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -631,7 +631,7 @@ static bool s_should_schedule_iteration( * aws_dispatch_loop->sycned_data */ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop, uint64_t timestamp) { - if (dispatch_loop->synced_data.execution_state == AWS_DLES_SUSPENDED || dispatch_loop->synced_data.is_executing) { + if (dispatch_loop->synced_data.execution_state != AWS_DLES_RUNNING) { return; } From cbdf28ec02e5ecfddf1088518e158ed448ddaf9d Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 13 Feb 2025 13:33:07 -0800 Subject: [PATCH 8/9] Oops that was dumb --- source/darwin/dispatch_queue_event_loop.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 37eeeed8c..116ff4820 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -631,7 +631,7 @@ static bool s_should_schedule_iteration( * aws_dispatch_loop->sycned_data */ static 
void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop, uint64_t timestamp) { - if (dispatch_loop->synced_data.execution_state != AWS_DLES_RUNNING) { + if (dispatch_loop->synced_data.execution_state != AWS_DLES_RUNNING || dispatch_loop->synced_data.is_executing) { return; } From 2d876028b252411b2d190509bc44695c847b3489 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 18 Feb 2025 09:07:08 -0800 Subject: [PATCH 9/9] trival comment fix --- source/darwin/dispatch_queue_event_loop.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 116ff4820..356c48f25 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -639,8 +639,8 @@ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop * Apple dispatch queue uses automatic reference counting (ARC). If an iteration is scheduled to run in the future, * the dispatch queue will persist until it is executed. Scheduling a block far into the future will keep the * dispatch queue alive unnecessarily long, which blocks event loop group shutdown from completion. - * To mitigate this, we ensure an iteration is scheduled no longer than 1 second in the - * future. + * To mitigate this, we ensure an iteration is scheduled no longer than + * AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL second in the future. */ uint64_t now_ns = 0; aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns);
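/*
 * [Illustrative worked example, not part of this patch] The clamp referenced above, with
 * AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL equal to AWS_TIMESTAMP_NANOS (one second):
 *
 *   delta             = (timestamp > now_ns) ? timestamp - now_ns : 0
 *   delta             = aws_min_u64(delta, AWS_DISPATCH_QUEUE_MAX_FUTURE_SERVICE_INTERVAL)
 *   clamped_timestamp = now_ns + delta
 *
 * e.g. now_ns = 10e9 and timestamp = 15e9 gives delta = 5e9, clamped to 1e9, so an iteration is
 * scheduled at 11e9; that iteration then re-checks the task scheduler and schedules the next one,
 * so a far-future task is reached through a chain of bounded waits rather than one long block.
 */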