From 944821431736e96fdf00432d2f49251c8b679d8c Mon Sep 17 00:00:00 2001 From: blueloveTH Date: Sun, 13 Apr 2025 18:28:27 +0800 Subject: [PATCH] test impl of `pkpy.ComputeThread` --- build.sh | 2 +- build_g.sh | 2 +- build_g_32.sh | 2 +- include/pocketpy/common/str.h | 3 + include/typings/pkpy.pyi | 23 ++- src/common/str.c | 14 ++ src/modules/pkpy.c | 309 +++++++++++++++++++++++++++++++++ src2/multi_vm_isolate.c | 49 ++++++ tests/{98_lz4.py => 72_lz4.py} | 0 tests/98_thread.py | 30 ++++ 10 files changed, 429 insertions(+), 5 deletions(-) create mode 100644 src2/multi_vm_isolate.c rename tests/{98_lz4.py => 72_lz4.py} (100%) create mode 100644 tests/98_thread.py diff --git a/build.sh b/build.sh index d1bf9431..7dbbfc54 100644 --- a/build.sh +++ b/build.sh @@ -20,7 +20,7 @@ SRC2=${1:-src2/main.c} echo "> Compiling and linking source files... " -clang -std=c11 -O2 -Wfatal-errors -Iinclude -DNDEBUG -o main $SRC $SRC2 -lm -ldl +clang -std=c11 -O2 -Wfatal-errors -Iinclude -DNDEBUG -o main $SRC $SRC2 -lm -ldl -lpthread if [ $? -eq 0 ]; then echo "Build completed. Type \"./main\" to enter REPL." diff --git a/build_g.sh b/build_g.sh index 3cdf16b2..1a02c970 100644 --- a/build_g.sh +++ b/build_g.sh @@ -4,7 +4,7 @@ set -e SRC=$(find src/ -name "*.c") -FLAGS="-std=c11 -lm -ldl -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1" +FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1" SANITIZE_FLAGS="-fsanitize=address,leak,undefined" diff --git a/build_g_32.sh b/build_g_32.sh index e2190287..cb1fd246 100644 --- a/build_g_32.sh +++ b/build_g_32.sh @@ -4,7 +4,7 @@ set -e SRC=$(find src/ -name "*.c") -FLAGS="-std=c11 -lm -ldl -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1" +FLAGS="-std=c11 -lm -ldl -lpthread -Iinclude -O0 -Wfatal-errors -g -DDEBUG -DPK_ENABLE_OS=1" SANITIZE_FLAGS="-fsanitize=address,leak,undefined" diff --git a/include/pocketpy/common/str.h b/include/pocketpy/common/str.h index d2d09d11..e9757c25 100644 --- a/include/pocketpy/common/str.h +++ b/include/pocketpy/common/str.h @@ -68,6 +68,9 @@ int c11__u8_header(unsigned char c, bool suppress); int c11__u8_value(int u8bytes, const char* data); int c11__u32_to_u8(uint32_t utf32_char, char utf8_output[4]); +char* c11_strdup(const char* str); +unsigned char* c11_memdup(const unsigned char* data, int size); + typedef enum IntParsingResult { IntParsing_SUCCESS, IntParsing_FAILURE, diff --git a/include/typings/pkpy.pyi b/include/typings/pkpy.pyi index 36ed66d9..1c3de4ea 100644 --- a/include/typings/pkpy.pyi +++ b/include/typings/pkpy.pyi @@ -1,4 +1,4 @@ -from typing import Self +from typing import Self, Literal from linalg import vec2, vec2i class TValue[T]: @@ -16,4 +16,23 @@ def memory_usage() -> str: """Return a summary of the memory usage.""" def is_user_defined_type(t: type) -> bool: - """Check if a type is user-defined. This means the type was created by executing python `class` statement.""" \ No newline at end of file + """Check if a type is user-defined. This means the type was created by executing python `class` statement.""" + +def currentvm() -> int: + """Return the current VM index.""" + + +class ComputeThread: + def __init__(self, vm_index: Literal[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]): ... + + @property + def is_done(self) -> bool: ... + + def join(self) -> None: ... + + def last_error(self) -> str | None: ... + def last_retval(self): ... + + def exec(self, source: str) -> None: ... + def eval(self, source: str) -> None: ... + def call(self, eval_src: str, *args, **kwargs) -> None: ... diff --git a/src/common/str.c b/src/common/str.c index 9b5a7aa1..9186f835 100644 --- a/src/common/str.c +++ b/src/common/str.c @@ -348,6 +348,20 @@ int c11__u32_to_u8(uint32_t utf32_char, char utf8_output[4]) { return length; } +char* c11_strdup(const char* str){ + int len = strlen(str); + char* dst = PK_MALLOC(len + 1); + memcpy(dst, str, len); + dst[len] = '\0'; + return dst; +} + +unsigned char* c11_memdup(const unsigned char* src, int size) { + unsigned char* dst = PK_MALLOC(size); + memcpy(dst, src, size); + return dst; +} + IntParsingResult c11__parse_uint(c11_sv text, int64_t* out, int base) { *out = 0; diff --git a/src/modules/pkpy.c b/src/modules/pkpy.c index f1a76c2f..913afabe 100644 --- a/src/modules/pkpy.c +++ b/src/modules/pkpy.c @@ -7,6 +7,9 @@ #include "pocketpy/common/sstream.h" #include "pocketpy/interpreter/vm.h" +#include +#include + #define DEF_TVALUE_METHODS(T, Field) \ static bool TValue_##T##__new__(int argc, py_Ref argv) { \ PY_CHECK_ARGC(2); \ @@ -63,6 +66,308 @@ static bool pkpy_is_user_defined_type(int argc, py_Ref argv) { return true; } +static bool pkpy_currentvm(int argc, py_Ref argv) { + PY_CHECK_ARGC(0); + py_newint(py_retval(), py_currentvm()); + return true; +} + +typedef struct c11_ComputeThread c11_ComputeThread; + +typedef struct { + c11_ComputeThread* self; + char* eval_src; + unsigned char* args_data; + int args_size; + unsigned char* kwargs_data; + int kwargs_size; +} ComputeThreadJobCall; + +typedef struct { + c11_ComputeThread* self; + char* source; + enum py_CompileMode mode; +} ComputeThreadJobExec; + +static void ComputeThreadJobCall__dtor(void* arg) { + ComputeThreadJobCall* self = arg; + PK_FREE(self->eval_src); + PK_FREE(self->args_data); + PK_FREE(self->kwargs_data); +} + +static void ComputeThreadJobExec__dtor(void* arg) { + ComputeThreadJobExec* self = arg; + PK_FREE(self->source); +} + +typedef struct c11_ComputeThread { + int vm_index; + atomic_bool is_done; + unsigned char* last_retval_data; + int last_retval_size; + char* last_error; + + thrd_t thread; + void* job; + void (*job_dtor)(void*); +} c11_ComputeThread; + +static void + c11_ComputeThread__reset_job(c11_ComputeThread* self, void* job, void (*job_dtor)(void*)) { + if(self->job) { + self->job_dtor(self->job); + PK_FREE(self->job); + } + self->job = job; + self->job_dtor = job_dtor; +} + +static bool _pk_compute_thread_flags[16]; + +static void c11_ComputeThread__dtor(c11_ComputeThread* self) { + if(!self->is_done) { + c11__abort("ComputeThread(%d) is not done yet!! But the object was deleted.", + self->vm_index); + } + c11_ComputeThread__reset_job(self, NULL, NULL); + _pk_compute_thread_flags[self->vm_index] = false; +} + +static void c11_ComputeThread__on_job_begin(c11_ComputeThread* self) { + if(self->last_retval_data) { + PK_FREE(self->last_retval_data); + self->last_retval_data = NULL; + self->last_retval_size = 0; + } + if(self->last_error) { + PK_FREE(self->last_error); + self->last_error = NULL; + } + py_switchvm(self->vm_index); +} + +static bool ComputeThread__new__(int argc, py_Ref argv) { + c11_ComputeThread* self = + py_newobject(py_retval(), py_totype(argv), 0, sizeof(c11_ComputeThread)); + self->vm_index = 0; + self->is_done = true; + self->last_retval_data = NULL; + self->last_retval_size = 0; + self->last_error = NULL; + self->job = NULL; + self->job_dtor = NULL; + return true; +} + +static bool ComputeThread__init__(int argc, py_Ref argv) { + PY_CHECK_ARGC(2); + PY_CHECK_ARG_TYPE(1, tp_int); + c11_ComputeThread* self = py_touserdata(py_arg(0)); + int index = py_toint(py_arg(1)); + if(index >= 1 && index < 16) { + if(_pk_compute_thread_flags[index]) { + return ValueError("vm_index %d is already in use", index); + } + _pk_compute_thread_flags[index] = true; + self->vm_index = index; + } else { + return ValueError("vm_index %d is out of range", index); + } + py_newnone(py_retval()); + return true; +} + +static bool ComputeThread_is_done(int argc, py_Ref argv) { + PY_CHECK_ARGC(1); + c11_ComputeThread* self = py_touserdata(argv); + py_newbool(py_retval(), self->is_done); + return true; +} + +static bool ComputeThread_join(int argc, py_Ref argv) { + PY_CHECK_ARGC(1); + c11_ComputeThread* self = py_touserdata(argv); + while(!self->is_done) + thrd_yield(); + py_newnone(py_retval()); + return true; +} + +static bool ComputeThread_last_error(int argc, py_Ref argv) { + PY_CHECK_ARGC(1); + c11_ComputeThread* self = py_touserdata(argv); + if(!self->is_done) return OSError("thread is not done yet"); + if(self->last_error) { + py_newstr(py_retval(), self->last_error); + } else { + py_newnone(py_retval()); + } + return true; +} + +static bool ComputeThread_last_retval(int argc, py_Ref argv) { + PY_CHECK_ARGC(1); + c11_ComputeThread* self = py_touserdata(argv); + if(!self->is_done) return OSError("thread is not done yet"); + if(self->last_retval_data == NULL) return ValueError("no retval available"); + return py_pickle_loads(self->last_retval_data, self->last_retval_size); +} + +static int ComputeThreadJob_call(void* arg) { + ComputeThreadJobCall* job = arg; + c11_ComputeThread* self = job->self; + c11_ComputeThread__on_job_begin(self); + + py_StackRef p0 = py_peek(0); + + if(!py_pusheval(job->eval_src, NULL)) goto __ERROR; + // [callable] + if(!py_pickle_loads(job->args_data, job->args_size)) goto __ERROR; + py_push(py_retval()); + // [callable, args] + if(!py_pickle_loads(job->kwargs_data, job->kwargs_size)) goto __ERROR; + py_push(py_retval()); + // [callable, args, kwargs] + if(!py_smarteval("_0(*_1, **_2)", NULL, py_peek(-3), py_peek(-2), py_peek(-1))) goto __ERROR; + + py_shrink(3); + if(!py_pickle_dumps(py_retval())) goto __ERROR; + int retval_size; + unsigned char* retval_data = py_tobytes(py_retval(), &retval_size); + self->last_retval_data = c11_memdup(retval_data, retval_size); + self->last_retval_size = retval_size; + self->is_done = true; + return 0; + +__ERROR: + self->last_error = py_formatexc(); + self->is_done = true; + py_clearexc(p0); + py_newnone(py_retval()); + return 0; +} + +static int ComputeThreadJob_exec(void* arg) { + ComputeThreadJobExec* job = arg; + c11_ComputeThread* self = job->self; + c11_ComputeThread__on_job_begin(self); + + py_StackRef p0 = py_peek(0); + if(!py_exec(job->source, "", job->mode, NULL)) goto __ERROR; + if(!py_pickle_dumps(py_retval())) goto __ERROR; + int retval_size; + unsigned char* retval_data = py_tobytes(py_retval(), &retval_size); + self->last_retval_data = c11_memdup(retval_data, retval_size); + self->last_retval_size = retval_size; + self->is_done = true; + return 0; + +__ERROR: + self->last_error = py_formatexc(); + self->is_done = true; + py_clearexc(p0); + return 0; +} + +static bool ComputeThread_exec(int argc, py_Ref argv) { + PY_CHECK_ARGC(2); + c11_ComputeThread* self = py_touserdata(py_arg(0)); + if(!self->is_done) return OSError("thread is not done yet"); + PY_CHECK_ARG_TYPE(1, tp_str); + const char* source = py_tostr(py_arg(1)); + /**************************/ + ComputeThreadJobExec* job = PK_MALLOC(sizeof(ComputeThreadJobExec)); + job->self = self; + job->source = c11_strdup(source); + job->mode = EXEC_MODE; + c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor); + /**************************/ + self->is_done = false; + int res = thrd_create(&self->thread, ComputeThreadJob_exec, job); + if(res != thrd_success) { + self->is_done = true; + return OSError("thrd_create() failed"); + } + py_newnone(py_retval()); + return true; +} + +static bool ComputeThread_eval(int argc, py_Ref argv) { + PY_CHECK_ARGC(2); + c11_ComputeThread* self = py_touserdata(py_arg(0)); + if(!self->is_done) return OSError("thread is not done yet"); + PY_CHECK_ARG_TYPE(1, tp_str); + const char* source = py_tostr(py_arg(1)); + /**************************/ + ComputeThreadJobExec* job = PK_MALLOC(sizeof(ComputeThreadJobExec)); + job->self = self; + job->source = c11_strdup(source); + job->mode = EVAL_MODE; + c11_ComputeThread__reset_job(self, job, ComputeThreadJobExec__dtor); + /**************************/ + self->is_done = false; + int res = thrd_create(&self->thread, ComputeThreadJob_exec, job); + if(res != thrd_success) { + self->is_done = true; + return OSError("thrd_create() failed"); + } + py_newnone(py_retval()); + return true; +} + +static bool ComputeThread_call(int argc, py_Ref argv) { + PY_CHECK_ARGC(4); + c11_ComputeThread* self = py_touserdata(py_arg(0)); + if(!self->is_done) return OSError("thread is not done yet"); + PY_CHECK_ARG_TYPE(1, tp_str); + PY_CHECK_ARG_TYPE(2, tp_tuple); + PY_CHECK_ARG_TYPE(3, tp_dict); + // eval_src + const char* eval_src = py_tostr(py_arg(1)); + // *args + if(!py_pickle_dumps(py_arg(2))) return false; + int args_size; + unsigned char* args_data = py_tobytes(py_retval(), &args_size); + // *kwargs + if(!py_pickle_dumps(py_arg(3))) return false; + int kwargs_size; + unsigned char* kwargs_data = py_tobytes(py_retval(), &kwargs_size); + /**************************/ + ComputeThreadJobCall* job = PK_MALLOC(sizeof(ComputeThreadJobCall)); + job->self = self; + job->eval_src = c11_strdup(eval_src); + job->args_data = c11_memdup(args_data, args_size); + job->args_size = args_size; + job->kwargs_data = c11_memdup(kwargs_data, kwargs_size); + job->kwargs_size = kwargs_size; + c11_ComputeThread__reset_job(self, job, ComputeThreadJobCall__dtor); + /**************************/ + self->is_done = false; + int res = thrd_create(&self->thread, ComputeThreadJob_call, job); + if(res != thrd_success) { + self->is_done = true; + return OSError("thrd_create() failed"); + } + py_newnone(py_retval()); + return true; +} + +static void pk_ComputeThread__register(py_Ref mod) { + py_Type type = py_newtype("ComputeThread", tp_object, mod, (py_Dtor)c11_ComputeThread__dtor); + + py_bindmagic(type, __new__, ComputeThread__new__); + py_bindmagic(type, __init__, ComputeThread__init__); + py_bindproperty(type, "is_done", ComputeThread_is_done, NULL); + py_bindmethod(type, "join", ComputeThread_join); + py_bindmethod(type, "last_error", ComputeThread_last_error); + py_bindmethod(type, "last_retval", ComputeThread_last_retval); + + py_bindmethod(type, "exec", ComputeThread_exec); + py_bindmethod(type, "eval", ComputeThread_eval); + py_bind(py_tpobject(type), "call(self, eval_src, *args, **kwargs)", ComputeThread_call); +} + void pk__add_module_pkpy() { py_Ref mod = py_newmodule("pkpy"); @@ -99,6 +404,10 @@ void pk__add_module_pkpy() { py_bindfunc(mod, "memory_usage", pkpy_memory_usage); py_bindfunc(mod, "is_user_defined_type", pkpy_is_user_defined_type); + + py_bindfunc(mod, "currentvm", pkpy_currentvm); + + pk_ComputeThread__register(mod); } #undef DEF_TVALUE_METHODS \ No newline at end of file diff --git a/src2/multi_vm_isolate.c b/src2/multi_vm_isolate.c new file mode 100644 index 00000000..de9d833f --- /dev/null +++ b/src2/multi_vm_isolate.c @@ -0,0 +1,49 @@ +#include "pocketpy.h" +#include "threads.h" +#include + +int run_huge_job_in_vm1(void* arg) { + py_switchvm(1); + bool ok = py_exec((const char*)arg, "", EXEC_MODE, NULL); + if(!ok) { + py_printexc(); + return 1; + } + return 0; +} + +int main() { + py_initialize(); + + bool ok = py_exec("print('Hello world from VM0!')", "", EXEC_MODE, NULL); + if(!ok) { + py_printexc(); + return 1; + } + + printf("main vm index: %d\n", py_currentvm()); + + char* job_string = + "import time\n" + "res = 0\n" + "time.sleep(3)\n" + "res = 100\n" + "print('Huge job done!')\n" + "print('Result:', res)\n"; + + thrd_t thread1; + thrd_create(&thread1, run_huge_job_in_vm1, job_string); + + for(int i = 0; i < 5; i++) { + thrd_sleep(&(struct timespec){.tv_sec = 1, .tv_nsec = 0}, NULL); + printf("main vm index: %d\n", py_currentvm()); + } + + int thrd_res; + thrd_join(thread1, &thrd_res); + printf("Thread result: %d\n", thrd_res); + + py_finalize(); + + return 0; +} \ No newline at end of file diff --git a/tests/98_lz4.py b/tests/72_lz4.py similarity index 100% rename from tests/98_lz4.py rename to tests/72_lz4.py diff --git a/tests/98_thread.py b/tests/98_thread.py new file mode 100644 index 00000000..1a9883b3 --- /dev/null +++ b/tests/98_thread.py @@ -0,0 +1,30 @@ +from pkpy import ComputeThread +import time + +thread_1 = ComputeThread(1) +thread_2 = ComputeThread(2) + +for t in [thread_1, thread_2]: + t.exec(''' +def func(a): + from pkpy import currentvm + print("Hello from thread", currentvm(), "a =", a) + for i in range(500000): + if i % 100000 == 0: + print(i, "from thread", currentvm()) + return a +''') + +thread_1.join() +thread_2.join() + +thread_1.call('func', [1, 2, 3]) +thread_2.call('func', [4, 5, 6]) + +while not thread_1.is_done or not thread_2.is_done: + print("Waiting for threads to finish...") + time.sleep(1) + +print("Thread 1 last return value:", thread_1.last_retval()) +print("Thread 2 last return value:", thread_2.last_retval()) +