diff --git a/iree/hal/cts/semaphore_test.cc b/iree/hal/cts/semaphore_test.cc
index d2a31fa4a885..33eae1d56e8f 100644
--- a/iree/hal/cts/semaphore_test.cc
+++ b/iree/hal/cts/semaphore_test.cc
@@ -245,39 +245,44 @@ TEST_P(SemaphoreTest, SubmitWithNoCommandBuffers) {
 }
 
 TEST_P(SemaphoreTest, SubmitAndSignal) {
-  iree_hal_command_buffer_t* command_buffer;
-  IREE_ASSERT_OK(iree_hal_command_buffer_create(
-      device_, IREE_HAL_COMMAND_BUFFER_MODE_ONE_SHOT,
-      IREE_HAL_COMMAND_CATEGORY_DISPATCH, &command_buffer));
-
-  IREE_ASSERT_OK(iree_hal_command_buffer_begin(command_buffer));
-  IREE_ASSERT_OK(iree_hal_command_buffer_end(command_buffer));
-
-  // No waits, one signal which we immediately wait on after submit.
-  iree_hal_submission_batch_t submission_batch;
-  submission_batch.wait_semaphores.count = 0;
-  submission_batch.wait_semaphores.semaphores = NULL;
-  submission_batch.wait_semaphores.payload_values = NULL;
-  submission_batch.command_buffer_count = 1;
-  submission_batch.command_buffers = &command_buffer;
-  iree_hal_semaphore_t* signal_semaphore;
-  IREE_ASSERT_OK(iree_hal_semaphore_create(device_, 0ull, &signal_semaphore));
-  iree_hal_semaphore_t* signal_semaphore_ptrs[] = {signal_semaphore};
-  submission_batch.signal_semaphores.count =
-      IREE_ARRAYSIZE(signal_semaphore_ptrs);
-  submission_batch.signal_semaphores.semaphores = signal_semaphore_ptrs;
-  uint64_t payload_values[] = {1ull};
-  submission_batch.signal_semaphores.payload_values = payload_values;
-
-  IREE_ASSERT_OK(
-      iree_hal_device_queue_submit(device_, IREE_HAL_COMMAND_CATEGORY_DISPATCH,
-                                   /*queue_affinity=*/0,
-                                   /*batch_count=*/1, &submission_batch));
-  IREE_ASSERT_OK(iree_hal_semaphore_wait_with_deadline(
-      signal_semaphore, 1ull, IREE_TIME_INFINITE_FUTURE));
-
-  iree_hal_command_buffer_release(command_buffer);
-  iree_hal_semaphore_release(signal_semaphore);
+  for (int i = 0; i < 100000; ++i) {
+    iree_hal_command_buffer_t* command_buffer;
+    IREE_ASSERT_OK(iree_hal_command_buffer_create(
+        device_, IREE_HAL_COMMAND_BUFFER_MODE_ONE_SHOT,
+        IREE_HAL_COMMAND_CATEGORY_DISPATCH, &command_buffer));
+
+    IREE_ASSERT_OK(iree_hal_command_buffer_begin(command_buffer));
+    IREE_ASSERT_OK(iree_hal_command_buffer_end(command_buffer));
+
+    // No waits, one signal which we immediately wait on after submit.
+    iree_hal_submission_batch_t submission_batch;
+    submission_batch.wait_semaphores.count = 0;
+    submission_batch.wait_semaphores.semaphores = NULL;
+    submission_batch.wait_semaphores.payload_values = NULL;
+    submission_batch.command_buffer_count = 1;
+    submission_batch.command_buffers = &command_buffer;
+    iree_hal_semaphore_t* signal_semaphore;
+    IREE_ASSERT_OK(iree_hal_semaphore_create(device_, 0ull, &signal_semaphore));
+    iree_hal_semaphore_t* signal_semaphore_ptrs[] = {signal_semaphore};
+    submission_batch.signal_semaphores.count =
+        IREE_ARRAYSIZE(signal_semaphore_ptrs);
+    submission_batch.signal_semaphores.semaphores = signal_semaphore_ptrs;
+    uint64_t payload_values[] = {1ull};
+    submission_batch.signal_semaphores.payload_values = payload_values;
+
+    IREE_ASSERT_OK(iree_hal_device_queue_submit(
+        device_, IREE_HAL_COMMAND_CATEGORY_DISPATCH,
+        /*queue_affinity=*/0,
+        /*batch_count=*/1, &submission_batch));
+
+    IREE_ASSERT_OK(iree_hal_device_wait_idle_with_deadline(
+        device_, IREE_TIME_INFINITE_FUTURE));
+    IREE_ASSERT_OK(iree_hal_semaphore_wait_with_timeout(signal_semaphore, 1ull,
+                                                        1000000000));
+
+    iree_hal_command_buffer_release(command_buffer);
+    iree_hal_semaphore_release(signal_semaphore);
+  }
 }
 
 TEST_P(SemaphoreTest, SubmitWithWait) {
diff --git a/iree/hal/local/task_command_buffer.c b/iree/hal/local/task_command_buffer.c
index 589064ea059c..baf9e42d94e3 100644
--- a/iree/hal/local/task_command_buffer.c
+++ b/iree/hal/local/task_command_buffer.c
@@ -775,6 +775,7 @@ static iree_status_t iree_hal_task_command_buffer_build_dispatch(
       iree_task_make_dispatch_closure(
           iree_hal_cmd_dispatch_tile, (uintptr_t)cmd),
       workgroup_size, workgroup_count, &cmd->task);
+  cmd->task.header.flags |= IREE_TASK_FLAG_DISPATCH_SLICED;
 
   // Copy only the push constant range used by the executable.
   uint8_t* cmd_ptr = (uint8_t*)cmd + sizeof(*cmd);
diff --git a/iree/task/executor.c b/iree/task/executor.c
index a4d6aa665d0d..9976fc40e79c 100644
--- a/iree/task/executor.c
+++ b/iree/task/executor.c
@@ -57,6 +57,8 @@ iree_status_t iree_task_executor_create(
   executor->scheduling_mode = scheduling_mode;
   iree_atomic_task_slist_initialize(&executor->incoming_ready_slist);
   iree_atomic_task_slist_initialize(&executor->incoming_waiting_slist);
+  iree_atomic_store_int32(&executor->submission_epoch, 0,
+                          iree_memory_order_seq_cst);
   iree_slim_mutex_initialize(&executor->coordinator_mutex);
   iree_slim_mutex_initialize(&executor->wait_mutex);
 
@@ -291,6 +293,10 @@ void iree_task_executor_merge_submission(iree_task_executor_t* executor,
   // be modified by other threads. We can no longer assume anything about the
   // submission lists and can only discard them.
   iree_task_submission_reset(submission);
+
+  // Bump submission epoch indicating that the executor state has changed.
+  iree_atomic_fetch_add_int32(&executor->submission_epoch, 1,
+                              iree_memory_order_release);
 }
 
 void iree_task_executor_submit(iree_task_executor_t* executor,
@@ -533,8 +539,10 @@ void iree_task_executor_coordinate(iree_task_executor_t* executor,
   // We may be adding tasks/waiting/etc on each pass through coordination - to
   // ensure we completely drain the incoming queues and satisfied waits we loop
   // until there's nothing left to coordinate.
-  bool schedule_dirty = true;
-  do {
+  while (true) {
+    int32_t current_submission_epoch = iree_atomic_load_int32(
+        &executor->submission_epoch, iree_memory_order_acquire);
+
     // Check for incoming submissions and move their posted tasks into our
     // local lists. Any of the tasks here are ready to execute immediately and
     // ones we should be able to distribute to workers without delay. The
@@ -606,12 +614,16 @@ void iree_task_executor_coordinate(iree_task_executor_t* executor,
     // this.
    if (!iree_task_submission_is_empty(&pending_submission)) {
      iree_task_executor_merge_submission(executor, &pending_submission);
-      schedule_dirty = true;
-    } else {
-      schedule_dirty = false;
    }
-  } while (schedule_dirty);
+    if (current_submission_epoch ==
+        iree_atomic_load_int32(&executor->submission_epoch,
+                               iree_memory_order_acquire)) {
+      break;
+    }
+  }
+
+  IREE_ASSERT(!executor->incoming_ready_slist.impl.head);
 
   iree_slim_mutex_unlock(&executor->coordinator_mutex);
   IREE_TRACE_ZONE_END(z0);
 }
diff --git a/iree/task/executor_impl.h b/iree/task/executor_impl.h
index dbb17c57a4ff..18d338a23e6e 100644
--- a/iree/task/executor_impl.h
+++ b/iree/task/executor_impl.h
@@ -71,6 +71,9 @@ struct iree_task_executor_s {
   // A list of incoming wait tasks that need to be waited on. Order doesn't
   // really matter here as all tasks will be waited on simultaneously.
   iree_atomic_task_slist_t incoming_waiting_slist;
+  // Incremented each time new work is submitted to the executor. Used for
+  // change detection.
+  iree_atomic_int32_t submission_epoch;
 
   // Guards coordination logic; only one thread at a time may be acting as the
   // coordinator.
diff --git a/iree/task/task.c b/iree/task/task.c
index 28e600305c4a..419549080736 100644
--- a/iree/task/task.c
+++ b/iree/task/task.c
@@ -445,11 +445,11 @@ void iree_task_dispatch_issue_sliced(iree_task_dispatch_t* dispatch_task,
         workgroup_base[2] = slice_z * tiles_per_slice_z;
         uint32_t workgroup_range[3];
         workgroup_range[0] = iree_min(
-            workgroup_count[0], workgroup_base[0] + tiles_per_slice_x - 1);
+            workgroup_count[0] - 1, workgroup_base[0] + tiles_per_slice_x - 1);
         workgroup_range[1] = iree_min(
-            workgroup_count[1], workgroup_base[1] + tiles_per_slice_y - 1);
+            workgroup_count[1] - 1, workgroup_base[1] + tiles_per_slice_y - 1);
         workgroup_range[2] = iree_min(
-            workgroup_count[2], workgroup_base[2] + tiles_per_slice_z - 1);
+            workgroup_count[2] - 1, workgroup_base[2] + tiles_per_slice_z - 1);
 
         // Allocate and initialize the slice.
         iree_task_dispatch_slice_t* slice_task =
diff --git a/iree/task/worker.c b/iree/task/worker.c
index eb8beb78c058..7b59b3884343 100644
--- a/iree/task/worker.c
+++ b/iree/task/worker.c
@@ -166,8 +166,9 @@ iree_task_t* iree_task_worker_try_steal_task(iree_task_worker_t* worker,
 // Executes a task on a worker.
 // Only task types that are scheduled to workers are handled; all others must be
 // handled by the coordinator during scheduling.
-static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
-                                              iree_task_t* task) {
+static iree_status_t iree_task_worker_execute(
+    iree_task_worker_t* worker, iree_task_t* task,
+    iree_task_submission_t* pending_submission) {
   // Execute the task and resolve the task and gather any tasks that are now
   // ready for submission to the executor. They'll be scheduled the next time
   // the coordinator runs.
@@ -176,22 +177,20 @@ static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
   // BFS behavior at the cost of the additional merge overhead - it's probably
   // worth it?
   // TODO(benvanik): handle partial tasks and re-queuing.
-  iree_task_submission_t pending_submission;
-  iree_task_submission_initialize(&pending_submission);
   switch (task->type) {
     case IREE_TASK_TYPE_CALL: {
       IREE_RETURN_IF_ERROR(
-          iree_task_call_execute((iree_task_call_t*)task, &pending_submission));
+          iree_task_call_execute((iree_task_call_t*)task, pending_submission));
       break;
     }
     case IREE_TASK_TYPE_DISPATCH_SLICE: {
       IREE_RETURN_IF_ERROR(iree_task_dispatch_slice_execute(
-          (iree_task_dispatch_slice_t*)task, &pending_submission));
+          (iree_task_dispatch_slice_t*)task, pending_submission));
       break;
     }
     case IREE_TASK_TYPE_DISPATCH_SHARD: {
       IREE_RETURN_IF_ERROR(iree_task_dispatch_shard_execute(
-          (iree_task_dispatch_shard_t*)task, &pending_submission));
+          (iree_task_dispatch_shard_t*)task, pending_submission));
       break;
     }
     default:
@@ -202,16 +201,14 @@ static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
   // NOTE: task is invalidated here!
   task = NULL;
 
-  if (!iree_task_submission_is_empty(&pending_submission)) {
-    iree_task_executor_merge_submission(worker->executor, &pending_submission);
-  }
   return iree_ok_status();
 }
 
 // Pumps the worker thread once, processing a single task.
 // Returns true if pumping should continue as there are more tasks remaining or
 // false if the caller should wait for more tasks to be posted.
-static bool iree_task_worker_pump_once(iree_task_worker_t* worker) {
+static bool iree_task_worker_pump_once(
+    iree_task_worker_t* worker, iree_task_submission_t* pending_submission) {
   IREE_TRACE_ZONE_BEGIN(z0);
 
   // Check the local work queue for any work we know we should start
@@ -252,7 +249,8 @@ static bool iree_task_worker_pump_once(iree_task_worker_t* worker) {
 
   // Execute the task (may call out to arbitrary user code and may submit more
   // tasks for execution).
-  iree_status_t status = iree_task_worker_execute(worker, task);
+  iree_status_t status =
+      iree_task_worker_execute(worker, task, pending_submission);
 
   // TODO(#4026): propagate failure to task scope.
   // We currently drop the error on the floor here; that's because the error
@@ -294,10 +292,20 @@ static void iree_task_worker_pump_until_exit(iree_task_worker_t* worker) {
                                             ~worker->worker_bit,
                                             iree_memory_order_seq_cst);
 
-    while (iree_task_worker_pump_once(worker)) {
+    iree_task_submission_t pending_submission;
+    iree_task_submission_initialize(&pending_submission);
+
+    while (iree_task_worker_pump_once(worker, &pending_submission)) {
      // All work done ^, which will return false when the worker should wait.
    }
 
+    bool retry = false;
+    if (!iree_task_submission_is_empty(&pending_submission)) {
+      iree_task_executor_merge_submission(worker->executor,
+                                          &pending_submission);
+      retry = true;
+    }
+
     // When we encounter a complete lack of work we can self-nominate to check
     // the global work queue and distribute work to other threads. Only one
     // coordinator can be running at a time so we also ensure that if another
@@ -309,18 +317,22 @@ static void iree_task_worker_pump_until_exit(iree_task_worker_t* worker) {
     iree_task_executor_coordinate(worker->executor, worker,
                                   /*speculative=*/true);
 
+    // We've finished all the work we have scheduled so set our idle flag. This
+    // ensures that if any other thread comes in and wants to give us work we
+    // will properly coordinate/wake below.
+    iree_atomic_task_affinity_set_fetch_or(&worker->executor->worker_idle_mask,
+                                           worker->worker_bit,
+                                           iree_memory_order_seq_cst);
+
     // If nothing has been enqueued since we started this loop (so even
     // coordination didn't find anything) we go idle. Otherwise we fall
     // through and try the loop again.
-    if (!iree_task_queue_is_empty(&worker->local_task_queue)) {
+    if (retry || !iree_task_queue_is_empty(&worker->local_task_queue)) {
      // Have more work to do; loop around to try another pump.
      iree_notification_cancel_wait(&worker->wake_notification);
    } else {
      IREE_TRACE_ZONE_BEGIN_NAMED(z_wait,
                                  "iree_task_worker_main_pump_wake_wait");
-      iree_atomic_task_affinity_set_fetch_or(
-          &worker->executor->worker_idle_mask, worker->worker_bit,
-          iree_memory_order_seq_cst);
      iree_notification_commit_wait(&worker->wake_notification, wait_token);
      IREE_TRACE_ZONE_END(z_wait);
    }
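
A note on the iree/task/task.c hunk: workgroup_range holds the last workgroup coordinate a slice covers, i.e. an inclusive upper bound, so valid values run from 0 to workgroup_count - 1. Clamping against workgroup_count let the trailing slice in each dimension step one tile past the end of the grid; clamping against workgroup_count - 1 fixes the off-by-one. A minimal standalone sketch of the corrected clamp; the helper names here (min_u32, slice_range_max) are hypothetical stand-ins, not IREE APIs:

#include <stdint.h>
#include <stdio.h>

static inline uint32_t min_u32(uint32_t a, uint32_t b) { return a < b ? a : b; }

// Last workgroup index covered by a slice: an INCLUSIVE bound, so valid
// values are [0, count - 1]. Clamping against `count` (the old code) let the
// trailing slice process one tile past the end of the grid.
static uint32_t slice_range_max(uint32_t base, uint32_t tiles_per_slice,
                                uint32_t count) {
  return min_u32(count - 1, base + tiles_per_slice - 1);
}

int main(void) {
  // 10 tiles split into slices of 4: bases 0, 4, 8.
  // Correct inclusive ranges: [0,3], [4,7], [8,9].
  for (uint32_t base = 0; base < 10; base += 4) {
    printf("[%u, %u]\n", base, slice_range_max(base, 4, 10));
  }
  return 0;
}

With the old clamp the final base (8) would yield min(10, 11) = 10, i.e. the range [8, 10], dispatching a nonexistent tile 10.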
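The executor changes replace the coordinator's local schedule_dirty flag with a monotonic submission_epoch counter: every merge of new work bumps the counter, and the coordinator only exits its loop once a full pass completes without the counter moving, so submissions merged by other threads mid-pass cannot be missed. A sketch of this change-detection pattern, using standard C11 atomics in place of IREE's iree_atomic_* wrappers; merge_submission and coordinate are simplified stand-ins for the real functions:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static atomic_int_fast32_t submission_epoch;

// Producer side: after publishing new work, bump the epoch so a coordinator
// that is mid-pass can tell the schedule changed underneath it.
void merge_submission(void) {
  /* ... publish tasks to the incoming lists ... */
  atomic_fetch_add_explicit(&submission_epoch, 1, memory_order_release);
}

// Consumer side: keep draining until a full pass observes no epoch movement.
void coordinate(void) {
  while (true) {
    int_fast32_t epoch_before =
        atomic_load_explicit(&submission_epoch, memory_order_acquire);

    /* ... drain incoming queues and distribute ready tasks to workers ... */

    if (epoch_before ==
        atomic_load_explicit(&submission_epoch, memory_order_acquire)) {
      break;  // Nothing new arrived during this pass; schedule is clean.
    }
    // Epoch moved: another thread submitted while we were coordinating; loop.
  }
}

This is why the patch can assert that incoming_ready_slist is empty on exit: the loop cannot terminate while a concurrent merge has gone unobserved.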
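The worker.c reordering closes a potential lost-wakeup window: the worker now publishes its bit in worker_idle_mask before its final check for queued work, rather than just before sleeping. A producer then either enqueues work early enough for that final check to see it, or observes the idle bit afterwards and issues a wake. A compressed sketch of this ordering under the same C11-atomics assumption, with notify_wait/notify_wake_idle as hypothetical stand-ins for IREE's iree_notification_* API:

#include <stdatomic.h>
#include <stdbool.h>

extern atomic_uint worker_idle_mask;   // One bit per worker.
extern bool local_queue_is_empty(void);
extern void notify_wait(void);         // Blocks until woken (hypothetical).
extern void notify_wake_idle(void);    // Wakes workers with idle bits set.

void worker_maybe_sleep(unsigned worker_bit) {
  // 1) Publish "I am idle" BEFORE the final emptiness check; seq_cst puts the
  //    store in a total order with the producer's load of the mask.
  atomic_fetch_or_explicit(&worker_idle_mask, worker_bit,
                           memory_order_seq_cst);
  // 2) Re-check for work (assumes the enqueue itself is published with at
  //    least seq_cst ordering). A producer either enqueued before this check,
  //    so we see the work and skip sleeping, or enqueues after it, in which
  //    case it reads the mask, sees our idle bit, and wakes us.
  if (local_queue_is_empty()) {
    notify_wait();
  }
}

void producer_post_work(void) {
  /* ... enqueue the task ... */
  if (atomic_load_explicit(&worker_idle_mask, memory_order_seq_cst) != 0) {
    notify_wake_idle();
  }
}

In the old ordering the idle bit was set only inside the sleep branch, after the emptiness check, leaving a window in which a producer saw no idle workers while the worker saw no work and went to sleep anyway.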