我对多处理技术不是很有经验,但是今天我试着做一个小的http + api服务器,当我注意到我必须多次按下登录按钮才能进入我的帐户时,我试图在没有http. server的情况下重新创建这个问题,但是我做不到,所以这里是我代码的最小版本. 跳过的项目在队列中加起来,但是get()忽略了它们,我试着用它与block = false一起使用,但我得到了相同的结果.

from multiprocessing import Queue
import time
from uuid import uuid4
import threading
import requests
from http.server import SimpleHTTPRequestHandler
import socketserver
#Simulate request
def test():
    while input()!='q':
        try:requests.get('http://127.0.0.1:55467')
        except:pass
class database:
    def __init__(self):
        self.queue=Queue()
        self.elements={}
        threading.Thread(target=self.db_updater,daemon=True).start()
    #The function called in api calls
    def create(self,name,data):self.queue.put([name,data])
    #The loop that is supposed to update the database
    def db_updater(self):
        def create_element(element_name,data):self.elements[element_name]=data
        while True:
            change=self.queue.get()
            print('update received',change,'\nqueue length',self.queue.qsize())
            create_element(*change)
if __name__ == '__main__':
    db=database()
    threading.Thread(target=test,daemon=True).start()
    class MyHttpRequestHandler(SimpleHTTPRequestHandler):
        def log_message(*args):pass
        def do_POST(self):pass
        def do_GET(self):
            print('request received')
            db.create('testuser',{'token':str(uuid4()),'expire':int(time.time())+7200})
            self.send_response(200)
            self.end_headers()
    with socketserver.ForkingTCPServer(("127.0.0.1",55467),MyHttpRequestHandler) as httpd:httpd.serve_forever()

输出如下所示:

request received
update received ['testuser', {'token': '86520f1e-524f-4587-8e4f-b826a0b14012', 'expire': 1712054294}] 
queue length 1

request received

request received
update received ['testuser', {'token': 'a1c2a5e6-9390-4f31-a25a-df5e36f184ee', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': '08eb6fb4-5f4e-4df3-af48-28e854249c3e', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': 'dc3128ac-98d1-409b-9ca3-8c32f311b180', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': '55cc39a9-74cd-4e8c-b1fd-8db11e0b07c1', 'expire': 1712054296}] 
queue length 2

request received

request received
update received ['testuser', {'token': 'a9779e1c-4a8e-45ee-8223-6022e94f67be', 'expire': 1712054296}] 
queue length 3

request received
update received ['testuser', {'token': '9f8e2ab4-3c2e-4c90-ae94-be0e946e1ca6', 'expire': 1712054296}] 
queue length 3

request received

request received
update received ['testuser', {'token': '6a0835c5-8568-4e53-bb9c-ec8ac9f134c0', 'expire': 1712054297}] 
queue length 4

推荐答案

你刚刚在CPython中发现了一个bug...连续Forking 会 destruct 队列上的互斥体,因为Python在Forking 后必须解锁所有互斥体.你能用Manager.Queue吗?它比较慢,但没有这个问题.

from multiprocessing import Manager

...

self.manager = Manager()
self.queue = self.manager.Queue()

如果可能的话,最好的替代方案是将消费者database放在自己的multiprocessing.Process中(而不是线程),这可以保护消费者免受整个互斥体损坏问题的影响.

否则,您需要避免使用ForkingServer,或使用其他替代multiprocessing.Queue,如消息代理.

最后一种 Select 是实现你自己的队列,在读取器端没有互斥锁,因为只有一个读取器存在.

Python相关问答推荐

类型错误:输入类型不支持ufuncisnan-在执行Mann-Whitney U测试时[SOLVED]

在Python Attrs包中,如何在field_Transformer函数中添加字段?

如何过滤包含2个指定子字符串的收件箱列名?

如何从.cgi网站刮一张表到rame?

无法定位元素错误404

如何请求使用Python将文件下载到带有登录名的门户网站?

如何创建一个缓冲区周围的一行与manim?

Django REST Framework:无法正确地将值注释到多对多模型,不断得到错误字段名称字段对模型无效'<><>

为一个组的每个子组绘制,

在Python 3中,如何让客户端打开一个套接字到服务器,发送一行JSON编码的数据,读回一行JSON编码的数据,然后继续?

在不同的帧B中判断帧A中的子字符串,每个帧的大小不同

在Google Drive中获取特定文件夹内的FolderID和文件夹名称

BeautifulSoup:超过24个字符(从a到z)的迭代失败:降低了首次深入了解数据集的复杂性:

如果有2个或3个,则从pandas列中删除空格

GPT python SDK引入了大量开销/错误超时

按条件添加小计列

当输入是字典时,`pandas. concat`如何工作?

我怎么才能用拉夫分拣呢?

Django更新视图未更新

try 在单个WITH_COLUMNS_SEQ操作中链接表达式时,使用Polars数据帧时出现ComputeError