diff --git a/include/pocketpy/common/threads.h b/include/pocketpy/common/threads.h index 4be7458b..de988bf4 100644 --- a/include/pocketpy/common/threads.h +++ b/include/pocketpy/common/threads.h @@ -44,12 +44,11 @@ void c11_cond__broadcast(c11_cond_t* cond); typedef void (*c11_thrdpool_func_t)(void* arg); - typedef struct c11_thrdpool_tasks { + atomic_int sync_val; c11_thrdpool_func_t func; void** args; int length; - int sync_val; atomic_int current_index; atomic_int completed_count; } c11_thrdpool_tasks; @@ -64,10 +63,9 @@ typedef struct c11_thrdpool_worker { typedef struct c11_thrdpool { int length; c11_thrdpool_worker* workers; - atomic_bool is_busy; - - c11_mutex_t workers_mutex[2]; - c11_cond_t workers_cond[2]; + + c11_mutex_t workers_mutex; + c11_cond_t workers_cond; c11_thrdpool_tasks tasks; } c11_thrdpool; diff --git a/src/common/threads.c b/src/common/threads.c index 2b1e2a73..e76b8362 100644 --- a/src/common/threads.c +++ b/src/common/threads.c @@ -71,29 +71,22 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); } #endif -static bool _thrdpool_worker_sync(c11_mutex_t* p_mutex, - c11_cond_t* p_cond, - int* p_sync_val, - int expected_sync_val) { - int index = (int)expected_sync_val; - c11_mutex__lock(p_mutex + index); - while(true) { - c11_cond__wait(p_cond + index, p_mutex + index); - if(*p_sync_val == -1) return false; - if(*p_sync_val == expected_sync_val) break; - } - c11_mutex__unlock(p_mutex + index); - return true; -} - static c11_thrd_retval_t _thrdpool_worker(void* arg) { c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; c11_thrdpool_tasks* p_tasks = p_worker->p_tasks; while(true) { - if(!_thrdpool_worker_sync(p_worker->p_mutex, p_worker->p_cond, &p_tasks->sync_val, 0)) { - break; + c11_mutex__lock(p_worker->p_mutex); + while(true) { + c11_cond__wait(p_worker->p_cond, p_worker->p_mutex); + int sync_val = atomic_load(&p_tasks->sync_val); + if(sync_val == -1) { + c11_mutex__unlock(p_worker->p_mutex); + return 0; // force kill + } + if(sync_val == 1) break; } + c11_mutex__unlock(p_worker->p_mutex); // execute tasks while(true) { @@ -106,10 +99,6 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) { break; } } - - if(!_thrdpool_worker_sync(p_worker->p_mutex, p_worker->p_cond, &p_tasks->sync_val, 1)) { - break; - } } return 0; } @@ -117,17 +106,16 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) { void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { pool->length = length; pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); - atomic_store(&pool->is_busy, false); - c11_mutex__ctor(&pool->workers_mutex[0]); - c11_mutex__ctor(&pool->workers_mutex[1]); - c11_cond__ctor(&pool->workers_cond[0]); - c11_cond__ctor(&pool->workers_cond[1]); + c11_mutex__ctor(&pool->workers_mutex); + c11_cond__ctor(&pool->workers_cond); + + atomic_store(&pool->tasks.sync_val, 0); for(int i = 0; i < length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; - p_worker->p_mutex = pool->workers_mutex; - p_worker->p_cond = pool->workers_cond; + p_worker->p_mutex = &pool->workers_mutex; + p_worker->p_cond = &pool->workers_cond; p_worker->p_tasks = &pool->tasks; bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker); c11__rtassert(ok); @@ -135,14 +123,8 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { } void c11_thrdpool__dtor(c11_thrdpool* pool) { - c11_mutex__lock(&pool->workers_mutex[0]); - c11_mutex__lock(&pool->workers_mutex[1]); - pool->tasks.sync_val = -1; - c11_mutex__unlock(&pool->workers_mutex[1]); - c11_mutex__unlock(&pool->workers_mutex[0]); - - c11_cond__broadcast(&pool->workers_cond[0]); - c11_cond__broadcast(&pool->workers_cond[1]); + atomic_store(&pool->tasks.sync_val, -1); + c11_cond__broadcast(&pool->workers_cond); for(int i = 0; i < pool->length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; @@ -150,40 +132,30 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) { } PK_FREE(pool->workers); - c11_mutex__dtor(&pool->workers_mutex[0]); - c11_mutex__dtor(&pool->workers_mutex[1]); - c11_cond__dtor(&pool->workers_cond[0]); - c11_cond__dtor(&pool->workers_cond[1]); + c11_mutex__dtor(&pool->workers_mutex); + c11_cond__dtor(&pool->workers_cond); } void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) { if(num_tasks == 0) return; - bool expected = false; - while(!atomic_compare_exchange_weak(&pool->is_busy, &expected, true)) { - expected = false; + while(atomic_load(&pool->tasks.sync_val) != 0) { c11_thrd__yield(); } // assign tasks - c11_mutex__lock(&pool->workers_mutex[0]); + c11_mutex__lock(&pool->workers_mutex); pool->tasks.func = func; pool->tasks.args = args; pool->tasks.length = num_tasks; - pool->tasks.sync_val = 0; + atomic_store(&pool->tasks.sync_val, 1); atomic_store(&pool->tasks.current_index, 0); atomic_store(&pool->tasks.completed_count, 0); - c11_cond__broadcast(&pool->workers_cond[0]); - c11_mutex__unlock(&pool->workers_mutex[0]); + c11_cond__broadcast(&pool->workers_cond); + c11_mutex__unlock(&pool->workers_mutex); // wait for complete while(atomic_load(&pool->tasks.completed_count) < num_tasks) { c11_thrd__yield(); } - // notify workers to proceed - c11_mutex__lock(&pool->workers_mutex[1]); - pool->tasks.sync_val = 1; - c11_cond__broadcast(&pool->workers_cond[1]); - c11_mutex__unlock(&pool->workers_mutex[1]); - // mark as not busy - atomic_store(&pool->is_busy, false); + atomic_store(&pool->tasks.sync_val, 0); } #endif // PK_ENABLE_THREADS \ No newline at end of file