I'm running FastAPI in a Docker container which has 8 CPUs available, and I'd like to make use of them via Python multiprocessing.

Running my application outside the Docker container, everything works as I expect (see below), but when I build and run the container, the child processes never start.

Here is the relevant code from my application:

import multiprocessing
from multiprocessing import Process, Manager, Queue
from contextlib import asynccontextmanager
from fastapi import FastAPI

CLIENT_CORES = 5
DB_PROCS = 1  # omitted from the original snippet; the logs below show one db worker
connection = "mongo_connection_string"

@asynccontextmanager
async def lifespan(app: FastAPI):
    
    manager = multiprocessing.Manager()

    global dbWorkers
    global clientWorkers

    global clientQueue
    global clientResponse

    

    #resources
    dbQueue = Queue()
    clientQueue = Queue()
    clientResponse = manager.dict()
    psetObjects = manager.dict()
    dbWorkers = []
    clientWorkers = []

    #start client and db workers
    config_processes(dbQueue,clientQueue,clientResponse,psetObjects)

    yield
    
    graceful_shutdown()

def config_processes(dbQueue,clientQueue,clientResponse,psetObjects):

    #db
    start_db_workers(dbQueue,psetObjects)

    #client
    start_client_workers(clientQueue,psetObjects,dbQueue,clientResponse)


def start_db_workers(dbQueue,psetObjects):
    print("Building db Workers")

    for p in range(DB_PROCS):
        dbworker = Process(target=db_worker,args=(connection,dbQueue,psetObjects,p))
        dbWorkers.append(dbworker)
        dbworker.start()

    print("db Pool of {} db Worker(s) Built -OK".format(DB_PROCS))


def start_client_workers(clientQueue,psetObjects,dbQueue,clientResponse):
    print("Building client Workers")
    print("CLIENT CORES: {}".format(CLIENT_CORES))
    
    for p in range(CLIENT_CORES):
        clientWorker = Process(target=client_worker,args=(clientQueue,psetObjects,dbQueue,clientResponse,p),name="client-worker-{}".format(p))
        clientWorkers.append(clientWorker)
        clientWorker.start()
    
    print("Client Pool of {} Worker(s) Built -OK".format(len(clientWorkers)))

    
def db_worker(connection,dbQueue,pset_objects,n):
    dbworker = DbWorker(connection,dbQueue,pset_objects,n)
    print("built dbworker {}".format(n))
    dbworker.listen()

def client_worker(clientQueue,psetObjects,dbQueue,clientResponse,p):
    clientworker = ClientWorker(clientQueue,psetObjects,dbQueue,clientResponse,p)
    print("built client worker {}".format(p))
    clientworker.listen()

When running in Docker, the log shows:

INFO:     Started server process [7]
INFO:     Waiting for application startup.
CPUS: 8
Building db Workers
db Pool of 1 db Worker(s) Built -OK
Building client Workers
CLIENT CORES: 5
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)

That is, we never see the client_worker() and db_worker() targets get called, and start_client_workers() never completes. Running directly with uvicorn I get:

CPUS: 8
INFO:     Started server process [40230]
INFO:     Waiting for application startup.
Building db Workers
db Pool of 1 db Worker(s) Built -OK
Building client Workers
CLIENT CORES: 5
Client Pool of 5 Worker(s) Built -OK
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
built client worker 4
built client worker 2
built client worker 1
built client worker 0
built client worker 3
built dbworker 0

I note that the server process numbers in the two runs are wildly different, and I wondered whether I was doing something wrong in trying to use multiprocessing with FastAPI, although my understanding (e.g. from here) is that this is, or should be, fine; in any case, my question is about the difference between the in-Docker and out-of-Docker behaviour.

Is there a way to get the containerised behaviour I want with this approach, or should I look at refactoring to use Gunicorn, Docker Swarm, or something similar? If the latter, could you point me in the right direction? Thanks.

EDIT 12 Feb 2024, in response to @datawookie's answer

I have implemented the answer given below (exactly) in a fresh environment, and very strangely I see the following:

INFO:     Started server process [1]
INFO:     Waiting for application startup.
Building db Workers
db Pool of 2 db Worker(s) Built -OK
Building client Workers
CLIENT CORES: 5
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
built client worker 0
built dbworker 1
built client worker 1
built dbworker 0
built client worker 3
built client worker 4
built client worker 2

Notably, although I see all of the print statements from client_worker(), I never see the final call to print(f"Client Pool of {len(clientWorkers)} Worker(s) Built -OK"), which is clearly visible in @datawookie's answer. This echoes the behaviour I see in my own application environment, namely that the last Process.start() call appears not to complete. Stranger still, this happens regardless of the value I choose for CLIENT_CORES: the final call to Process.start() never seems to return.

Things get stranger still when I add my application's full logic (as in the original question: building a ClientWorker object and calling its .listen() method). In that case the output is:

INFO:     Started server process [7]
INFO:     Waiting for application startup.
CPUS: 8
Building db Workers
db Pool of 1 db Worker(s) Built -OK
Building client Workers
CLIENT CORES: 5
built dbworker 0
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)

But if I comment out the .listen() call like so:

def client_worker(clientQueue,psetObjects,dbQueue,clientResponse,p):
    clientworker = ClientWorker(clientQueue,psetObjects,dbQueue,clientResponse,p)
    print("built client worker {}".format(p))
    # clientworker.listen()

I get:

INFO:     Started server process [6]
INFO:     Waiting for application startup.
CPUS: 8
Building db Workers
db Pool of 1 db Worker(s) Built -OK
Building client Workers
CLIENT CORES: 5
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
built client worker 0
built dbworker 0
built client worker 2
built client worker 1
built client worker 3
built client worker 4

For clarity, ClientWorker.listen() has the following form:

def listen(self):
    print("started")
    while self.stay_alive == True:
        try:
            args = self.clientQueue.get(block=True)
            #do_work(args)
        except:
            #report_error()
            pass

Again, everything works fine under uvicorn. Can anyone explain what is going on in the Docker environment?!
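(For anyone reproducing this: the worker pattern in play, a child process blocking on a multiprocessing.Queue, reduces to the stdlib-only sketch below. The sentinel-based shutdown and the names STOP and client_worker are illustrative, not taken from the real application.)

```python
# Minimal reduction of the pattern: a child process blocks on Queue.get()
# until it receives a sentinel telling it to exit.
from multiprocessing import Process, Queue

STOP = None  # sentinel value that tells the worker to stop listening

def client_worker(q: Queue, results: Queue) -> None:
    while True:
        args = q.get(block=True)  # blocks until work (or the sentinel) arrives
        if args is STOP:
            break
        results.put(args * 2)     # stand-in for do_work(args)

if __name__ == "__main__":
    q, results = Queue(), Queue()
    p = Process(target=client_worker, args=(q, results))
    p.start()
    q.put(21)
    q.put(STOP)
    p.join()
    print(results.get())  # -> 42
```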

Accepted answer

The problem here is print buffering. Below is a working version of the code.

Dockerfile

FROM python:3.11.4-slim

ENV PYTHONUNBUFFERED=1

COPY requirements.txt .

RUN pip3 install -r requirements.txt

COPY main.py .

CMD ["uvicorn", "main:app"]

The key addition is ENV PYTHONUNBUFFERED=1, which forces prints to be flushed immediately.
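For completeness, a sketch of the equivalent in-code alternatives (the Dockerfile environment-variable form above is what this answer actually uses; these would have made the "missing" log lines appear too):

```python
import sys

# Prints are block-buffered when stdout is not a TTY, as inside a Docker
# container. Three equivalent fixes:

# 1. Flush at each call site:
print("built client worker 0", flush=True)

# 2. Switch stdout to line buffering once at startup
#    (guarded, since redirected streams may not support reconfigure):
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(line_buffering=True)
print("built dbworker 0")

# 3. Run the interpreter as `python -u`, equivalent to PYTHONUNBUFFERED=1.
```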

requirements.txt

fastapi==0.109.2
uvicorn==0.27.1

main.py

import multiprocessing
from multiprocessing import Process, Queue, Manager
from fastapi import FastAPI

CLIENT_CORES = 5
DB_PROCS = 2
connection = "mongo_connection_string"

app = FastAPI()

def start_lifespan_resources():
    manager = Manager()

    global dbWorkers
    global clientWorkers
    global clientQueue
    global clientResponse

    dbQueue = Queue()
    clientQueue = Queue()
    clientResponse = manager.dict()
    psetObjects = manager.dict()
    dbWorkers = []
    clientWorkers = []

    config_processes(dbQueue, clientQueue, clientResponse, psetObjects)

def stop_lifespan_resources():
    for worker in dbWorkers + clientWorkers:
        worker.terminate()
        worker.join()

    print("All workers terminated.")

def config_processes(dbQueue, clientQueue, clientResponse, psetObjects):
    start_db_workers(dbQueue, psetObjects)
    start_client_workers(clientQueue, psetObjects, dbQueue, clientResponse)

def start_db_workers(dbQueue, psetObjects):
    print("Building db Workers")
    for p in range(DB_PROCS):
        dbworker = Process(target=db_worker, args=(connection, dbQueue, psetObjects, p))
        dbWorkers.append(dbworker)
        dbworker.start()
    print(f"db Pool of {DB_PROCS} db Worker(s) Built -OK")

def start_client_workers(clientQueue, psetObjects, dbQueue, clientResponse):
    print("Building client Workers")
    print(f"CLIENT CORES: {CLIENT_CORES}")
    for p in range(CLIENT_CORES):
        clientWorker = Process(target=client_worker, args=(clientQueue, psetObjects, dbQueue, clientResponse, p), name=f"client-worker-{p}")
        clientWorkers.append(clientWorker)
        clientWorker.start()
    print(f"Client Pool of {len(clientWorkers)} Worker(s) Built -OK")

def db_worker(connection, dbQueue, pset_objects, n):
    print(f"built dbworker {n}")

def client_worker(clientQueue, psetObjects, dbQueue, clientResponse, p):
    print(f"built client worker {p}")

@app.on_event("startup")
def startup_event():
    start_lifespan_resources()

@app.on_event("shutdown")
def shutdown_event():
    stop_lifespan_resources()

The screenshot below shows building the image, then starting and stopping a container. I ran docker stop from another terminal, which terminated the process gracefully. All of the processes start in Docker.

(screenshot of the docker build/run/stop output)
