This commit is contained in:
blueloveTH 2025-11-24 23:13:54 +08:00
parent fa878636bb
commit 558eb10854
2 changed files with 30 additions and 60 deletions

View File

@ -44,12 +44,11 @@ void c11_cond__broadcast(c11_cond_t* cond);
typedef void (*c11_thrdpool_func_t)(void* arg); typedef void (*c11_thrdpool_func_t)(void* arg);
typedef struct c11_thrdpool_tasks { typedef struct c11_thrdpool_tasks {
atomic_int sync_val;
c11_thrdpool_func_t func; c11_thrdpool_func_t func;
void** args; void** args;
int length; int length;
int sync_val;
atomic_int current_index; atomic_int current_index;
atomic_int completed_count; atomic_int completed_count;
} c11_thrdpool_tasks; } c11_thrdpool_tasks;
@ -64,10 +63,9 @@ typedef struct c11_thrdpool_worker {
typedef struct c11_thrdpool { typedef struct c11_thrdpool {
int length; int length;
c11_thrdpool_worker* workers; c11_thrdpool_worker* workers;
atomic_bool is_busy;
c11_mutex_t workers_mutex[2]; c11_mutex_t workers_mutex;
c11_cond_t workers_cond[2]; c11_cond_t workers_cond;
c11_thrdpool_tasks tasks; c11_thrdpool_tasks tasks;
} c11_thrdpool; } c11_thrdpool;

View File

@ -71,29 +71,22 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
#endif #endif
static bool _thrdpool_worker_sync(c11_mutex_t* p_mutex,
c11_cond_t* p_cond,
int* p_sync_val,
int expected_sync_val) {
int index = (int)expected_sync_val;
c11_mutex__lock(p_mutex + index);
while(true) {
c11_cond__wait(p_cond + index, p_mutex + index);
if(*p_sync_val == -1) return false;
if(*p_sync_val == expected_sync_val) break;
}
c11_mutex__unlock(p_mutex + index);
return true;
}
static c11_thrd_retval_t _thrdpool_worker(void* arg) { static c11_thrd_retval_t _thrdpool_worker(void* arg) {
c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
c11_thrdpool_tasks* p_tasks = p_worker->p_tasks; c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
while(true) { while(true) {
if(!_thrdpool_worker_sync(p_worker->p_mutex, p_worker->p_cond, &p_tasks->sync_val, 0)) { c11_mutex__lock(p_worker->p_mutex);
break; while(true) {
c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
int sync_val = atomic_load(&p_tasks->sync_val);
if(sync_val == -1) {
c11_mutex__unlock(p_worker->p_mutex);
return 0; // force kill
} }
if(sync_val == 1) break;
}
c11_mutex__unlock(p_worker->p_mutex);
// execute tasks // execute tasks
while(true) { while(true) {
@ -106,10 +99,6 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
break; break;
} }
} }
if(!_thrdpool_worker_sync(p_worker->p_mutex, p_worker->p_cond, &p_tasks->sync_val, 1)) {
break;
}
} }
return 0; return 0;
} }
@ -117,17 +106,16 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
pool->length = length; pool->length = length;
pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
atomic_store(&pool->is_busy, false);
c11_mutex__ctor(&pool->workers_mutex[0]); c11_mutex__ctor(&pool->workers_mutex);
c11_mutex__ctor(&pool->workers_mutex[1]); c11_cond__ctor(&pool->workers_cond);
c11_cond__ctor(&pool->workers_cond[0]);
c11_cond__ctor(&pool->workers_cond[1]); atomic_store(&pool->tasks.sync_val, 0);
for(int i = 0; i < length; i++) { for(int i = 0; i < length; i++) {
c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_thrdpool_worker* p_worker = &pool->workers[i];
p_worker->p_mutex = pool->workers_mutex; p_worker->p_mutex = &pool->workers_mutex;
p_worker->p_cond = pool->workers_cond; p_worker->p_cond = &pool->workers_cond;
p_worker->p_tasks = &pool->tasks; p_worker->p_tasks = &pool->tasks;
bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker); bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
c11__rtassert(ok); c11__rtassert(ok);
@ -135,14 +123,8 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
} }
void c11_thrdpool__dtor(c11_thrdpool* pool) { void c11_thrdpool__dtor(c11_thrdpool* pool) {
c11_mutex__lock(&pool->workers_mutex[0]); atomic_store(&pool->tasks.sync_val, -1);
c11_mutex__lock(&pool->workers_mutex[1]); c11_cond__broadcast(&pool->workers_cond);
pool->tasks.sync_val = -1;
c11_mutex__unlock(&pool->workers_mutex[1]);
c11_mutex__unlock(&pool->workers_mutex[0]);
c11_cond__broadcast(&pool->workers_cond[0]);
c11_cond__broadcast(&pool->workers_cond[1]);
for(int i = 0; i < pool->length; i++) { for(int i = 0; i < pool->length; i++) {
c11_thrdpool_worker* p_worker = &pool->workers[i]; c11_thrdpool_worker* p_worker = &pool->workers[i];
@ -150,40 +132,30 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) {
} }
PK_FREE(pool->workers); PK_FREE(pool->workers);
c11_mutex__dtor(&pool->workers_mutex[0]); c11_mutex__dtor(&pool->workers_mutex);
c11_mutex__dtor(&pool->workers_mutex[1]); c11_cond__dtor(&pool->workers_cond);
c11_cond__dtor(&pool->workers_cond[0]);
c11_cond__dtor(&pool->workers_cond[1]);
} }
void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) { void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
if(num_tasks == 0) return; if(num_tasks == 0) return;
bool expected = false; while(atomic_load(&pool->tasks.sync_val) != 0) {
while(!atomic_compare_exchange_weak(&pool->is_busy, &expected, true)) {
expected = false;
c11_thrd__yield(); c11_thrd__yield();
} }
// assign tasks // assign tasks
c11_mutex__lock(&pool->workers_mutex[0]); c11_mutex__lock(&pool->workers_mutex);
pool->tasks.func = func; pool->tasks.func = func;
pool->tasks.args = args; pool->tasks.args = args;
pool->tasks.length = num_tasks; pool->tasks.length = num_tasks;
pool->tasks.sync_val = 0; atomic_store(&pool->tasks.sync_val, 1);
atomic_store(&pool->tasks.current_index, 0); atomic_store(&pool->tasks.current_index, 0);
atomic_store(&pool->tasks.completed_count, 0); atomic_store(&pool->tasks.completed_count, 0);
c11_cond__broadcast(&pool->workers_cond[0]); c11_cond__broadcast(&pool->workers_cond);
c11_mutex__unlock(&pool->workers_mutex[0]); c11_mutex__unlock(&pool->workers_mutex);
// wait for complete // wait for complete
while(atomic_load(&pool->tasks.completed_count) < num_tasks) { while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
c11_thrd__yield(); c11_thrd__yield();
} }
// notify workers to proceed atomic_store(&pool->tasks.sync_val, 0);
c11_mutex__lock(&pool->workers_mutex[1]);
pool->tasks.sync_val = 1;
c11_cond__broadcast(&pool->workers_cond[1]);
c11_mutex__unlock(&pool->workers_mutex[1]);
// mark as not busy
atomic_store(&pool->is_busy, false);
} }
#endif // PK_ENABLE_THREADS #endif // PK_ENABLE_THREADS