
WorkerThreadPool: Overhaul scheduling and synchronization

This commit rewrites the sync logic in such a way that the
`use_system_threads_for_low_priority_tasks` setting, which was added due to
the lack of cross-platform wait-for-multiple-objects functionality, can be
removed (it's now effectively hardcoded to `false`).

With the new implementation, we have the best of both worlds: threads don't
have to poll, plus no bespoke threads are used.
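The mechanism, visible in `_thread_function` below, is that every pool thread owns its own condition variable and sleeps on it under the shared task mutex, so the scheduler can wake exactly the threads it needs. A minimal sketch of the idea, using standard C++ stand-ins rather than Godot's Thread/Mutex/ConditionVariable wrappers:

#include <condition_variable>
#include <deque>
#include <mutex>

// Sketch: each worker owns a condition variable, so waking is targeted
// (no semaphore polling, no dedicated low-priority threads).
struct ThreadData {
	std::condition_variable cond_var;
};

std::mutex task_mutex;
std::deque<int> task_queue; // Stand-in for the real task list.
bool exit_threads = false;

void thread_function(ThreadData *thread_data) {
	while (true) {
		int task = -1;
		{
			std::unique_lock<std::mutex> lock(task_mutex);
			if (exit_threads) {
				return;
			}
			if (!task_queue.empty()) {
				task = task_queue.front();
				task_queue.pop_front();
			} else {
				// Sleep until the scheduler signals this specific thread;
				// the loop re-checks the queue and the exit flag on wakeup.
				thread_data->cond_var.wait(lock);
			}
		}
		if (task != -1) {
			// process_task(task); // Run the task outside the lock.
		}
	}
}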

In addition, regarding deadlock prevention: since not every possible case of
wait-deadlock could be avoided, this commit removes the former best-effort
avoidance mechanisms and keeps only a simple, pessimistic form of detection.

It turns out that the only current user of deadlock prevention, ResourceLoader,
works fine with it, so every possible situation in resource loading is now
handled properly, with no possibility of deadlock. There's a comment in
the code with further details.
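Concretely, task IDs grow monotonically (see `last_task++` below), so the pessimistic rule reduces to an ID comparison: a pool thread is refused any wait on a task as old as or older than the one it is currently running. A condensed, self-contained sketch of just that rule, with simplified stand-ins for the engine types (the real check lives in `wait_for_task_completion` below):

#include <cstdint>

using TaskID = int64_t;
enum Error { OK, ERR_BUSY };

struct Task {
	TaskID self = -1;
};

struct ThreadData {
	Task *current_task = nullptr; // Always set while a pool thread runs a task.
};

// p_caller is null when the waiter is not a pool thread; in that case any
// wait is fine, because user threads never have tasks buried in their stack.
Error check_wait_allowed(const ThreadData *p_caller, TaskID p_task_id) {
	if (p_caller && p_task_id <= p_caller->current_task->self) {
		// The awaited task may be buried in this thread's own stack (or in a
		// cycle of such threads), which could never progress. Rather than
		// proving otherwise case by case, reject pessimistically and let the
		// caller (e.g. ResourceLoader) handle ERR_BUSY as it sees fit.
		return ERR_BUSY;
	}
	return OK; // Safe to register as a waiter.
}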

Lastly, a potential issue of load tasks never being awaited/disposed of is resolved.
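The fix hinges on the two waiter counters introduced further down in the diff: whichever thread observes a task completed with no pool or user waiters left frees it, so a task can no longer be leaked when nobody awaits it. A minimal sketch of that rule, with simplified names:

#include <cstdint>

struct TaskLifetime {
	bool completed = false;
	uint32_t waiting_pool = 0; // Pool threads currently awaiting this task.
	uint32_t waiting_user = 0; // Non-pool (user) threads awaiting it.
};

// Evaluated with the task mutex held, both by the thread that completes the
// task and by each waiter as it wakes up and decrements its counter; whoever
// sees it return true erases and frees the task.
bool can_dispose(const TaskLifetime &t) {
	return t.completed && t.waiting_pool == 0 && t.waiting_user == 0;
}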
Pedro J. Estébanez
2024-01-05 17:39:26 +01:00
parent fe8c217b7c
commit 9444d297ed
9 changed files with 293 additions and 278 deletions


@@ -43,24 +43,15 @@ void WorkerThreadPool::Task::free_template_userdata() {
WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
void WorkerThreadPool::_process_task_queue() {
task_mutex.lock();
Task *task = task_queue.first()->self();
task_queue.remove(task_queue.first());
task_mutex.unlock();
_process_task(task);
}
void WorkerThreadPool::_process_task(Task *p_task) {
bool low_priority = p_task->low_priority;
int pool_thread_index = -1;
Task *prev_low_prio_task = nullptr; // In case this is recursively called.
int pool_thread_index = thread_ids[Thread::get_caller_id()];
ThreadData &curr_thread = threads[pool_thread_index];
Task *prev_task = nullptr; // In case this is recursively called.
bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
if (!use_native_low_priority_threads) {
{
// Tasks must start with this unset. They are free to set-and-forget otherwise.
set_current_thread_safe_for_nodes(false);
pool_thread_index = thread_ids[Thread::get_caller_id()];
ThreadData &curr_thread = threads[pool_thread_index];
// Since the WorkerThreadPool is started before the script server,
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
@@ -71,13 +62,8 @@ void WorkerThreadPool::_process_task(Task *p_task) {
}
task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
if (low_priority) {
low_priority_tasks_running++;
prev_low_prio_task = curr_thread.current_low_prio_task;
curr_thread.current_low_prio_task = p_task;
} else {
curr_thread.current_low_prio_task = nullptr;
}
prev_task = curr_thread.current_task;
curr_thread.current_task = p_task;
task_mutex.unlock();
}
@@ -111,33 +97,24 @@ void WorkerThreadPool::_process_task(Task *p_task) {
memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
}
if (low_priority && use_native_low_priority_threads) {
p_task->completed = true;
p_task->done_semaphore.post();
if (do_post) {
p_task->group->completed.set_to(true);
}
} else {
if (do_post) {
p_task->group->done_semaphore.post();
p_task->group->completed.set_to(true);
}
uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read this before the increment, to avoid another thread freeing the task right after it.
uint32_t finished_users = p_task->group->finished.increment();
if (finished_users == max_users) {
// Get rid of the group, because nobody else is using it.
task_mutex.lock();
group_allocator.free(p_task->group);
task_mutex.unlock();
}
// For groups, tasks get rid of themselves.
if (do_post) {
p_task->group->done_semaphore.post();
p_task->group->completed.set_to(true);
}
uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read this before the increment, to avoid another thread freeing the task right after it.
uint32_t finished_users = p_task->group->finished.increment();
if (finished_users == max_users) {
// Get rid of the group, because nobody else is using it.
task_mutex.lock();
task_allocator.free(p_task);
group_allocator.free(p_task->group);
task_mutex.unlock();
}
// For groups, tasks get rid of themselves.
task_mutex.lock();
task_allocator.free(p_task);
} else {
if (p_task->native_func) {
p_task->native_func(p_task->native_func_userdata);
@@ -150,88 +127,162 @@ void WorkerThreadPool::_process_task(Task *p_task) {
task_mutex.lock();
p_task->completed = true;
for (uint8_t i = 0; i < p_task->waiting; i++) {
p_task->done_semaphore.post();
p_task->pool_thread_index = -1;
if (p_task->waiting_user) {
p_task->done_semaphore.post(p_task->waiting_user);
}
if (!use_native_low_priority_threads) {
p_task->pool_thread_index = -1;
// Let awaiters know.
for (uint32_t i = 0; i < threads.size(); i++) {
if (threads[i].awaited_task == p_task) {
threads[i].cond_var.notify_one();
threads[i].signaled = true;
}
}
task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
}
// Task may have been freed by now (all callers notified).
p_task = nullptr;
if (!use_native_low_priority_threads) {
bool post = false;
task_mutex.lock();
ThreadData &curr_thread = threads[pool_thread_index];
curr_thread.current_low_prio_task = prev_low_prio_task;
if (low_priority) {
{
curr_thread.current_task = prev_task;
if (p_task->low_priority) {
low_priority_threads_used--;
low_priority_tasks_running--;
// A low priority task was freed, so see if we can move a pending one to the high priority queue.
if (_try_promote_low_priority_task()) {
post = true;
}
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
if (_try_promote_low_priority_task()) {
if (prev_task) { // Otherwise, this thread will catch it.
_notify_threads(&curr_thread, 1, 0);
}
}
}
task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
}
set_current_thread_safe_for_nodes(safe_for_nodes_backup);
}
void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;
while (true) {
singleton->task_available_semaphore.wait();
if (singleton->exit_threads) {
break;
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
if (singleton->exit_threads) {
return;
}
thread_data->signaled = false;
if (singleton->task_queue.first()) {
task_to_process = singleton->task_queue.first()->self();
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
}
}
if (task_to_process) {
singleton->_process_task(task_to_process);
}
singleton->_process_task_queue();
}
}
void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
Task *task = (Task *)p_user;
singleton->_process_task(task);
}
void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
_process_task(p_task);
task_mutex.unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
return;
}
task_mutex.lock();
p_task->low_priority = !p_high_priority;
if (!p_high_priority && use_native_low_priority_threads) {
p_task->low_priority_thread = native_thread_allocator.alloc();
task_mutex.unlock();
uint32_t to_process = 0;
uint32_t to_promote = 0;
if (p_task->group) {
p_task->group->low_priority_native_tasks.push_back(p_task);
ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
for (uint32_t i = 0; i < p_count; i++) {
p_tasks[i]->low_priority = !p_high_priority;
if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
task_queue.add_last(&p_tasks[i]->task_elem);
if (!p_high_priority) {
low_priority_threads_used++;
}
to_process++;
} else {
// Too many threads using low priority, must go to queue.
low_priority_task_queue.add_last(&p_tasks[i]->task_elem);
to_promote++;
}
p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pass the task directly to the thread.
} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
task_queue.add_last(&p_task->task_elem);
if (!p_high_priority) {
low_priority_threads_used++;
}
_notify_threads(caller_pool_thread, to_process, to_promote);
task_mutex.unlock();
}
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
uint32_t to_process = p_process_count;
uint32_t to_promote = p_promote_count;
// This is where the decision about which threads to wake up is made, according to the workload.
// Threads that will have a chance to check the situation anyway and process/promote tasks
// are excluded from being notified. Others are tried regardless, to help distribute the load.
// The current thread, if it is a pool thread, is also excluded depending on the promoting/processing
// needs, because it will loop again anyway. However, it still contributes to decreasing the count,
// which helps reduce sync traffic.
uint32_t thread_count = threads.size();
// First round:
// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.
// 2. For promoting: since it's exclusive with processing, we find threads able to promote low-prio tasks now.
for (uint32_t i = 0;
i < thread_count && (to_process || to_promote);
i++, notify_index = (notify_index + 1) % thread_count) {
ThreadData &th = threads[notify_index];
if (th.signaled) {
continue;
}
if (th.current_task) {
// Good thread for promoting low-prio?
if (to_promote && th.awaited_task && th.current_task->low_priority) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_promote--;
}
} else {
if (to_process) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_process--;
}
}
}
// Second round:
// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.
for (uint32_t i = 0;
i < thread_count && to_process;
i++, notify_index = (notify_index + 1) % thread_count) {
ThreadData &th = threads[notify_index];
if (th.signaled) {
continue;
}
if (th.awaited_task) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_process--;
}
task_mutex.unlock();
task_available_semaphore.post();
} else {
// Too many threads using low priority, must go to queue.
low_priority_task_queue.add_last(&p_task->task_elem);
task_mutex.unlock();
}
}
@@ -247,23 +298,6 @@ bool WorkerThreadPool::_try_promote_low_priority_task() {
}
}
void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
#ifdef DEV_ENABLED
print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
#endif
// In order not to create dependency cycles, we can only schedule the next one.
// We'll keep doing the same until the deadlock is broken.
SelfList<Task> *to_promote = low_priority_task_queue.first();
if (to_promote) {
low_priority_task_queue.remove(to_promote);
task_queue.add_last(to_promote);
low_priority_threads_used++;
task_available_semaphore.post();
}
}
}
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
@@ -273,15 +307,15 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
task->self = id;
task->callable = p_callable;
task->native_func = p_func;
task->native_func_userdata = p_userdata;
task->description = p_description;
task->template_userdata = p_template_userdata;
tasks.insert(id, task);
task_mutex.unlock();
_post_task(task, p_high_priority);
_post_tasks_and_unlock(&task, 1, p_high_priority);
return id;
}
@@ -313,105 +347,109 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
}
Task *task = *taskp;
if (!task->completed) {
if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
if (caller_pool_th_index == task->pool_thread_index) {
// Deadlock prevention.
// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well,
// and another task was run to make use of the thread in the meantime, with enough bad luck that
// the need to wait for the original task arose in turn.
// In other words, the task we want to wait for is buried in the stack.
// Let's report the issue to the caller so it can handle it as it sees fit.
task_mutex.unlock();
return ERR_BUSY;
}
if (task->completed) {
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task->waiting++;
bool is_low_prio_waiting_for_another = false;
if (!use_native_low_priority_threads) {
// Deadlock prevention:
// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
// we have a situation in which no progress can be made. We can apply a workaround, consisting of promoting an awaited queued
// low-prio task to the schedule queue so it can run and break the impasse.
// NOTE: Similar reasoning could be applied to high priority tasks, but there are usually many more of those
// than low-prio ones. Therefore, a deadlock there would only happen when dealing with a very complex task graph
// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
// an issue in the real world, a further fix can be applied against that.
if (task->low_priority) {
bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
if (awaiter_is_a_low_prio_task) {
is_low_prio_waiting_for_another = true;
low_priority_tasks_awaiting_others++;
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
}
}
}
}
task_mutex.unlock();
if (use_native_low_priority_threads && task->low_priority) {
task->done_semaphore.wait();
} else {
bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
if (current_is_pool_thread) {
// We are an actual process thread; we must not be blocked, so continue processing stuff if available.
bool must_exit = false;
while (true) {
if (task->done_semaphore.try_wait()) {
// If done, exit
break;
}
if (!must_exit) {
if (task_available_semaphore.try_wait()) {
if (exit_threads) {
must_exit = true;
} else {
// Solve tasks while they are around.
bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
_process_task_queue();
set_current_thread_safe_for_nodes(safe_for_nodes_backup);
continue;
}
} else if (!use_native_low_priority_threads && task->low_priority) {
// A low priority task started waiting, so see if we can move a pending one to the high priority queue.
task_mutex.lock();
bool post = _try_promote_low_priority_task();
task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
}
}
OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
}
} else {
task->done_semaphore.wait();
}
}
task_mutex.lock();
if (is_low_prio_waiting_for_another) {
low_priority_tasks_awaiting_others--;
}
task->waiting--;
return OK;
}
if (task->waiting == 0) {
if (use_native_low_priority_threads && task->low_priority) {
task->low_priority_thread->wait_to_finish();
native_thread_allocator.free(task->low_priority_thread);
}
tasks.erase(p_task_id);
task_allocator.free(task);
ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {
// Deadlock prevention:
// When a pool thread wants to wait for an older task, the following situations can happen:
// 1. Awaited task is deep in the stack of the awaiter.
// 2. A group of awaiter threads end up depending on some tasks buried in the stack
// of their worker threads in such a way that progress can't be made.
// Both would entail a deadlock. Some cases could be handled here in the WorkerThreadPool
// with some extra logic and bookkeeping. However, there would still be unavoidable
// cases of deadlock because of the way waiting threads process outstanding tasks.
// Taking into account that there's no feasible solution for every possible case
// with the current design, we simply reject attempts to await older tasks,
// with a specific error code that signals the situation so the caller can handle it.
task_mutex.unlock();
return ERR_BUSY;
}
if (caller_pool_thread) {
task->waiting_pool++;
} else {
task->waiting_user++;
}
task_mutex.unlock();
if (caller_pool_thread) {
while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(task_mutex);
bool was_signaled = caller_pool_thread->signaled;
caller_pool_thread->signaled = false;
if (task->completed) {
// This thread was also awakened for some reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
if (!exit_threads && was_signaled) {
uint32_t to_process = task_queue.first() ? 1 : 0;
uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
if (to_process || to_promote) {
// This thread must be left alone since it won't loop again.
caller_pool_thread->signaled = true;
_notify_threads(caller_pool_thread, to_process, to_promote);
}
}
task->waiting_pool--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
break;
}
if (!exit_threads) {
// This is a thread from the pool. It shouldn't just idle.
// Let's try to process other tasks while we wait.
if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(caller_pool_thread, 1, 0);
}
}
if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}
if (!task_to_process) {
caller_pool_thread->awaited_task = task;
caller_pool_thread->cond_var.wait(lock);
DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
caller_pool_thread->awaited_task = nullptr;
}
}
}
if (task_to_process) {
_process_task(task_to_process);
}
}
} else {
task->done_semaphore.wait();
task_mutex.lock();
task->waiting_user--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task_mutex.unlock();
}
return OK;
}
@@ -455,11 +493,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
}
groups[id] = group;
task_mutex.unlock();
for (int i = 0; i < p_tasks; i++) {
_post_task(tasks_posted[i], p_high_priority);
}
_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
return id;
}
@@ -502,21 +537,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
if (!groupp) {
ERR_FAIL_MSG("Invalid Group ID");
}
Group *group = *groupp;
if (group->low_priority_native_tasks.size() > 0) {
for (Task *task : group->low_priority_native_tasks) {
task->low_priority_thread->wait_to_finish();
task_mutex.lock();
native_thread_allocator.free(task->low_priority_thread);
task_allocator.free(task);
task_mutex.unlock();
}
task_mutex.lock();
group_allocator.free(group);
task_mutex.unlock();
} else {
{
Group *group = *groupp;
group->done_semaphore.wait();
uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read this before the increment, to avoid another thread freeing the task right after it.
@@ -540,19 +563,13 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}
void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);
if (p_thread_count < 0) {
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
}
if (p_use_native_threads_low_priority) {
max_low_priority_threads = 0;
} else {
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
}
use_native_low_priority_threads = p_use_native_threads_low_priority;
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
threads.resize(p_thread_count);
@@ -576,12 +593,13 @@ void WorkerThreadPool::finish() {
}
task_mutex.unlock();
exit_threads = true;
for (uint32_t i = 0; i < threads.size(); i++) {
task_available_semaphore.post();
{
MutexLock lock(task_mutex);
exit_threads = true;
}
for (ThreadData &data : threads) {
data.cond_var.notify_one();
}
for (ThreadData &data : threads) {
data.thread.wait_to_finish();
}