diff --git a/include/pocketpy/common/threads.h b/include/pocketpy/common/threads.h index 8004311c..607cb3b3 100644 --- a/include/pocketpy/common/threads.h +++ b/include/pocketpy/common/threads.h @@ -40,26 +40,37 @@ void c11_cond__ctor(c11_cond_t* cond); void c11_cond__dtor(c11_cond_t* cond); void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex); void c11_cond__signal(c11_cond_t* cond); +void c11_cond__broadcast(c11_cond_t* cond); + +typedef void (*c11_thrdpool_func_t)(void* arg); + +typedef struct c11_thrdpool_tasks { + c11_thrdpool_func_t func; + void** args; + int length; + atomic_int current_index; + atomic_int completed_count; +} c11_thrdpool_tasks; typedef struct c11_thrdpool_worker { - c11_thrd_t thread; c11_mutex_t mutex; - c11_cond_t cond; - - c11_thrd_func_t func; - void* arg; - + c11_cond_t* p_cond; + c11_thrdpool_tasks* tasks; bool should_exit; + + c11_thrd_t thread; } c11_thrdpool_worker; typedef struct c11_thrdpool { int length; c11_thrdpool_worker* workers; - c11_thrd_t main_thread; + c11_thrdpool_tasks tasks; + atomic_bool is_busy; + c11_cond_t workers_cond; } c11_thrdpool; void c11_thrdpool__ctor(c11_thrdpool* pool, int length); void c11_thrdpool__dtor(c11_thrdpool* pool); -bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg); +void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks); #endif \ No newline at end of file diff --git a/src/common/threads.c b/src/common/threads.c index 28cc1a4c..ea033778 100644 --- a/src/common/threads.c +++ b/src/common/threads.c @@ -34,6 +34,8 @@ void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { pthread_cond_wait(co void c11_cond__signal(c11_cond_t* cond) { pthread_cond_signal(cond); } +void c11_cond__broadcast(c11_cond_t* cond) { pthread_cond_broadcast(cond); } + #else bool c11_thrd__create(c11_thrd_t* thrd, c11_thrd_func_t func, void* arg) { @@ -65,28 +67,35 @@ void c11_cond__wait(c11_cond_t* cond, c11_mutex_t* mutex) { cnd_wait(cond, mutex void c11_cond__signal(c11_cond_t* cond) { cnd_signal(cond); } +void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); } + #endif static c11_thrd_retval_t _thrdpool_worker(void* arg) { c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; - while(true) { - c11_mutex__lock(&p_worker->mutex); - while(p_worker->func == NULL && !p_worker->should_exit) { - c11_cond__wait(&p_worker->cond, &p_worker->mutex); - } + c11_thrdpool_tasks* tasks = p_worker->tasks; + while(true) { + // wait for tasks + c11_mutex__lock(&p_worker->mutex); + c11_cond__wait(p_worker->p_cond, &p_worker->mutex); if(p_worker->should_exit) { c11_mutex__unlock(&p_worker->mutex); break; } - - c11_thrd_func_t func = p_worker->func; - void* arg = p_worker->arg; - p_worker->func = NULL; - p_worker->arg = NULL; c11_mutex__unlock(&p_worker->mutex); - func(arg); + // execute tasks + while(true) { + int arg_index = atomic_fetch_add(&tasks->current_index, 1); + if(arg_index < tasks->length) { + void* arg = tasks->args[arg_index]; + tasks->func(arg); + atomic_fetch_add(&tasks->completed_count, 1); + } else { + break; + } + } } return 0; } @@ -94,14 +103,15 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) { void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { pool->length = length; pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); - pool->main_thread = c11_thrd__current(); + atomic_store(&pool->is_busy, false); + c11_cond__ctor(&pool->workers_cond); + for(int i = 0; i < length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_mutex__ctor(&p_worker->mutex); - c11_cond__ctor(&p_worker->cond); - p_worker->func = NULL; - p_worker->arg = NULL; + p_worker->p_cond = &pool->workers_cond; + p_worker->tasks = &pool->tasks; p_worker->should_exit = false; bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker); @@ -114,39 +124,40 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) { c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_mutex__lock(&p_worker->mutex); p_worker->should_exit = true; - c11_cond__signal(&p_worker->cond); c11_mutex__unlock(&p_worker->mutex); } + c11_cond__broadcast(&pool->workers_cond); for(int i = 0; i < pool->length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_thrd__join(p_worker->thread); c11_mutex__dtor(&p_worker->mutex); - c11_cond__dtor(&p_worker->cond); } PK_FREE(pool->workers); + c11_cond__dtor(&pool->workers_cond); } -bool c11_thrdpool__create(c11_thrdpool* pool, c11_thrd_func_t func, void* arg) { - // must be called from the main thread - c11_thrd_t curr_thread = c11_thrd__current(); - c11__rtassert(c11_thrd__equal(curr_thread, pool->main_thread)); - - // find the 1st idle worker - for(int i = 0; i < pool->length; i++) { - c11_thrdpool_worker* p_worker = &pool->workers[i]; - c11_mutex__lock(&p_worker->mutex); - if(p_worker->func == NULL) { - p_worker->func = func; - p_worker->arg = arg; - c11_cond__signal(&p_worker->cond); - c11_mutex__unlock(&p_worker->mutex); - return true; // Task assigned - } - c11_mutex__unlock(&p_worker->mutex); +void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) { + if(num_tasks == 0) return; + bool expected = false; + while(!atomic_compare_exchange_weak(&pool->is_busy, &expected, true)) { + expected = false; + c11_thrd__yield(); } - return false; // no idle worker found + // assign tasks + pool->tasks.func = func; + pool->tasks.args = args; + pool->tasks.length = num_tasks; + atomic_store(&pool->tasks.current_index, 0); + atomic_store(&pool->tasks.completed_count, 0); + // wake up all workers + c11_cond__broadcast(&pool->workers_cond); + // wait for complete + while(atomic_load(&pool->tasks.completed_count) < num_tasks) { + c11_thrd__yield(); + } + atomic_store(&pool->is_busy, false); } #endif // PK_ENABLE_THREADS \ No newline at end of file