From 450f0263a2ace53009e6887c2adadecf22492d00 Mon Sep 17 00:00:00 2001 From: blueloveTH Date: Tue, 25 Nov 2025 01:03:55 +0800 Subject: [PATCH] backup --- build_g_threads.sh | 19 +++++++++++ include/pocketpy/common/threads.h | 4 +++ src/common/threads.c | 54 ++++++++++++++++++++++++++++++- src2/test_threads.c | 51 +++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 build_g_threads.sh create mode 100644 src2/test_threads.c diff --git a/build_g_threads.sh b/build_g_threads.sh new file mode 100644 index 00000000..ffb1ed9c --- /dev/null +++ b/build_g_threads.sh @@ -0,0 +1,19 @@ +set -e + +# python prebuild.py + +SRC=$(find src/ -name "*.c") + +FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1" + +SANITIZE_FLAGS="-fsanitize=undefined,thread -fno-sanitize=function" + +if [ "$(uname)" == "Darwin" ]; then + SANITIZE_FLAGS="-fsanitize=undefined,thread" +fi + +SRC2=${1:-src2/test_threads.c} + +echo "Compiling C files..." 
+clang $FLAGS $SANITIZE_FLAGS $SRC $SRC2 -o main + diff --git a/include/pocketpy/common/threads.h b/include/pocketpy/common/threads.h index de988bf4..a78a1621 100644 --- a/include/pocketpy/common/threads.h +++ b/include/pocketpy/common/threads.h @@ -54,6 +54,8 @@ typedef struct c11_thrdpool_tasks { } c11_thrdpool_tasks; typedef struct c11_thrdpool_worker { + int index; + atomic_int* p_ready_workers_num; c11_mutex_t* p_mutex; c11_cond_t* p_cond; c11_thrdpool_tasks* p_tasks; @@ -62,6 +64,8 @@ typedef struct c11_thrdpool_worker { typedef struct c11_thrdpool { int length; + atomic_int ready_workers_num; + c11_thrdpool_worker* workers; c11_mutex_t workers_mutex; diff --git a/src/common/threads.c b/src/common/threads.c index 5d01b1bc..6a0c158a 100644 --- a/src/common/threads.c +++ b/src/common/threads.c @@ -1,5 +1,6 @@ #include "pocketpy/common/threads.h" #include "pocketpy/common/utils.h" +#include #if PK_ENABLE_THREADS @@ -71,15 +72,45 @@ void c11_cond__broadcast(c11_cond_t* cond) { cnd_broadcast(cond); } #endif +#define C11_THRDPOOL_DEBUG 0 + +#if C11_THRDPOOL_DEBUG +int64_t time_ns(); + +static void c11_thrdpool_debug_log(int index, const char* format, ...) { + va_list args; + va_start(args, format); + char buf[512]; + int n = sprintf(buf, "[%.6f - Worker %2d] ", time_ns() / 1e9, index); + vsprintf(buf + n, format, args); + printf("%s\n", buf); + va_end(args); +} +#else +#define c11_thrdpool_debug_log(...) 
\ + do { \ + } while(0) +#endif + static c11_thrd_retval_t _thrdpool_worker(void* arg) { c11_thrdpool_worker* p_worker = (c11_thrdpool_worker*)arg; c11_thrdpool_tasks* p_tasks = p_worker->p_tasks; while(true) { + c11_thrdpool_debug_log(p_worker->index, "Waiting for mutex lock..."); c11_mutex__lock(p_worker->p_mutex); + atomic_fetch_add(p_worker->p_ready_workers_num, 1); + c11_thrdpool_debug_log(p_worker->index, "Mutex locked, checking for tasks..."); + if(atomic_load(&p_tasks->sync_val) == -1) { + c11_mutex__unlock(p_worker->p_mutex); + return 0; // force kill + } while(true) { c11_cond__wait(p_worker->p_cond, p_worker->p_mutex); int sync_val = atomic_load(&p_tasks->sync_val); + c11_thrdpool_debug_log(p_worker->index, + "Woke up from condition variable, sync_val=%d", + sync_val); if(sync_val == 1) break; if(sync_val == -1) { c11_mutex__unlock(p_worker->p_mutex); @@ -87,7 +118,9 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) { } } c11_mutex__unlock(p_worker->p_mutex); + atomic_fetch_sub(p_worker->p_ready_workers_num, 1); + c11_thrdpool_debug_log(p_worker->index, "Received tasks, starting execution..."); // execute tasks while(true) { int arg_index = atomic_fetch_add(&p_tasks->current_index, 1); @@ -100,15 +133,23 @@ static c11_thrd_retval_t _thrdpool_worker(void* arg) { } } - while(atomic_load(&p_tasks->sync_val) == 0) { + c11_thrdpool_debug_log(p_worker->index, + "Execution complete, waiting for `sync_val` reset..."); + while(true) { + int sync_val = atomic_load(&p_tasks->sync_val); + if(sync_val == 0) break; + if(sync_val == -1) return 0; // force kill c11_thrd__yield(); } + c11_thrdpool_debug_log(p_worker->index, + "`sync_val` reset detected, waiting for next tasks..."); } return 0; } void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { pool->length = length; + atomic_store(&pool->ready_workers_num, 0); pool->workers = PK_MALLOC(sizeof(c11_thrdpool_worker) * length); c11_mutex__ctor(&pool->workers_mutex); @@ -118,6 +159,8 @@ void 
c11_thrdpool__ctor(c11_thrdpool* pool, int length) { for(int i = 0; i < length; i++) { c11_thrdpool_worker* p_worker = &pool->workers[i]; + p_worker->index = i; + p_worker->p_ready_workers_num = &pool->ready_workers_num; p_worker->p_mutex = &pool->workers_mutex; p_worker->p_cond = &pool->workers_cond; p_worker->p_tasks = &pool->tasks; @@ -129,6 +172,7 @@ void c11_thrdpool__ctor(c11_thrdpool* pool, int length) { void c11_thrdpool__dtor(c11_thrdpool* pool) { c11_mutex__lock(&pool->workers_mutex); atomic_store(&pool->tasks.sync_val, -1); + c11_thrdpool_debug_log(-1, "Terminating all workers..."); c11_cond__broadcast(&pool->workers_cond); c11_mutex__unlock(&pool->workers_mutex); @@ -144,6 +188,12 @@ void c11_thrdpool__dtor(c11_thrdpool* pool) { void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args, int num_tasks) { if(num_tasks == 0) return; + c11_thrdpool_debug_log(-1, "c11_thrdpool__map() called on %d tasks...", num_tasks); + while(atomic_load(&pool->ready_workers_num) < pool->length) { + c11_thrd__yield(); + } + c11_thrdpool_debug_log(-1, "All %d workers are ready.", pool->length); + // assign tasks c11_mutex__lock(&pool->workers_mutex); pool->tasks.func = func; @@ -155,10 +205,12 @@ void c11_thrdpool__map(c11_thrdpool* pool, c11_thrdpool_func_t func, void** args c11_cond__broadcast(&pool->workers_cond); c11_mutex__unlock(&pool->workers_mutex); // wait for complete + c11_thrdpool_debug_log(-1, "Waiting for %d tasks to complete...", num_tasks); while(atomic_load(&pool->tasks.completed_count) < num_tasks) { c11_thrd__yield(); } atomic_store(&pool->tasks.sync_val, 0); + c11_thrdpool_debug_log(-1, "All %d tasks completed, `sync_val` was reset.", num_tasks); } #endif // PK_ENABLE_THREADS \ No newline at end of file diff --git a/src2/test_threads.c b/src2/test_threads.c new file mode 100644 index 00000000..22e4df62 --- /dev/null +++ b/src2/test_threads.c @@ -0,0 +1,51 @@ +#include "pocketpy/common/threads.h" +#include + +int64_t time_ns(); + 
+// Task: overwrite the long long at `arg` with itself added up 1,000,000 times.
+static void func(void* arg) {
+    long long* val = (long long*)arg;
+    long long sum = 0;
+    for(int i = 0; i < 1000000; i++) sum += *val;
+    *val = sum;
+}
+
+// Benchmark: 4 timed c11_thrdpool__map() passes over 2000 tasks; argv[1] sets the worker count.
+int main(int argc, char** argv) {
+    long threads_num = 16;
+    if(argc == 2) {
+        // atoi() turned bad input into 0 (or negative) workers; reject it up front instead.
+        char* end = NULL;
+        threads_num = strtol(argv[1], &end, 10);
+        if(end == argv[1] || *end != '\0' || threads_num < 1 || threads_num > 1024) {
+            fprintf(stderr, "Invalid thread count: %s (expected 1..1024)\n", argv[1]);
+            return 1;
+        }
+    }
+    printf("Using %d threads in the thread pool.\n", (int)threads_num);
+    c11_thrdpool pool;
+    c11_thrdpool__ctor(&pool, (int)threads_num);
+
+    int num_tasks = 2000;
+    long long* data = PK_MALLOC(sizeof(long long) * num_tasks);
+    void** args = PK_MALLOC(sizeof(void*) * num_tasks);
+    for(int run = 0; run < 4; run++) {
+        // Re-seed on every pass because func() overwrites data[k] in place.
+        for(int k = 0; k < num_tasks; k++) {
+            data[k] = k;
+            args[k] = &data[k];
+        }
+        printf("==> %dth run\n", run + 1);
+        int64_t start_ns = time_ns();
+        c11_thrdpool__map(&pool, func, args, num_tasks);
+        double elapsed = (time_ns() - start_ns) / 1e9;
+        printf(" Results: %lld, %lld, %lld, %lld, %lld\n",
+               data[0], data[1], data[2], data[100], data[400]);
+        printf(" Elapsed time for %d tasks: %.6f seconds\n", num_tasks, elapsed);
+    }
+    c11_thrdpool__dtor(&pool);
+    PK_FREE(args);
+    PK_FREE(data);
+    return 0;
+}
\ No newline at end of file