mirror of
https://github.com/pocketpy/pocketpy
synced 2025-12-08 03:00:16 +00:00
Compare commits
4 Commits
4fc18c921d
...
450f0263a2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
450f0263a2 | ||
|
|
8a4338faf9 | ||
|
|
558eb10854 | ||
|
|
fa878636bb |
19
build_g_threads.sh
Normal file
19
build_g_threads.sh
Normal file
@ -0,0 +1,19 @@
|
||||
set -e
|
||||
|
||||
# python prebuild.py
|
||||
|
||||
SRC=$(find src/ -name "*.c")
|
||||
|
||||
FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1"
|
||||
|
||||
SANITIZE_FLAGS="-fsanitize=undefined,thread -fno-sanitize=function"
|
||||
|
||||
if [ "$(uname)" == "Darwin" ]; then
|
||||
SANITIZE_FLAGS="-fsanitize=undefined,thread"
|
||||
fi
|
||||
|
||||
SRC2=${1:-src2/test_threads.c}
|
||||
|
||||
echo "Compiling C files..."
|
||||
clang $FLAGS $SANITIZE_FLAGS $SRC $SRC2 -o main
|
||||
|
||||
@ -45,6 +45,7 @@ void c11_cond__broadcast(c11_cond_t* cond);
|
||||
typedef void (*c11_thrdpool_func_t)(void* arg);
|
||||
|
||||
typedef struct c11_thrdpool_tasks {
|
||||
atomic_int sync_val;
|
||||
c11_thrdpool_func_t func;
|
||||
void** args;
|
||||
int length;
|
||||
@ -53,19 +54,20 @@ typedef struct c11_thrdpool_tasks {
|
||||
} c11_thrdpool_tasks;
|
||||
|
||||
typedef struct c11_thrdpool_worker {
|
||||
int index;
|
||||
atomic_int* p_ready_workers_num;
|
||||
c11_mutex_t* p_mutex;
|
||||
c11_cond_t* p_cond;
|
||||
c11_thrdpool_tasks* p_tasks;
|
||||
|
||||
bool should_exit;
|
||||
c11_thrd_t thread;
|
||||
} c11_thrdpool_worker;
|
||||
|
||||
typedef struct c11_thrdpool {
|
||||
int length;
|
||||
atomic_int ready_workers_num;
|
||||
|
||||
c11_thrdpool_worker* workers;
|
||||
atomic_bool is_busy;
|
||||
|
||||
|
||||
c11_mutex_t workers_mutex;
|
||||
c11_cond_t workers_cond;
|
||||
c11_thrdpool_tasks tasks;
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
#include "pocketpy/common/threads.h"
|
||||
#include "pocketpy/common/utils.h"
|
||||
#include <stdarg.h>
|
||||
|
||||
#if PK_ENABLE_THREADS
|
||||
|
||||
@ -71,23 +72,55 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); }
|
||||
|
||||
#endif
|
||||
|
||||
#define C11_THRDPOOL_DEBUG 0
|
||||
|
||||
#if C11_THRDPOOL_DEBUG
|
||||
int64_t time_ns();
|
||||
|
||||
static void c11_thrdpool_debug_log(int index, const char* format, ...) {
|
||||
va_list args;
|
||||
va_start(args, format);
|
||||
char buf[512];
|
||||
int n = sprintf(buf, "[%.6f - Worker %2d] ", time_ns() / 1e9, index);
|
||||
vsprintf(buf + n, format, args);
|
||||
printf("%s\n", buf);
|
||||
va_end(args);
|
||||
}
|
||||
#else
|
||||
#define c11_thrdpool_debug_log(...) \
|
||||
do { \
|
||||
} while(0)
|
||||
#endif
|
||||
|
||||
static c11_thrd_retval_t _thrdpool_worker(void* arg) {
|
||||
c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg;
|
||||
c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
|
||||
|
||||
while(true) {
|
||||
// wait for tasks
|
||||
c11_thrdpool_debug_log(p_worker->index, "Waiting for mutex lock...");
|
||||
c11_mutex__lock(p_worker->p_mutex);
|
||||
while(!p_worker->p_tasks && !p_worker->should_exit) {
|
||||
c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
|
||||
}
|
||||
if(p_worker->should_exit) {
|
||||
atomic_fetch_add(p_worker->p_ready_workers_num, 1);
|
||||
c11_thrdpool_debug_log(p_worker->index, "Mutex locked, checking for tasks...");
|
||||
if(atomic_load(&p_tasks->sync_val) == -1) {
|
||||
c11_mutex__unlock(p_worker->p_mutex);
|
||||
break;
|
||||
return 0; // force kill
|
||||
}
|
||||
while(true) {
|
||||
c11_cond__wait(p_worker->p_cond, p_worker->p_mutex);
|
||||
int sync_val = atomic_load(&p_tasks->sync_val);
|
||||
c11_thrdpool_debug_log(p_worker->index,
|
||||
"Woke up from condition variable, sync_val=%d",
|
||||
sync_val);
|
||||
if(sync_val == 1) break;
|
||||
if(sync_val == -1) {
|
||||
c11_mutex__unlock(p_worker->p_mutex);
|
||||
return 0; // force kill
|
||||
}
|
||||
}
|
||||
|
||||
c11_thrdpool_tasks* p_tasks = p_worker->p_tasks;
|
||||
c11_mutex__unlock(p_worker->p_mutex);
|
||||
atomic_fetch_sub(p_worker->p_ready_workers_num, 1);
|
||||
|
||||
c11_thrdpool_debug_log(p_worker->index, "Received tasks, starting execution...");
|
||||
// execute tasks
|
||||
while(true) {
|
||||
int arg_index = atomic_fetch_add(&p_tasks->current_index, 1);
|
||||
@ -96,76 +129,88 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) {
|
||||
p_tasks->func(arg);
|
||||
atomic_fetch_add(&p_tasks->completed_count, 1);
|
||||
} else {
|
||||
c11_mutex__lock(p_worker->p_mutex);
|
||||
p_worker->p_tasks = NULL;
|
||||
c11_mutex__unlock(p_worker->p_mutex);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
c11_thrdpool_debug_log(p_worker->index,
|
||||
"Execution complete, waiting for `sync_val` reset...");
|
||||
while(true) {
|
||||
int sync_val = atomic_load(&p_tasks->sync_val);
|
||||
if(sync_val == 0) break;
|
||||
if(sync_val == -1) return 0; // force kill
|
||||
c11_thrd__yield();
|
||||
}
|
||||
c11_thrdpool_debug_log(p_worker->index,
|
||||
"`sync_val` reset detected, waiting for next tasks...");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void c11_thrdpool__ctor(c11_thrdpool* pool, int length) {
|
||||
pool->length = length;
|
||||
atomic_store(&pool->ready_workers_num, 0);
|
||||
pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length);
|
||||
atomic_store(&pool->is_busy, false);
|
||||
|
||||
c11_mutex__ctor(&pool->workers_mutex);
|
||||
c11_cond__ctor(&pool->workers_cond);
|
||||
|
||||
atomic_store(&pool->tasks.sync_val, 0);
|
||||
|
||||
for(int i = 0; i < length; i++) {
|
||||
c11_thrdpool_worker* p_worker = &pool->workers[i];
|
||||
|
||||
p_worker->index = i;
|
||||
p_worker->p_ready_workers_num = &pool->ready_workers_num;
|
||||
p_worker->p_mutex = &pool->workers_mutex;
|
||||
p_worker->p_cond = &pool->workers_cond;
|
||||
p_worker->p_tasks = &pool->tasks;
|
||||
p_worker->should_exit = false;
|
||||
|
||||
bool ok = c11_thrd__create(&p_worker->thread, _thrdpool_worker, p_worker);
|
||||
c11__rtassert(ok);
|
||||
}
|
||||
}
|
||||
|
||||
void c11_thrdpool__dtor(c11_thrdpool* pool) {
|
||||
for(int i = 0; i < pool->length; i++) {
|
||||
c11_thrdpool_worker* p_worker = &pool->workers[i];
|
||||
atomic_store(&p_worker->should_exit, true);
|
||||
}
|
||||
c11_mutex__lock(&pool->workers_mutex);
|
||||
atomic_store(&pool->tasks.sync_val, -1);
|
||||
c11_thrdpool_debug_log(-1, "Terminating all workers...");
|
||||
c11_cond__broadcast(&pool->workers_cond);
|
||||
c11_mutex__unlock(&pool->workers_mutex);
|
||||
|
||||
for(int i = 0; i < pool->length; i++) {
|
||||
c11_thrdpool_worker* p_worker = &pool->workers[i];
|
||||
c11_thrd__join(p_worker->thread);
|
||||
}
|
||||
|
||||
PK_FREE(pool->workers);
|
||||
c11_mutex__dtor(&pool->workers_mutex);
|
||||
c11_cond__dtor(&pool->workers_cond);
|
||||
PK_FREE(pool->workers);
|
||||
}
|
||||
|
||||
void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) {
|
||||
if(num_tasks == 0) return;
|
||||
bool expected = false;
|
||||
while(!atomic_compare_exchange_weak(&pool->is_busy, &expected, true)) {
|
||||
expected = false;
|
||||
c11_thrdpool_debug_log(-1, "c11_thrdpool__map() called on %d tasks...", num_tasks);
|
||||
while(atomic_load(&pool->ready_workers_num) < pool->length) {
|
||||
c11_thrd__yield();
|
||||
}
|
||||
c11_thrdpool_debug_log(-1, "All %d workers are ready.", pool->length);
|
||||
|
||||
// assign tasks
|
||||
c11_mutex__lock(&pool->workers_mutex);
|
||||
pool->tasks.func = func;
|
||||
pool->tasks.args = args;
|
||||
pool->tasks.length = num_tasks;
|
||||
atomic_store(&pool->tasks.sync_val, 1);
|
||||
atomic_store(&pool->tasks.current_index, 0);
|
||||
atomic_store(&pool->tasks.completed_count, 0);
|
||||
// wake up all workers
|
||||
c11_cond__broadcast(&pool->workers_cond);
|
||||
c11_mutex__unlock(&pool->workers_mutex);
|
||||
// wait for complete
|
||||
c11_thrdpool_debug_log(-1, "Waiting for %d tasks to complete...", num_tasks);
|
||||
while(atomic_load(&pool->tasks.completed_count) < num_tasks) {
|
||||
c11_thrd__yield();
|
||||
}
|
||||
atomic_store(&pool->is_busy, false);
|
||||
atomic_store(&pool->tasks.sync_val, 0);
|
||||
c11_thrdpool_debug_log(-1, "All %d tasks completed, `sync_val` was reset.", num_tasks);
|
||||
}
|
||||
|
||||
#endif // PK_ENABLE_THREADS
|
||||
51
src2/test_threads.c
Normal file
51
src2/test_threads.c
Normal file
@ -0,0 +1,51 @@
|
||||
#include "pocketpy/common/threads.h"
|
||||
#include <stdio.h>
|
||||
|
||||
int64_t time_ns();
|
||||
|
||||
static void func(void* arg) {
|
||||
long long* val = (long long*)arg;
|
||||
long long sum = 0;
|
||||
for(int i = 0; i < 1000000; i++) {
|
||||
sum += *val;
|
||||
}
|
||||
*val = sum;
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
int threads_num = 16;
|
||||
if(argc == 2) threads_num = atoi(argv[1]);
|
||||
printf("Using %d threads in the thread pool.\n", threads_num);
|
||||
|
||||
c11_thrdpool pool;
|
||||
c11_thrdpool__ctor(&pool, threads_num);
|
||||
|
||||
int num_tasks = 2000;
|
||||
long long* data = PK_MALLOC(sizeof(long long) * num_tasks);
|
||||
void** args = PK_MALLOC(sizeof(void*) * num_tasks);
|
||||
|
||||
for(int i = 0; i < 4; i++) {
|
||||
for(int i = 0; i < num_tasks; i++) {
|
||||
data[i] = i;
|
||||
args[i] = &data[i];
|
||||
}
|
||||
|
||||
printf("==> %dth run\n", i + 1);
|
||||
int64_t start_ns = time_ns();
|
||||
c11_thrdpool__map(&pool, func, args, num_tasks);
|
||||
int64_t end_ns = time_ns();
|
||||
double elapsed = (end_ns - start_ns) / 1e9;
|
||||
printf(" Results: %lld, %lld, %lld, %lld, %lld\n",
|
||||
data[0],
|
||||
data[1],
|
||||
data[2],
|
||||
data[100],
|
||||
data[400]);
|
||||
printf(" Elapsed time for %d tasks: %.6f seconds\n", num_tasks, elapsed);
|
||||
}
|
||||
|
||||
c11_thrdpool__dtor(&pool);
|
||||
PK_FREE(args);
|
||||
PK_FREE(data);
|
||||
return 0;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user