From 10c81469c0e99a6c2293265eca0bf3b89ea39e43 Mon Sep 17 00:00:00 2001 From: blueloveTH Date: Mon, 24 Nov 2025 15:09:02 +0800 Subject: [PATCH] add threads pool impl --- include/pocketpy/common/threads.h | 44 +++++++++- src/common/name.c | 2 +- src/common/threads.c | 135 +++++++++++++++++++++++++++++- src/modules/pkpy.c | 8 +- 4 files changed, 178 insertions(+), 11 deletions(-) diff --git a/include/pocketpy/common/threads.h b/include/pocketpy/common/threads.h index d1d1fa51..8004311c 100644 --- a/include/pocketpy/common/threads.h +++ b/include/pocketpy/common/threads.h @@ -12,14 +12,54 @@ #define PK_USE_PTHREADS 1 typedef pthread_t c11_thrd_t; typedef void* c11_thrd_retval_t; +typedef pthread_mutex_t c11_mutex_t; +typedef pthread_cond_t c11_cond_t; #else #include #define PK_USE_PTHREADS 0 typedef thrd_t c11_thrd_t; typedef int c11_thrd_retval_t; +typedef mtx_t c11_mutex_t; +typedef cnd_t c11_cond_t; #endif -bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg); -void c11_thrd_yield(); +typedef c11_thrd_retval_t (*c11_thrd_func_t)(void*); + +bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg); +void c11_thrd__yield(); +void c11_thrd__join(c11_thrd_t thrd); +c11_thrd_t c11_thrd__current(); +bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b); + +void c11_mutex__ctor(c11_mutex_t* mutex); +void c11_mutex__dtor(c11_mutex_t* mutex); +void c11_mutex__lock(c11_mutex_t* mutex); +void c11_mutex__unlock(c11_mutex_t* mutex); + +void c11_cond__ctor(c11_cond_t* cond); +void c11_cond__dtor(c11_cond_t* cond); +void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex); +void c11_cond__signal(c11_cond_t* cond); + +typedef struct c11_thrdpool_worker { + c11_thrd_t thread; + c11_mutex_t mutex; + c11_cond_t cond; + + c11_thrd_func_t func; + void* arg; + + bool should_exit; +} c11_thrdpool_worker; + +typedef struct c11_thrdpool { + int length; + c11_thrdpool_worker* workers; + c11_thrd_t main_thread; +} c11_thrdpool; + +void c11_thrdpool__ctor(c11_thrdpool* pool, int length); +void c11_thrdpool__dtor(c11_thrdpool* pool); +bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg); #endif \ No newline at end of file diff --git a/src/common/name.c b/src/common/name.c index 46d0a863..1973d8d2 100644 --- a/src/common/name.c +++ b/src/common/name.c @@ -47,7 +47,7 @@ void pk_names_finalize() { py_Name py_namev(c11_sv name) { #if PK_ENABLE_THREADS while(atomic_flag_test_and_set(&pk_string_table.lock)) { - c11_thrd_yield(); + c11_thrd__yield(); } #endif uint64_t hash = c11_sv__hash(name); diff --git a/src/common/threads.c b/src/common/threads.c index a2bb62cc..28cc1a4c 100644 --- a/src/common/threads.c +++ b/src/common/threads.c @@ -1,25 +1,152 @@ #include "pocketpy/common/threads.h" +#include "pocketpy/common/utils.h" #if PK_ENABLE_THREADS #if PK_USE_PTHREADS -bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg) { +bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) { int res = pthread_create(thrd, NULL, func, arg); return res == 0; } -void c11_thrd_yield() { sched_yield(); } +void c11_thrd__yield() { sched_yield(); } + +void c11_thrd__join(c11_thrd_t thrd) { pthread_join(thrd, NULL); } + +c11_thrd_t c11_thrd__current() { return pthread_self(); } + +bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return pthread_equal(a, b); } + +void c11_mutex__ctor(c11_mutex_t* mutex) { pthread_mutex_init(mutex, NULL); } + +void c11_mutex__dtor(c11_mutex_t* mutex) { pthread_mutex_destroy(mutex); } + +void c11_mutex__lock(c11_mutex_t* mutex) { pthread_mutex_lock(mutex); } + +void c11_mutex__unlock(c11_mutex_t* mutex) { pthread_mutex_unlock(mutex); } + +void c11_cond__ctor(c11_cond_t* cond) { pthread_cond_init(cond, NULL); } + +void c11_cond__dtor(c11_cond_t* cond) { pthread_cond_destroy(cond); } + +void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { pthread_cond_wait(cond, mutex); } + +void c11_cond__signal(c11_cond_t* cond) { pthread_cond_signal(cond); } #else -bool c11_thrd_create(c11_thrd_t* thrd, c11_thrd_retval_t (*func)(void*), void* arg) { +bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) { int res = thrd_create(thrd, func, arg); return res == thrd_success; } -void c11_thrd_yield() { thrd_yield(); } +void c11_thrd__yield() { thrd_yield(); } + +void c11_thrd__join(c11_thrd_t thrd) { thrd_join(thrd, NULL); } + +c11_thrd_t c11_thrd__current() { return thrd_current(); } + +bool c11_thrd__equal(c11_thrd_t a, c11_thrd_t b) { return thrd_equal(a, b); } + +void c11_mutex__ctor(c11_mutex_t* mutex) { mtx_init(mutex, mtx_plain); } + +void c11_mutex__dtor(c11_mutex_t* mutex) { mtx_destroy(mutex); } + +void c11_mutex__lock(c11_mutex_t* mutex) { mtx_lock(mutex); } + +void c11_mutex__unlock(c11_mutex_t* mutex) { mtx_unlock(mutex); } + +void c11_cond__ctor(c11_cond_t* cond) { cnd_init(cond); } + +void c11_cond__dtor(c11_cond_t* cond) { cnd_destroy(cond); } + +void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { cnd_wait(cond, mutex); } + +void c11_cond__signal(c11_cond_t* cond) { cnd_signal(cond); } #endif +static c11_thrd_retval_t _thrdpool_worker(void* arg) { + c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; + while(true) { + c11_mutex__lock(&p_worker->mutex); + while(p_worker->func == NULL && !p_worker->should_exit) { + c11_cond__wait(&p_worker->cond, &p_worker->mutex); + } + + if(p_worker->should_exit) { + c11_mutex__unlock(&p_worker->mutex); + break; + } + + c11_thrd_func_t func = p_worker->func; + void* arg = p_worker->arg; + p_worker->func = NULL; + p_worker->arg = NULL; + c11_mutex__unlock(&p_worker->mutex); + + func(arg); + } + return 0; +} + +void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { + pool->length = length; + pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); + pool->main_thread = c11_thrd__current(); + for(int i = 0; i < length; i++) { + c11_thrdpool_worker* p_worker = &pool->workers[i]; + + c11_mutex__ctor(&p_worker->mutex); + c11_cond__ctor(&p_worker->cond); + p_worker->func = NULL; + p_worker->arg = NULL; + p_worker->should_exit = false; + + bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker); + c11__rtassert(ok); + } +} + +void c11_thrdpool__dtor(c11_thrdpool* pool) { + for(int i = 0; i < pool->length; i++) { + c11_thrdpool_worker* p_worker = &pool->workers[i]; + c11_mutex__lock(&p_worker->mutex); + p_worker->should_exit = true; + c11_cond__signal(&p_worker->cond); + c11_mutex__unlock(&p_worker->mutex); + } + + for(int i = 0; i < pool->length; i++) { + c11_thrdpool_worker* p_worker = &pool->workers[i]; + c11_thrd__join(p_worker->thread); + c11_mutex__dtor(&p_worker->mutex); + c11_cond__dtor(&p_worker->cond); + } + + PK_FREE(pool->workers); +} + +bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg) { + // must be called from the main thread + c11_thrd_t curr_thread = c11_thrd__current(); + c11__rtassert(c11_thrd__equal(curr_thread, pool->main_thread)); + + // find the 1st idle worker + for(int i = 0; i < pool->length; i++) { + c11_thrdpool_worker* p_worker = &pool->workers[i]; + c11_mutex__lock(&p_worker->mutex); + if(p_worker->func == NULL) { + p_worker->func = func; + p_worker->arg = arg; + c11_cond__signal(&p_worker->cond); + c11_mutex__unlock(&p_worker->mutex); + return true; // Task assigned + } + c11_mutex__unlock(&p_worker->mutex); + } + return false; // no idle worker found +} + #endif // PK_ENABLE_THREADS \ No newline at end of file diff --git a/src/modules/pkpy.c b/src/modules/pkpy.c index 0e8116fb..29d2f1e9 100644 --- a/src/modules/pkpy.c +++ b/src/modules/pkpy.c @@ -212,7 +212,7 @@ static bool ComputeThread_wait_for_done(int argc, py_Ref argv) { PY_CHECK_ARGC(1); c11_ComputeThread* self = py_touserdata(argv); while(!atomic_load(&self->is_done)) { - c11_thrd_yield(); + c11_thrd__yield(); } py_newnone(py_retval()); return true; @@ -308,7 +308,7 @@ static bool ComputeThread_submit_exec(int argc, py_Ref argv) { c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor); /**************************/ atomic_store(&self->is_done, false); - bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_exec, job); + bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_exec, job); if(!ok) { atomic_store(&self->is_done, true); return OSError("thrd_create() failed"); @@ -331,7 +331,7 @@ static bool ComputeThread_submit_eval(int argc, py_Ref argv) { c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor); /**************************/ atomic_store(&self->is_done, false); - bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_exec, job); + bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_exec, job); if(!ok) { atomic_store(&self->is_done, true); return OSError("thrd_create() failed"); @@ -368,7 +368,7 @@ static bool ComputeThread_submit_call(int argc, py_Ref argv) { c11_ComputeThread__reset_job(self, job, ComputeThreadJobCall__dtor); /**************************/ atomic_store(&self->is_done, false); - bool ok = c11_thrd_create(&self->thread, ComputeThreadJob_call, job); + bool ok = c11_thrd__create(&self->thread, ComputeThreadJob_call, job); if(!ok) { atomic_store(&self->is_done, true); return OSError("thrd_create() failed");