From 4fc18c921d788500c57d7beb473e11b23260b942 Mon Sep 17 00:00:00 2001 From: blueloveTH Date: Mon, 24 Nov 2025 19:14:34 +0800 Subject: [PATCH] [no ci] backup --- include/pocketpy/common/threads.h | 10 ++++---- src/common/threads.c | 40 ++++++++++++++++++------------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/include/pocketpy/common/threads.h b/include/pocketpy/common/threads.h index 607cb3b3..a84731bd 100644 --- a/include/pocketpy/common/threads.h +++ b/include/pocketpy/common/threads.h @@ -53,20 +53,22 @@ typedef struct c11_thrdpool_tasks { } c11_thrdpool_tasks; typedef struct c11_thrdpool_worker { - c11_mutex_t mutex; + c11_mutex_t* p_mutex; c11_cond_t* p_cond; - c11_thrdpool_tasks* tasks; - bool should_exit; + c11_thrdpool_tasks* p_tasks; + bool should_exit; c11_thrd_t thread; } c11_thrdpool_worker; typedef struct c11_thrdpool { int length; c11_thrdpool_worker* workers; - c11_thrdpool_tasks tasks; atomic_bool is_busy; + + c11_mutex_t workers_mutex; c11_cond_t workers_cond; + c11_thrdpool_tasks tasks; } c11_thrdpool; void c11_thrdpool__ctor(c11_thrdpool* pool, int length); diff --git a/src/common/threads.c b/src/common/threads.c index ea033778..95bfc4c5 100644 --- a/src/common/threads.c +++ b/src/common/threads.c @@ -73,26 +73,32 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); } static c11_thrd_retval_t _thrdpool_worker(void* arg) { c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; - c11_thrdpool_tasks* tasks = p_worker->tasks; while(true) { // wait for tasks - c11_mutex__lock(&p_worker->mutex); - c11_cond__wait(p_worker->p_cond, &p_worker->mutex); + c11_mutex__lock(p_worker->p_mutex); + while(!p_worker->p_tasks && !p_worker->should_exit) { + c11_cond__wait(p_worker->p_cond, p_worker->p_mutex); + } if(p_worker->should_exit) { - c11_mutex__unlock(&p_worker->mutex); + c11_mutex__unlock(p_worker->p_mutex); break; } - c11_mutex__unlock(&p_worker->mutex); + + c11_thrdpool_tasks* p_tasks = p_worker->p_tasks; + c11_mutex__unlock(p_worker->p_mutex); // execute tasks while(true) { - int arg_index = atomic_fetch_add(&tasks->current_index, 1); - if(arg_index < tasks->length) { - void* arg = tasks->args[arg_index]; - tasks->func(arg); - atomic_fetch_add(&tasks->completed_count, 1); + int arg_index = atomic_fetch_add(&p_tasks->current_index, 1); + if(arg_index < p_tasks->length) { + void* arg = p_tasks->args[arg_index]; + p_tasks->func(arg); + atomic_fetch_add(&p_tasks->completed_count, 1); } else { + c11_mutex__lock(p_worker->p_mutex); + p_worker->p_tasks = NULL; + c11_mutex__unlock(p_worker->p_mutex); break; } } @@ -104,14 +110,16 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { pool->length = length; pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); atomic_store(&pool->is_busy, false); + + c11_mutex__ctor(&pool->workers_mutex); c11_cond__ctor(&pool->workers_cond); for(int i = 0; i < length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; - c11_mutex__ctor(&p_worker->mutex); + p_worker->p_mutex = &pool->workers_mutex; p_worker->p_cond = &pool->workers_cond; - p_worker->tasks = &pool->tasks; + p_worker->p_tasks = &pool->tasks; p_worker->should_exit = false; bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker); @@ -122,19 +130,17 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { void c11_thrdpool__dtor(c11_thrdpool* pool) { for(int i = 0; i < pool->length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; - c11_mutex__lock(&p_worker->mutex); - p_worker->should_exit = true; - c11_mutex__unlock(&p_worker->mutex); + atomic_store(&p_worker->should_exit, true); } c11_cond__broadcast(&pool->workers_cond); for(int i = 0; i < pool->length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_thrd__join(p_worker->thread); - c11_mutex__dtor(&p_worker->mutex); } PK_FREE(pool->workers); + c11_mutex__dtor(&pool->workers_mutex); c11_cond__dtor(&pool->workers_cond); } @@ -146,6 +152,7 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args c11_thrd__yield(); } // assign tasks + c11_mutex__lock(&pool->workers_mutex); pool->tasks.func = func; pool->tasks.args = args; pool->tasks.length = num_tasks; @@ -153,6 +160,7 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args atomic_store(&pool->tasks.completed_count, 0); // wake up all workers c11_cond__broadcast(&pool->workers_cond); + c11_mutex__unlock(&pool->workers_mutex); // wait for complete while(atomic_load(&pool->tasks.completed_count) < num_tasks) { c11_thrd__yield();