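I have the program below, which uses the Python C API. It creates a number of threads (the NUM_THREADS constant). Each thread runs an infinite loop that performs a very simple operation: it builds a Python dictionary whose id key is set to the thread id, serializes the dictionary to a string (using the dumps function from Python's json module), and prints it. The thread then waits WAIT_TIME seconds and repeats the same operation.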
// g++ -g -o multithread multithread.cpp -I/usr/include/python3.11/ -lpython3.11 -lpthread
#include <Python.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

// WAIT_TIME is in seconds
#define NUM_THREADS 20
#define WAIT_TIME 1

// Global semaphore declaration
sem_t semaphore;

// Global JSON module object, to be accessed in every thread
PyObject* jsonModule;

// Function to be executed by each thread
void* thread_function(void* arg) {
    long thread_id = (long)arg;
    while (true) {
        sem_wait(&semaphore); // mark 1
        // "l" matches the long argument ("i" would expect an int)
        PyObject* myDict = Py_BuildValue("{s:l}", "id", thread_id);
        PyObject* result = PyObject_CallMethod(jsonModule, "dumps", "O", myDict);
        PyObject* repr = PyObject_Repr(result);
        const char* result_str = PyUnicode_AsUTF8(repr);
        printf("Thread %ld result: %s\n", thread_id, result_str);
        Py_XDECREF(result);
        Py_XDECREF(myDict);
        Py_XDECREF(repr);
        sem_post(&semaphore); // mark 2
        sleep(WAIT_TIME);
    }
    pthread_exit(NULL);
}

int main() {
    pthread_t threads[NUM_THREADS];
    int i;

    // Initialize the Python interpreter
    Py_Initialize();

    // Import json module
    jsonModule = PyImport_ImportModule("json");

    // Initialize the semaphore
    sem_init(&semaphore, 0, 1);

    // Create threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_create(&threads[i], NULL, thread_function, (void*)(long)i) != 0) {
            fprintf(stderr, "Error creating thread\n");
            return 1;
        }
    }

    // Join threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_join(threads[i], NULL) != 0) {
            fprintf(stderr, "Error joining thread\n");
            return 1;
        }
    }

    // Free resources (never reach this point, but added for symmetry)
    Py_XDECREF(jsonModule);

    // Finalize the Python interpreter
    Py_Finalize();

    // Destroy the semaphore
    sem_destroy(&semaphore);

    printf("All threads have completed\n");
    return 0;
}

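In my experience, the program works as long as the semaphore is acquired before any Py* function is called; in other words, as long as the lines marked mark 1 and mark 2 are present.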
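To make the mark 1 / mark 2 pairing harder to get wrong, the same mutual exclusion can be expressed as a scope guard. This is only a minimal sketch: SemaphoreGuard, Shared, bump and run_guarded are hypothetical names that do not appear in the program, and a plain counter increment stands in for the Py* calls so the sketch compiles without Python. It assumes unnamed POSIX semaphores are available (as on Linux).

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
#include <vector>

// Hypothetical RAII wrapper: sem_wait() on construction, sem_post() on destruction.
class SemaphoreGuard {
public:
    explicit SemaphoreGuard(sem_t& sem) : sem_(sem) { sem_wait(&sem_); }  // "mark 1"
    ~SemaphoreGuard() { sem_post(&sem_); }                                // "mark 2"
    SemaphoreGuard(const SemaphoreGuard&) = delete;
    SemaphoreGuard& operator=(const SemaphoreGuard&) = delete;
private:
    sem_t& sem_;
};

// Hypothetical shared state; the counter stands in for the Python work.
struct Shared {
    sem_t sem;
    long counter;
    int iters;
};

static void* bump(void* p) {
    Shared* s = static_cast<Shared*>(p);
    for (int i = 0; i < s->iters; ++i) {
        SemaphoreGuard guard(s->sem);  // acquired here
        ++s->counter;                  // critical section (the Py* calls would go here)
    }                                  // released at the end of every iteration
    return NULL;
}

// Starts the given number of threads and returns the final counter value.
static long run_guarded(int threads, int iters) {
    Shared s;
    int rc = sem_init(&s.sem, 0, 1);   // binary semaphore, as in the program
    assert(rc == 0);
    (void)rc;
    s.counter = 0;
    s.iters = iters;
    std::vector<pthread_t> th(threads);
    int created = 0;
    while (created < threads &&
           pthread_create(&th[created], NULL, bump, &s) == 0) {
        ++created;
    }
    for (int i = 0; i < created; ++i) pthread_join(th[i], NULL);
    sem_destroy(&s.sem);
    return s.counter;
}
```

In thread_function the guard would be declared at the top of the loop body in an inner block that ends before sleep(WAIT_TIME), so the semaphore is not held while sleeping.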
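If I remove the mark 1 and mark 2 statements (thereby removing the semaphore-based mutual exclusion), the program crashes very quickly. Looking at the backtrace of the generated core file, the problem appears to be in the call to PyObject_CallMethod().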
(gdb) bt
#0 0x00007fb315289c19 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#1 0x00007fb31526aac6 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#2 0x00007fb31517d80b in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#3 0x00007fb31517ddd9 in PyObject_CallMethod () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#4 0x000055e1a763f2ef in thread_function (arg=0x11) at multithread.cpp:24
#5 0x00007fb314ea8134 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#6 0x00007fb314f287dc in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
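This is somewhat surprising, because all the PyObject* variables used in the loop (myDict, result and repr) are local to the thread function. The only PyObject* variable that is not local to a thread is the module itself (jsonModule). Is that the one causing the problem?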
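Does this mean the Python C library is not thread-safe, so that no more than one Py* function can run at a time? Is there an alternative to what I am already using (that is, a semaphore implemented in my own code)? Is there a good implementation pattern for this kind of program (multithreading with the Python C API)?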
Thanks in advance!