以下是我的FastAPI应用程序的一个最小可重现示例.我有一种奇怪的行为,我不确定我理解其中的原因.

我正在使用ApacheB边(ab)发送多个请求,如下所示:

ab -n 1000 -c 50 -H 'accept: application/json' -H 'x-data-origin: source' 'http://localhost:8001/test/async'

FastAPI app

import time
import asyncio
import enum
from typing import Any

from fastapi import FastAPI, Path, Body
from starlette.concurrency import run_in_threadpool

app = FastAPI()
loop = asyncio.get_running_loop()
def sync_func() -> None:
    time.sleep(3)
    print("sync func")

async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool( time.sleep, 3)
    print("sync async with fastapi thread")

async def sync_async_func() -> None:
    await loop.run_in_executor(None, time.sleep, 3)

async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")

@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")

@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")

@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")

@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")

以下是ApacheBtch的结果:

async with (asyncio.sleep): *并发级别:50

  • 测试时间:63.528秒
  • 完成申请:1000
  • 失败的请求:0
  • 总传输数:128000字节
  • 传输的超文本标记语言:4000字节
  • 每秒请求数:15.74[次/秒](平均值)
  • Time per request: 3176.407 [ms] (mean)
  • 每个请求的时间:63.528[毫秒](平均为所有并发请求) 传输速率:1.97[千字节/秒]接收*

sync (with time.sleep):个 并发级别:50

  • * 测试所需时间: 七十八点六一五秒
  • 完成申请:1000
  • 失败的请求:0
  • 总传输数:128000字节
  • 传输的超文本标记语言:4000字节
  • 每秒请求数:12.72[次/秒](平均值)
  • Time per request: 3930.751 [ms] (mean)
  • 每个请求的时间:78.615[毫秒](平均为所有并发请求) 传输速率:1.59[千字节/秒]接收*

sync_async (time sleep with run_in_executor) :*并发级别:50

  • 测试时间:256.201秒
  • 完成申请:1000
  • 失败的请求:0
  • 总传输数:128000字节
  • 传输的超文本标记语言:4000字节
  • 每秒请求数:3.90[次/秒](平均值)
  • Time per request: 12810.038 [ms] (mean)
  • 每个请求的时间:256.201[毫秒](平均为所有并发请求) 传输速率:0.49[千字节/秒]接收*

sync_async_fastapi (time sleep with run_in threadpool):个 *并发级别:50

  • 测试时间:78.877秒
  • 完成申请:1000
  • 失败的请求:0
  • 总传输数:128000字节
  • 传输的超文本标记语言:4000字节
  • 每秒请求数:12.68[次/秒](平均值)
  • Time per request: 3943.841 [ms] (mean)
  • 每个请求的时间:78.877[毫秒](平均为所有并发请求) 传输速率:1.58[千字节/秒]接收*

总而言之,我在结果上遇到了令人惊讶的差异,特别是在使用Run_in_Executor时,我遇到了明显更高的平均时间(12秒).我不明白这个结果.

-编辑 After AKX answer.

Here the code working as expected: 
import time
import asyncio
from anyio import to_thread

to_thread.current_default_thread_limiter().total_tokens = 200
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=100)
def sync_func() -> None:
    time.sleep(3)
    print("sync func")

async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool( time.sleep, 3)
    print("sync async with fastapi thread")

async def sync_async_func() -> None:
    await loop.run_in_executor(executor, time.sleep, 3)

async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")

@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")

@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")

@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")

@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")

推荐答案

Using run_in_threadpool()

Starlette的run_in_threadpool()在幕后使用anyio.to_thread.run_sync(),它"将在单独的线程中运行sync阻塞函数,以确保主线程(运行协程的地方)不会被阻塞"--有关更多详细信息,请参阅this answer和AnyIO的Working with threads文档.调用anyio.to_thread.run_sync()-在内部调用AsyncIOBackend.run_sync_in_worker_thread()-将返回协程,该协程可以是awaitED以获得sync函数的最终结果(例如,result = await run_in_threadpool(...)),因此,FastAPI仍将工作asynchronously.从Starlette的源代码中可以看到(上面给出了链接),run_in_threadpool()看起来就像这样(同时支持sequencekeyword参数):

async def run_in_threadpool(
    func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    if kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)

如图AnyIO's documentation所示:

Adjusting the default maximum worker thread count

default AnyIO工作线程限制器的值为40,这意味着 任何对to_thread.run_sync()的呼叫在没有显式limiter的情况下 参数将导致派生maximum of 100 threads.你可以的 如下所示调整此限制:

from anyio import to_thread

async def foo():
    # Set the maximum number of worker threads to 60
    to_thread.current_default_thread_limiter().total_tokens = 60

Note

AnyIO的默认线程池限制器不影响asyncio上的默认线程池执行器.

由于FastAPI uses Startlette's concurrency module用于在外部线程池中运行请求/阻塞函数,因此也应用线程限制器的默认值,即,最多40个线程-参见相关的AsyncIOBackend.current_default_thread_limiter()方法,该方法返回CapacityLimiter和默认的线程数.如上所述,可以调整该值,即increasing the number of threads,这可能会导致性能结果的改善-始终是API预期并发服务的请求数量为depending.例如,如果您希望API一次服务不超过50个请求,那么将线程的最大数量设置为50-如果您有synchronous/blocking后台任务/StreamingResponse的生成器(即,函数定义为普通的def而不是async def),或者也使用UploadFile的操作,你可以根据需要添加更多的线程,因为FastAPI实际上在外部线程池中运行所有这些,使用run_in_threadpool-这一切都在this answer中详细解释.

请注意,使用下面的方法(如here所述)对调整工作线程的数量具有相同的效果:

from anyio.lowlevel import RunVar
from anyio import CapacityLimiter

RunVar("_default_thread_limiter").set(CapacityLimiter(60))

但是,最好遵循AnyIO官方文档提供的方法(如前所述).在应用程序启动时使用lifespan事件处理程序完成此操作也是一个好主意,如演示的here所示.

Working Example 1

from fastapi import FastAPI
from contextlib import asynccontextmanager
from anyio import to_thread
import time


@asynccontextmanager
async def lifespan(app: FastAPI):    
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/sync")
def test_sync() -> None:
    time.sleep(3)
    print("sync")


@app.get('/get_available_threads')
async def get_available_threads():
    return to_thread.current_default_thread_limiter().available_tokens

使用ApacheB边,您可以测试上面的示例,如下所示,它将总共发送1000个请求,其中同时发送50个请求(-n:请求数,-c:并发请求数):

ab -n 1000 -c 50 "http://localhost:8000/sync"

由于上面的/sync端点是用Normal def而不是async def定义的,FastAPI将在幕后使用run_in_threadpool()在单独的线程中运行它,并在await中运行它,从而确保事件循环(因此,主线程)不会因为在该端点内执行的阻塞操作(阻塞IO或CPU阻塞)而被阻塞.

在上面的示例上运行性能测试时,如果您从浏览器调用/get_available_threads端点,例如http://localhost:8000/get_available_threads,您将看到线程available的数量始终为10或更多(因为在此测试中一次只使用50个线程,但线程限制器被设置为60),这意味着将AnyIO的线程限制器上的最大线程数设置为远远高于您需要的数字,如其他答案和您最近的示例中所示的200,不会带来任何性能改进;相反,您最终会得到许多不被使用的线程"驻留"在那里.正如前面所解释的,最大线程数应该取决于您的API预计要并发服务的请求数,以及FastAPI本身在线程池中秘密运行的任何其他阻塞任务/函数(当然,还有服务器机器可用的资源).

下面的示例是上面的same,但不是让FastAPI本身处理def端点内的阻塞操作(通过在外部线程池中运行def端点并对其执行await),而是现在将端点定义为async def(这意味着FastAPI将直接在事件循环中运行它),但在端点内部,run_in_threadpool()被用来运行阻塞操作(返回awaitable).对下面的示例执行基准测试将产生与上一个示例类似的结果.

Working Example 2

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from contextlib import asynccontextmanager
from anyio import to_thread
import time


@asynccontextmanager
async def lifespan(app: FastAPI):    
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/sync_async_run_in_tp")
async def test_sync_async_with_run_in_threadpool() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync_async using FastAPI's run_in_threadpool")


@app.get('/get_available_threads')
async def get_available_threads():
    return to_thread.current_default_thread_limiter().available_tokens

使用ApacheBch,您可以测试上面的示例,如下所示:

ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"

Using loop.run_in_executor() with ThreadPoolExecutor

当使用asyncio‘S loop.run_in_executor()时-在使用asyncio.get_running_loop()获得正在运行的事件循环之后-可以将None传递给executor参数,这将导致使用default执行器;即ThreadPoolExecutor.Note当调用loop.run_in_executor()并将None传递给executor参数时,这does not会在每次您这样做时创建ThreadPoolExecutor的新实例;相反,在您第一次这样做时,ThreadPoolExecutor只被初始化一次,但是对于将None传递给executor参数的loop.run_in_executor()的后续调用,Pythonreuses就是ThreadPoolExecutor的同一实例(因此,default执行器).这一点可以在source code of loop.run_in_executor()号公路上看到.这意味着,当调用await loop.run_in_executor(None, ...)时,可以创建的线程数是ThreadPoolExecutor类中默认线程工作线程数的limited.

ThreadPoolExecutor的文档中所述--如其实现this article8/Lib/concurrent/futures/thread.py#L145" rel="nofollow noreferrer">here中所示--默认情况下,max_workers参数设置为None,在这种情况下,工作线程的数量是根据以下公式设置的:min(32, os.cpu_count() + 4).os.cpu_count()函数返回当前系统中的logical个CPU的数量.如this article中所解释的,physical核是指在硬件(例如,芯片)中提供的CPU核的数量,而logical核是考虑超线程的CPU核after的数量.例如,如果您的机器有4个物理核心,每个都有超线程(大多数现代的CPU都有这一点),那么在默认情况下,Python将看到8个CPU,并将12个线程(8个CPU+4)分配给池(为了"避免在多核机器上消耗令人惊讶的大量资源",Python将线程数量限制为32;然而,当使用定制的ThreadPoolExecutor而不是default个时,人们总是可以自己调整max_workers参数).您可以判断系统上的默认工作线程数,如下所示:

import concurrent.futures

# create a thread pool with the default number of worker threads
pool = concurrent.futures.ThreadPoolExecutor()

# report the number of worker threads chosen by default
# Note: `_max_workers` is a protected variable and may change in the future
print(pool._max_workers)

现在,如您的原始示例所示,您使用的不是定制的ThreadPoolExecutor,而是在每次请求到达时通过调用await loop.run_in_executor(None, time.sleep, 3)(在sync_async_func()函数内,由/test/sync_async端点触发)来使用default ThreadPoolExecutor.假设您的机器有4个启用了超线程的物理核心(如前面的示例所述),那么default ThreadPoolExecutor的默认工作线程数将是12.这意味着,根据您的原始示例和触发await loop.run_in_executor(None, time.sleep, 3)函数的/test/sync_async端点,您的应用程序一次只能处理12个并发请求.与使用run_in_threadpool()相比,这是在性能结果中观察到的差异的main reason,后者默认情况下分配了40个线程.

解决此问题的一种方法是在每次请求到达时创建一个ThreadPoolExecutor的新实例(您自己创建,而不是使用default执行器),并在任务完成后终止它(使用with语句),如下所示:

import concurrent.futures
import asyncio

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    await loop.run_in_executor(pool, time.sleep, 3)

虽然这应该可以很好地工作,但最好在应用程序启动时实例化一次ThreadPoolExecutor,根据需要调整工作线程的数量,并在需要时重新使用该执行器.话虽如此,根据您可能为该任务使用的阻塞任务和/或外部库,如果在重用ThreadPoolExecutor时任务完成后遇到内存泄漏--即不再需要但未释放的内存--您可能会发现,每次创建ThreadPoolExecutor的新实例更合适,如上所述(然而,请注意,如果这是ProcessPoolExecutor,则反复创建和销毁许多进程可能会变成computationally expensive).

下面是一个完整的工作示例,演示了如何创建可重用的自定义ThreadPoolExecutor.从您的浏览器(例如http://localhost:8000/get_active_threads)调用/get_active_threads端点,同时使用ApacheBch运行性能测试(使用50个并发请求,如您的问题中所述,如下所示),您将看到active个线程的数量从未超过51(50个并发线程+1,这是主线程),尽管在下面的示例中将max_workers参数设置为60.这很简单,因为在此性能测试中,应用程序从不需要同时处理超过50个请求.此外,ThreadPoolExecutor不会旋转新的线程,如果有空闲线程可用(从而节省资源)-请参见relevant implementation part.因此,同样,如果您从未期望您的FastAPI应用程序一次处理超过50个请求,那么将ThreadPoolExecutor初始化为max_workers=100(如您最近的更新所示)将是不必要的.

Working Example

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import threading
import asyncio
import time


@asynccontextmanager
async def lifespan(app: FastAPI):    
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
    yield {'pool': pool}
    pool.shutdown()


app = FastAPI(lifespan=lifespan)


@app.get("/sync_async")
async def test_sync_async(request: Request) -> None:
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(request.state.pool, time.sleep, 3)  
    print("sync_async")


@app.get('/get_active_threads')
async def get_active_threads():
    return threading.active_count()

使用ApacheBch,您可以测试上面的示例,如下所示:

ab -n 1000 -c 50 "http://localhost:8000/sync_async"

结束语

一般来说,只要有可能,您应该始终致力于使用asynchronous代码(即使用async/await),因为async代码(也称为协程)在事件循环中运行,该事件循环在主线程中运行并执行该线程中的所有任务.这意味着只有one个线程可以锁定解释器.但是,在处理sync阻塞IO绑定任务时,您可以(1)使用def定义您的端点并让FastAPI在后台处理它,如前面和this answer中所述,或者(2)使用async def定义您的端点并使用run_in_threadpool()在单独的线程中运行阻塞任务并将其用于await,或者(3)使用asyncio‘S loop.run_in_executor()和自定义的(最好是可重用的)ThreadPoolExecutor,根据需要调整工作进程的数量.当需要执行阻塞CPU限制的任务时,虽然在外部线程中运行这类任务并使它们await可以成功地防止事件循环被阻塞,但是,它不会提供并行运行代码所期望的性能改进.因此,对于受CPU限制的任务,您可能会 Select 使用ProcessPoolExecutor(在一般使用进程时,您需要显式地使用if __name__ == '__main__'保护入口点)-使用ProcessPoolExecutor的示例可以在this answer中找到.

要在后台运行任务,而不等待它们完成,以便继续在端点中执行其余代码,您可以使用FastAPI的BackgroundTasks,如图herehere所示.如果后台任务函数定义为async def,FastAPI将直接在事件循环中运行它,而如果它定义为Normal def,FastAPI将使用run_in_threadpool()await返回的协程(与API端点的概念相同).当您需要在后台运行async def函数,但不一定要在返回FastAPI响应后触发它时(BackgroundTasks中的情况),另一种 Select 是使用asyncio.create_task(),如this answerthis answer所示.如果您需要使用perform heavy background computation,并且不一定需要由相同的进程运行,那么使用其他更大的工具(如Celery )可能会使您受益.

最后,关于optimal/maximum number of worker threads,我建议阅读this article(有关ThreadPoolExecutor的更多详细信息,请参阅this article).正如文章中所解释的:

这对线程中limit the number%的工作线程很重要 池到您希望完成的异步任务的数量、based on系统中的资源或您希望完成的资源的数量 打算在您的任务中使用.

或者,您可能希望increase the number%的工作线程 戏剧性地,given the greater capacity在您打算的资源中 来使用.

[.]

在中有more threads than CPUs个(物理或逻辑)是很常见的 你的系统.原因是线程用于IO绑定任务,而不是 受CPU限制的任务.这意味着线程用于等待的任务 对于响应相对较慢的资源,如硬盘、DVD 驱动器、打印机、网络连接等.

因此,it is not uncommon要有几十个,几百个,甚至 您的应用程序中有数千个线程,depending on your specific needs.有超过一个或几千个线程是不常见的. 如果您需要这么多线程,那么替代解决方案可能是 首选,如AsyncIO.

另外,在同一篇文章中:

Does the Number of Threads in the ThreadPoolExecutor Match the Number of CPUs or Cores?

ThreadPoolExecutor中的工作线程数与系统中的CPU或CPU核心数之比为not related.

您可以配置线程的数量based on的数量 您需要执行的任务、本地系统资源的数量 有可用的(例如,内存),以及您的资源的限制 打算在您的任务内进行访问(例如,连接到远程 服务器).

How Many Threads Should I Use?

如果您有数百个任务,您可能应该设置 线程数等于任务数.

如果您有数千个任务,您可能应该将 丝数在数百或1000之间.

如果您的应用程序打算在 将来,您可以测试不同数量的线程并比较总体 执行时间,然后 Select 给出的线程数 大概是最好的表现.您可能想要在 这些测试是通过随机睡眠操作进行的.

What Is the Maximum Number of Worker Threads in the ThreadPoolExecutor?

中没有工作线程的最大数量. ThreadPoolExecutor.

但是,您的系统将有一个数量上限 您可以基于how much main memory (RAM) you have available创建线程.

在超过主内存之前,您将达到一个递减点 在添加新线程和执行更多任务方面的返回.这 是因为您的操作系统必须在线程之间切换, 拨打了context switching.由于一次有太多活动的线程,您的 程序在上下文切换上花费的时间可能比实际执行的时间多 任务.

对于许多应用程序来说,合理的上限是数百个线程 也许只有几千根线.超过几千个线程在一个 现代系统可能会导致太多的上下文切换,这取决于 您的系统和正在执行的任务的类型.

Python相关问答推荐

从多行文本中提取事件对

如何判断. text文件中的某个字符,然后读取该行

机器人与Pyton Minecraft服务器状态不和

使用argsorted索引子集索引数组

Python无法在已导入的目录中看到新模块

数字梯度的意外值

将DF中的名称与另一DF拆分并匹配并返回匹配的公司

对Numpy函数进行载体化

TARete错误:类型对象任务没有属性模型'

如何在Python中将returns.context. DeliverresContext与Deliverc函数一起使用?

从收件箱中的列中删除html格式

ModuleNotFound错误:没有名为flags.State的模块; flags不是包

如何使用html从excel中提取条件格式规则列表?

Telethon加入私有频道

两个pandas的平均值按元素的结果串接元素.为什么?

如何根据一列的值有条件地 Select 前N个组,然后按两列分组?

判断solve_ivp中的事件

将scipy. sparse矩阵直接保存为常规txt文件

如何在Pyplot表中舍入值

python中csv. Dictreader. fieldname的类型是什么?'