Commit e0a1397: TRY SLICED

benvanik committed Jan 24, 2021
1 parent f6e4c74 commit e0a1397
Showing 6 changed files with 92 additions and 59 deletions.
71 changes: 38 additions & 33 deletions iree/hal/cts/semaphore_test.cc
@@ -245,39 +245,44 @@ TEST_P(SemaphoreTest, SubmitWithNoCommandBuffers) {
 }
 
 TEST_P(SemaphoreTest, SubmitAndSignal) {
-  iree_hal_command_buffer_t* command_buffer;
-  IREE_ASSERT_OK(iree_hal_command_buffer_create(
-      device_, IREE_HAL_COMMAND_BUFFER_MODE_ONE_SHOT,
-      IREE_HAL_COMMAND_CATEGORY_DISPATCH, &command_buffer));
-
-  IREE_ASSERT_OK(iree_hal_command_buffer_begin(command_buffer));
-  IREE_ASSERT_OK(iree_hal_command_buffer_end(command_buffer));
-
-  // No waits, one signal which we immediately wait on after submit.
-  iree_hal_submission_batch_t submission_batch;
-  submission_batch.wait_semaphores.count = 0;
-  submission_batch.wait_semaphores.semaphores = NULL;
-  submission_batch.wait_semaphores.payload_values = NULL;
-  submission_batch.command_buffer_count = 1;
-  submission_batch.command_buffers = &command_buffer;
-  iree_hal_semaphore_t* signal_semaphore;
-  IREE_ASSERT_OK(iree_hal_semaphore_create(device_, 0ull, &signal_semaphore));
-  iree_hal_semaphore_t* signal_semaphore_ptrs[] = {signal_semaphore};
-  submission_batch.signal_semaphores.count =
-      IREE_ARRAYSIZE(signal_semaphore_ptrs);
-  submission_batch.signal_semaphores.semaphores = signal_semaphore_ptrs;
-  uint64_t payload_values[] = {1ull};
-  submission_batch.signal_semaphores.payload_values = payload_values;
-
-  IREE_ASSERT_OK(
-      iree_hal_device_queue_submit(device_, IREE_HAL_COMMAND_CATEGORY_DISPATCH,
-                                   /*queue_affinity=*/0,
-                                   /*batch_count=*/1, &submission_batch));
-  IREE_ASSERT_OK(iree_hal_semaphore_wait_with_deadline(
-      signal_semaphore, 1ull, IREE_TIME_INFINITE_FUTURE));
-
-  iree_hal_command_buffer_release(command_buffer);
-  iree_hal_semaphore_release(signal_semaphore);
+  for (int i = 0; i < 100000; ++i) {
+    iree_hal_command_buffer_t* command_buffer;
+    IREE_ASSERT_OK(iree_hal_command_buffer_create(
+        device_, IREE_HAL_COMMAND_BUFFER_MODE_ONE_SHOT,
+        IREE_HAL_COMMAND_CATEGORY_DISPATCH, &command_buffer));
+
+    IREE_ASSERT_OK(iree_hal_command_buffer_begin(command_buffer));
+    IREE_ASSERT_OK(iree_hal_command_buffer_end(command_buffer));
+
+    // No waits, one signal which we immediately wait on after submit.
+    iree_hal_submission_batch_t submission_batch;
+    submission_batch.wait_semaphores.count = 0;
+    submission_batch.wait_semaphores.semaphores = NULL;
+    submission_batch.wait_semaphores.payload_values = NULL;
+    submission_batch.command_buffer_count = 1;
+    submission_batch.command_buffers = &command_buffer;
+    iree_hal_semaphore_t* signal_semaphore;
+    IREE_ASSERT_OK(iree_hal_semaphore_create(device_, 0ull, &signal_semaphore));
+    iree_hal_semaphore_t* signal_semaphore_ptrs[] = {signal_semaphore};
+    submission_batch.signal_semaphores.count =
+        IREE_ARRAYSIZE(signal_semaphore_ptrs);
+    submission_batch.signal_semaphores.semaphores = signal_semaphore_ptrs;
+    uint64_t payload_values[] = {1ull};
+    submission_batch.signal_semaphores.payload_values = payload_values;
+
+    IREE_ASSERT_OK(iree_hal_device_queue_submit(
+        device_, IREE_HAL_COMMAND_CATEGORY_DISPATCH,
+        /*queue_affinity=*/0,
+        /*batch_count=*/1, &submission_batch));
+
+    IREE_ASSERT_OK(iree_hal_device_wait_idle_with_deadline(
+        device_, IREE_TIME_INFINITE_FUTURE));
+    IREE_ASSERT_OK(iree_hal_semaphore_wait_with_timeout(signal_semaphore, 1ull,
+                                                        1000000000));
+
+    iree_hal_command_buffer_release(command_buffer);
+    iree_hal_semaphore_release(signal_semaphore);
+  }
 }
 
 TEST_P(SemaphoreTest, SubmitWithWait) {
1 change: 1 addition & 0 deletions iree/hal/local/task_command_buffer.c
@@ -775,6 +775,7 @@ static iree_status_t iree_hal_task_command_buffer_build_dispatch(
       iree_task_make_dispatch_closure(
           iree_hal_cmd_dispatch_tile, (uintptr_t)cmd),
       workgroup_size, workgroup_count, &cmd->task);
+  cmd->task.header.flags |= IREE_TASK_FLAG_DISPATCH_SLICED;
 
   // Copy only the push constant range used by the executable.
   uint8_t* cmd_ptr = (uint8_t*)cmd + sizeof(*cmd);
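Note: the added line forces every HAL dispatch onto the sliced scheduling path named in the commit title. Below is a compilable sketch of what flag-keyed selection at issue time looks like; only IREE_TASK_FLAG_DISPATCH_SLICED and iree_task_dispatch_issue_sliced come from this commit, while the flag value, the stand-in types, and the selection function are illustrative assumptions, not IREE's real issue path.

#include <stdint.h>
#include <stdio.h>

// Assumed flag value and a stand-in task header; illustration only.
enum { IREE_TASK_FLAG_DISPATCH_SLICED = 1u << 0 };

typedef struct {
  uint32_t flags;
} task_header_t;

static void issue_dispatch(const task_header_t* header) {
  if (header->flags & IREE_TASK_FLAG_DISPATCH_SLICED) {
    // Sliced: the coordinator pre-partitions the workgroup grid into
    // contiguous slices, one task per slice (see
    // iree_task_dispatch_issue_sliced in iree/task/task.c below).
    puts("issue sliced");
  } else {
    // Otherwise workers self-schedule tiles from a shared dispatch.
    puts("issue default");
  }
}

int main(void) {
  task_header_t cmd_task = {0};
  cmd_task.flags |= IREE_TASK_FLAG_DISPATCH_SLICED;  // as the new line does
  issue_dispatch(&cmd_task);
  return 0;
}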
24 changes: 18 additions & 6 deletions iree/task/executor.c
@@ -57,6 +57,8 @@ iree_status_t iree_task_executor_create(
   executor->scheduling_mode = scheduling_mode;
   iree_atomic_task_slist_initialize(&executor->incoming_ready_slist);
   iree_atomic_task_slist_initialize(&executor->incoming_waiting_slist);
+  iree_atomic_store_int32(&executor->submission_epoch, 0,
+                          iree_memory_order_seq_cst);
   iree_slim_mutex_initialize(&executor->coordinator_mutex);
   iree_slim_mutex_initialize(&executor->wait_mutex);

@@ -291,6 +293,10 @@ void iree_task_executor_merge_submission(iree_task_executor_t* executor,
   // be modified by other threads. We can no longer assume anything about the
   // submission lists and can only discard them.
   iree_task_submission_reset(submission);
+
+  // Bump submission epoch indicating that the executor state has changed.
+  iree_atomic_fetch_add_int32(&executor->submission_epoch, 1,
+                              iree_memory_order_release);
 }
 
 void iree_task_executor_submit(iree_task_executor_t* executor,

@@ -533,8 +539,10 @@ void iree_task_executor_coordinate(iree_task_executor_t* executor,
   // We may be adding tasks/waiting/etc on each pass through coordination - to
   // ensure we completely drain the incoming queues and satisfied waits we loop
   // until there's nothing left to coordinate.
-  bool schedule_dirty = true;
-  do {
+  while (true) {
+    int32_t current_submission_epoch = iree_atomic_load_int32(
+        &executor->submission_epoch, iree_memory_order_acquire);
+
     // Check for incoming submissions and move their posted tasks into our
     // local lists. Any of the tasks here are ready to execute immediately and
     // ones we should be able to distribute to workers without delay. The

@@ -606,12 +614,16 @@ void iree_task_executor_coordinate(iree_task_executor_t* executor,
     // this.
     if (!iree_task_submission_is_empty(&pending_submission)) {
       iree_task_executor_merge_submission(executor, &pending_submission);
-      schedule_dirty = true;
-    } else {
-      schedule_dirty = false;
     }
-  } while (schedule_dirty);
+
+    if (current_submission_epoch ==
+        iree_atomic_load_int32(&executor->submission_epoch,
+                               iree_memory_order_acquire)) {
+      break;
+    }
+  }
 
+  IREE_ASSERT(!executor->incoming_ready_slist.impl.head);
   iree_slim_mutex_unlock(&executor->coordinator_mutex);
   IREE_TRACE_ZONE_END(z0);
 }
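Note: the epoch counter replaces the old schedule_dirty flag so that work merged by any thread (not just the coordinator's own pass) forces another coordination pass. A minimal standalone sketch of the pattern, using C11 atomics in place of IREE's iree_atomic_* wrappers; all names here are illustrative, not IREE's:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

// Incremented (with release ordering) by any producer publishing work.
static atomic_int_fast32_t submission_epoch;

static void publish_work(void) {
  // ... push tasks onto shared lists ...
  atomic_fetch_add_explicit(&submission_epoch, 1, memory_order_release);
}

static void coordinate(void) {
  while (true) {
    // Snapshot the epoch before scanning; acquire pairs with the release in
    // publish_work so the scan sees everything published before the bump.
    int_fast32_t epoch_before =
        atomic_load_explicit(&submission_epoch, memory_order_acquire);

    // ... drain incoming queues, dispatch ready tasks, poll waits ...

    // If nothing was published while we were scanning we are done; otherwise
    // loop so freshly arrived work is not stranded until the next wake.
    if (epoch_before ==
        atomic_load_explicit(&submission_epoch, memory_order_acquire)) {
      break;
    }
  }
}

int main(void) {
  publish_work();
  coordinate();  // a second pass runs only if work races in mid-scan
  return 0;
}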
3 changes: 3 additions & 0 deletions iree/task/executor_impl.h
@@ -71,6 +71,9 @@ struct iree_task_executor_s {
   // A list of incoming wait tasks that need to be waited on. Order doesn't
   // really matter here as all tasks will be waited on simultaneously.
   iree_atomic_task_slist_t incoming_waiting_slist;
+  // Incremented each time new work is submitted to the executor. Used for
+  // change detection.
+  iree_atomic_int32_t submission_epoch;
 
   // Guards coordination logic; only one thread at a time may be acting as the
   // coordinator.
6 changes: 3 additions & 3 deletions iree/task/task.c
@@ -445,11 +445,11 @@ void iree_task_dispatch_issue_sliced(iree_task_dispatch_t* dispatch_task,
         workgroup_base[2] = slice_z * tiles_per_slice_z;
         uint32_t workgroup_range[3];
         workgroup_range[0] = iree_min(
-            workgroup_count[0], workgroup_base[0] + tiles_per_slice_x - 1);
+            workgroup_count[0] - 1, workgroup_base[0] + tiles_per_slice_x - 1);
         workgroup_range[1] = iree_min(
-            workgroup_count[1], workgroup_base[1] + tiles_per_slice_y - 1);
+            workgroup_count[1] - 1, workgroup_base[1] + tiles_per_slice_y - 1);
         workgroup_range[2] = iree_min(
-            workgroup_count[2], workgroup_base[2] + tiles_per_slice_z - 1);
+            workgroup_count[2] - 1, workgroup_base[2] + tiles_per_slice_z - 1);
 
         // Allocate and initialize the slice.
         iree_task_dispatch_slice_t* slice_task =
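Note: workgroup_range is an inclusive upper bound and valid tile indices run 0..workgroup_count-1, so clamping against workgroup_count itself let the final slice cover one nonexistent tile. A self-contained check of the boundary case (the numbers are hypothetical, not from the commit):

#include <assert.h>
#include <stdint.h>

static uint32_t u32_min(uint32_t a, uint32_t b) { return a < b ? a : b; }

int main(void) {
  // Hypothetical grid: 8 tiles along x, 6 tiles per slice, slice 1 at base 6.
  uint32_t workgroup_count_x = 8;  // valid tile indices: 0..7
  uint32_t tiles_per_slice_x = 6;
  uint32_t workgroup_base_x = 6;

  // Old clamp: min(8, 6 + 6 - 1) = 8 -> inclusive range [6, 8]; tile 8 is
  // one past the end of the grid.
  uint32_t old_range = u32_min(workgroup_count_x,
                               workgroup_base_x + tiles_per_slice_x - 1);
  // New clamp: min(7, 11) = 7 -> inclusive range [6, 7]; in bounds.
  uint32_t new_range = u32_min(workgroup_count_x - 1,
                               workgroup_base_x + tiles_per_slice_x - 1);

  assert(old_range == 8);  // off-by-one past the last tile
  assert(new_range == 7);  // last valid tile index
  return 0;
}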
46 changes: 29 additions & 17 deletions iree/task/worker.c
@@ -166,8 +166,9 @@ iree_task_t* iree_task_worker_try_steal_task(iree_task_worker_t* worker,
 // Executes a task on a worker.
 // Only task types that are scheduled to workers are handled; all others must be
 // handled by the coordinator during scheduling.
-static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
-                                              iree_task_t* task) {
+static iree_status_t iree_task_worker_execute(
+    iree_task_worker_t* worker, iree_task_t* task,
+    iree_task_submission_t* pending_submission) {
   // Execute the task and resolve the task and gather any tasks that are now
   // ready for submission to the executor. They'll be scheduled the next time
   // the coordinator runs.

@@ -176,22 +177,20 @@ static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
   // BFS behavior at the cost of the additional merge overhead - it's probably
   // worth it?
   // TODO(benvanik): handle partial tasks and re-queuing.
-  iree_task_submission_t pending_submission;
-  iree_task_submission_initialize(&pending_submission);
   switch (task->type) {
     case IREE_TASK_TYPE_CALL: {
       IREE_RETURN_IF_ERROR(
-          iree_task_call_execute((iree_task_call_t*)task, &pending_submission));
+          iree_task_call_execute((iree_task_call_t*)task, pending_submission));
       break;
     }
     case IREE_TASK_TYPE_DISPATCH_SLICE: {
       IREE_RETURN_IF_ERROR(iree_task_dispatch_slice_execute(
-          (iree_task_dispatch_slice_t*)task, &pending_submission));
+          (iree_task_dispatch_slice_t*)task, pending_submission));
       break;
     }
     case IREE_TASK_TYPE_DISPATCH_SHARD: {
       IREE_RETURN_IF_ERROR(iree_task_dispatch_shard_execute(
-          (iree_task_dispatch_shard_t*)task, &pending_submission));
+          (iree_task_dispatch_shard_t*)task, pending_submission));
       break;
     }
     default:

@@ -202,16 +201,14 @@ static iree_status_t iree_task_worker_execute(iree_task_worker_t* worker,
   // NOTE: task is invalidated here!
   task = NULL;
 
-  if (!iree_task_submission_is_empty(&pending_submission)) {
-    iree_task_executor_merge_submission(worker->executor, &pending_submission);
-  }
   return iree_ok_status();
 }
 
 // Pumps the worker thread once, processing a single task.
 // Returns true if pumping should continue as there are more tasks remaining or
 // false if the caller should wait for more tasks to be posted.
-static bool iree_task_worker_pump_once(iree_task_worker_t* worker) {
+static bool iree_task_worker_pump_once(
+    iree_task_worker_t* worker, iree_task_submission_t* pending_submission) {
   IREE_TRACE_ZONE_BEGIN(z0);
 
   // Check the local work queue for any work we know we should start

@@ -252,7 +249,8 @@ static bool iree_task_worker_pump_once(iree_task_worker_t* worker) {
 
   // Execute the task (may call out to arbitrary user code and may submit more
   // tasks for execution).
-  iree_status_t status = iree_task_worker_execute(worker, task);
+  iree_status_t status =
+      iree_task_worker_execute(worker, task, pending_submission);
 
   // TODO(#4026): propagate failure to task scope.
   // We currently drop the error on the floor here; that's because the error

@@ -294,10 +292,20 @@ static void iree_task_worker_pump_until_exit(iree_task_worker_t* worker) {
                                             ~worker->worker_bit,
                                             iree_memory_order_seq_cst);
 
-    while (iree_task_worker_pump_once(worker)) {
+    iree_task_submission_t pending_submission;
+    iree_task_submission_initialize(&pending_submission);
+
+    while (iree_task_worker_pump_once(worker, &pending_submission)) {
       // All work done ^, which will return false when the worker should wait.
     }
 
+    bool retry = false;
+    if (!iree_task_submission_is_empty(&pending_submission)) {
+      iree_task_executor_merge_submission(worker->executor,
+                                          &pending_submission);
+      retry = true;
+    }
+
     // When we encounter a complete lack of work we can self-nominate to check
     // the global work queue and distribute work to other threads. Only one
     // coordinator can be running at a time so we also ensure that if another

@@ -309,18 +317,22 @@ static void iree_task_worker_pump_until_exit(iree_task_worker_t* worker) {
       iree_task_executor_coordinate(worker->executor, worker,
                                     /*speculative=*/true);
 
-    // We've finished all the work we have scheduled so set our idle flag. This
-    // ensures that if any other thread comes in and wants to give us work we
-    // will properly coordinate/wake below.
-    iree_atomic_task_affinity_set_fetch_or(&worker->executor->worker_idle_mask,
-                                           worker->worker_bit,
-                                           iree_memory_order_seq_cst);
-
     // If nothing has been enqueued since we started this loop (so even
     // coordination didn't find anything) we go idle. Otherwise we fall
     // through and try the loop again.
-    if (!iree_task_queue_is_empty(&worker->local_task_queue)) {
+    if (retry || !iree_task_queue_is_empty(&worker->local_task_queue)) {
       // Have more work to do; loop around to try another pump.
       iree_notification_cancel_wait(&worker->wake_notification);
     } else {
       IREE_TRACE_ZONE_BEGIN_NAMED(z_wait,
                                   "iree_task_worker_main_pump_wake_wait");
+      iree_atomic_task_affinity_set_fetch_or(
+          &worker->executor->worker_idle_mask, worker->worker_bit,
+          iree_memory_order_seq_cst);
       iree_notification_commit_wait(&worker->wake_notification, wait_token);
       IREE_TRACE_ZONE_END(z_wait);
     }
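Note: readied tasks now accumulate in a caller-owned pending_submission across pump iterations and are merged into the executor once per batch; producing work sets retry so the worker re-pumps instead of committing the wake wait. A standalone control-flow sketch with stand-in types (none of these are IREE's real types or signatures):

#include <stdbool.h>
#include <stdio.h>

typedef struct {
  int ready_count;  // tasks readied by execution, awaiting a merge
} submission_t;

typedef struct {
  int merge_count;  // how many times workers merged batches into us
} executor_t;

typedef struct {
  executor_t* executor;
  int local_tasks;  // tasks remaining in the worker-local queue
} worker_t;

static void submission_initialize(submission_t* s) { s->ready_count = 0; }
static bool submission_is_empty(const submission_t* s) {
  return s->ready_count == 0;
}
// In the real code this also bumps the executor's submission_epoch.
static void executor_merge_submission(executor_t* e, submission_t* s) {
  ++e->merge_count;
  s->ready_count = 0;
}

// Executes one local task; pretend every third task readies a follow-up.
// Readied tasks go into |pending| rather than straight to the executor.
static bool worker_pump_once(worker_t* w, submission_t* pending) {
  if (w->local_tasks == 0) return false;
  if (--w->local_tasks % 3 == 0) ++pending->ready_count;
  return true;
}

static void worker_pump_until_wait(worker_t* w) {
  submission_t pending;
  submission_initialize(&pending);

  // Drain the local queue; follow-on work accumulates locally so the shared
  // executor is touched once per batch instead of once per task.
  while (worker_pump_once(w, &pending)) {
  }

  // Single merge for the whole batch. Producing work means we must retry
  // (re-pump/coordinate) rather than going idle, or it would sit
  // unscheduled until the next wake.
  if (!submission_is_empty(&pending)) {
    executor_merge_submission(w->executor, &pending);
    // retry: loop around instead of committing the wake wait.
  }
}

int main(void) {
  executor_t executor = {0};
  worker_t worker = {&executor, 10};
  worker_pump_until_wait(&worker);
  printf("merges: %d\n", executor.merge_count);  // 1: one merge per batch
  return 0;
}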
