Merge pull request #108697 from clayjohn/pump-task
Ensure that threads only process one pump task
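The diff below adds an opt-in "pump task" flag to WorkerThreadPool::add_task(), used by servers whose task is a long-running command-queue loop. As a minimal usage sketch (not part of this commit), the pattern the updated servers follow looks like the call below; MyServer and its _thread_loop are placeholder names, while the add_task() arguments and set_pump_task_id() call mirror the BetsyCompressor, PhysicsServer*WrapMT, and RenderingServerDefault changes in the diff.

// Hypothetical caller; mirrors the servers updated in this commit.
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(
		callable_mp(this, &MyServer::_thread_loop), // Long-running loop that pumps a command queue.
		true, // High priority.
		"MyServer pump task", // Description for debugging output.
		true); // p_pump_task: track this task so each thread hosts at most one pump task, spawning an extra worker if needed.
command_queue.set_pump_task_id(tid); // command_queue is a CommandQueueMT owned by the server.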
@@ -76,6 +76,7 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 	p_task->pool_thread_index = pool_thread_index;
 	prev_task = curr_thread.current_task;
 	curr_thread.current_task = p_task;
+	curr_thread.has_pump_task = p_task->is_pump_task;
 	if (p_task->pending_notify_yield_over) {
 		curr_thread.yield_is_over = true;
 	}
@@ -218,11 +219,13 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 	}
 }
 
-void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
+void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task) {
 	// Fall back to processing on the calling thread if there are no worker threads.
 	// Separated into its own variable to make it easier to extend this logic
 	// in custom builds.
-	bool process_on_calling_thread = threads.is_empty();
+	// Avoid calling pump tasks or low priority tasks from the calling thread.
+	bool process_on_calling_thread = threads.is_empty() && p_high_priority && !p_pump_task;
 	if (process_on_calling_thread) {
 		p_lock.temp_unlock();
 		for (uint32_t i = 0; i < p_count; i++) {
@@ -339,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
 	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
 }
 
-WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
+WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task) {
 	MutexLock<BinaryMutex> lock(task_mutex);
 
 	// Get a free task
@@ -351,15 +354,50 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
 	task->native_func_userdata = p_userdata;
 	task->description = p_description;
 	task->template_userdata = p_template_userdata;
+	task->is_pump_task = p_pump_task;
 	tasks.insert(id, task);
 
-	_post_tasks(&task, 1, p_high_priority, lock);
+#ifdef THREADS_ENABLED
+	if (p_pump_task) {
+		pump_task_count++;
+		int thread_count = get_thread_count();
+		if (pump_task_count >= thread_count) {
+			print_verbose(vformat("A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads.", pump_task_count + 1, thread_count)); // +1 because we want to keep a Thread without any pump tasks free.
+
+			Thread::Settings settings;
+#ifdef __APPLE__
+			// The default stack size for new threads on Apple platforms is 512KiB.
+			// This is insufficient when using a library like SPIRV-Cross,
+			// which can generate deep stacks and result in a stack overflow.
+#ifdef DEV_ENABLED
+			// Debug builds need an even larger stack size.
+			settings.stack_size = 2 * 1024 * 1024; // 2 MiB
+#else
+			settings.stack_size = 1 * 1024 * 1024; // 1 MiB
+#endif
+#endif
+			// Re-sizing implies relocation, which is not supported for this array.
+			CRASH_COND_MSG(thread_count + 1 > (int)threads.get_capacity(), "Reserve trick for worker thread pool failed. Crashing.");
+			threads.resize_initialized(thread_count + 1);
+			threads[thread_count].index = thread_count;
+			threads[thread_count].pool = this;
+			threads[thread_count].thread.start(&WorkerThreadPool::_thread_function, &threads[thread_count], settings);
+			thread_ids.insert(threads[thread_count].thread.get_id(), thread_count);
+		}
+	}
+#endif
+
+	_post_tasks(&task, 1, p_high_priority, lock, p_pump_task);
 
 	return id;
 }
 
-WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
-	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
+WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) {
+	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, p_pump_task);
+}
+
+WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind(const Callable &p_action, bool p_high_priority, const String &p_description) {
+	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, false);
 }
 
 bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
@@ -510,7 +548,12 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 
 			if (p_caller_pool_thread->pool->task_queue.first()) {
 				task_to_process = task_queue.first()->self();
-				task_queue.remove(task_queue.first());
+				if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true) && task_to_process->is_pump_task) {
+					task_to_process = nullptr;
+					_notify_threads(p_caller_pool_thread, 1, 0);
+				} else {
+					task_queue.remove(task_queue.first());
+				}
 			}
 
 			if (!task_to_process) {
@@ -661,7 +704,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
 
 	groups[id] = group;
 
-	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
+	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock, false);
 
 	return id;
 }
@@ -788,6 +831,11 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
 
 	print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads));
 
+#ifdef THREADS_ENABLED
+	// Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks.
+	// We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed.
+	threads.reserve(5);
+#endif
 	threads.resize(p_thread_count);
 
 	Thread::Settings settings;
@@ -862,7 +910,7 @@ void WorkerThreadPool::finish() {
 }
 
 void WorkerThreadPool::_bind_methods() {
-	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
+	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task_bind, DEFVAL(false), DEFVAL(String()));
 	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
 	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);
 	ClassDB::bind_method(D_METHOD("get_caller_task_id"), &WorkerThreadPool::get_caller_task_id);
@@ -80,6 +80,7 @@ private:
 		Semaphore done_semaphore; // For user threads awaiting.
 		bool completed : 1;
 		bool pending_notify_yield_over : 1;
+		bool is_pump_task : 1;
 		Group *group = nullptr;
 		SelfList<Task> task_elem;
 		uint32_t waiting_pool = 0;
@@ -92,6 +93,7 @@ private:
 		Task() :
 				completed(false),
 				pending_notify_yield_over(false),
+				is_pump_task(false),
 				task_elem(this) {}
 	};
 
@@ -115,6 +117,7 @@ private:
 		bool yield_is_over : 1;
 		bool pre_exited_languages : 1;
 		bool exited_languages : 1;
+		bool has_pump_task : 1; // Threads can only have one pump task.
 		Task *current_task = nullptr;
 		Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
 		ConditionVariable cond_var;
@@ -124,7 +127,8 @@ private:
 				signaled(false),
 				yield_is_over(false),
 				pre_exited_languages(false),
-				exited_languages(false) {}
+				exited_languages(false),
+				has_pump_task(false) {}
 	};
 
 	TightLocalVector<ThreadData> threads;
@@ -165,6 +169,7 @@ private:
 	uint32_t notify_index = 0; // For rotating across threads, no help distributing load.
 
 	uint64_t last_task = 1;
+	int pump_task_count = 0;
 
 	static HashMap<StringName, WorkerThreadPool *> named_pools;
 
@@ -172,7 +177,7 @@ private:
 
 	void _process_task(Task *task);
 
-	void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
+	void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task);
 	void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
 
 	bool _try_promote_low_priority_task();
@@ -188,7 +193,7 @@ private:
 	static thread_local UnlockableLocks unlockable_locks[MAX_UNLOCKABLE_LOCKS];
 #endif
 
-	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
+	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task = false);
 	GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
 
 	template <typename C, typename M, typename U>
@@ -237,7 +242,8 @@ public:
 		return _add_task(Callable(), nullptr, nullptr, ud, p_high_priority, p_description);
 	}
 	TaskID add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority = false, const String &p_description = String());
-	TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
+	TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String(), bool p_pump_task = false);
+	TaskID add_task_bind(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
 
 	bool is_task_completed(TaskID p_task_id) const;
 	Error wait_for_task_completion(TaskID p_task_id);
@@ -223,7 +223,7 @@ void BetsyCompressor::_init() {
 }
 
 void BetsyCompressor::init() {
-	WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true);
+	WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true, "Betsy pump task", true);
 	command_queue.set_pump_task_id(tid);
 	command_queue.push(this, &BetsyCompressor::_assign_mt_ids, tid);
 	command_queue.push_and_sync(this, &BetsyCompressor::_init);
@@ -75,7 +75,7 @@ void PhysicsServer2DWrapMT::end_sync() {
 
 void PhysicsServer2DWrapMT::init() {
	if (create_thread) {
-		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true);
+		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true, "Physics server 2D pump task", true);
 		command_queue.set_pump_task_id(tid);
 		command_queue.push(this, &PhysicsServer2DWrapMT::_assign_mt_ids, tid);
 		command_queue.push_and_sync(physics_server_2d, &PhysicsServer2D::init);
@@ -75,7 +75,7 @@ void PhysicsServer3DWrapMT::end_sync() {
 
 void PhysicsServer3DWrapMT::init() {
	if (create_thread) {
-		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true);
+		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true, "Physics server 3D pump task", true);
 		command_queue.set_pump_task_id(tid);
 		command_queue.push(this, &PhysicsServer3DWrapMT::_assign_mt_ids, tid);
 		command_queue.push_and_sync(physics_server_3d, &PhysicsServer3D::init);
@@ -253,7 +253,7 @@ void RenderingServerDefault::init() {
	if (create_thread) {
 		print_verbose("RenderingServerWrapMT: Starting render thread");
 		DisplayServer::get_singleton()->release_rendering_thread();
-		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true);
+		WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true, "Rendering Server pump task", true);
 		command_queue.set_pump_task_id(tid);
 		command_queue.push(this, &RenderingServerDefault::_assign_mt_ids, tid);
 		command_queue.push_and_sync(this, &RenderingServerDefault::_init);