Starlette的run_in_threadpool()
在幕后使用anyio.to_thread.run_sync()
,它"将在单独的线程中运行sync阻塞函数,以确保主线程(运行协程的地方)不会被阻塞"--有关更多详细信息,请参阅this answer和AnyIO的Working with threads文档.调用anyio.to_thread.run_sync()
-在内部调用AsyncIOBackend.run_sync_in_worker_thread()
-将返回协程,该协程可以是await
ED以获得sync函数的最终结果(例如,result = await run_in_threadpool(...)
),因此,FastAPI仍将工作asynchronously.从Starlette的源代码中可以看到(上面给出了链接),run_in_threadpool()
看起来就像这样(同时支持sequence和keyword参数):
async def run_in_threadpool(
func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
if kwargs: # pragma: no cover
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args)
如图AnyIO's documentation所示:
Adjusting the default maximum worker thread count
default AnyIO工作线程限制器的值为40
,这意味着
任何对to_thread.run_sync()
的呼叫在没有显式limiter
的情况下
参数将导致派生maximum of 100 threads.你可以的
如下所示调整此限制:
from anyio import to_thread
async def foo():
# Set the maximum number of worker threads to 60
to_thread.current_default_thread_limiter().total_tokens = 60
Note
AnyIO的默认线程池限制器不影响asyncio
上的默认线程池执行器.
由于FastAPI uses Startlette's concurrency
module用于在外部线程池中运行请求/阻塞函数,因此也应用线程限制器的默认值,即,最多40
个线程-参见相关的AsyncIOBackend.current_default_thread_limiter()
方法,该方法返回CapacityLimiter
和默认的线程数.如上所述,可以调整该值,即increasing the number of threads,这可能会导致性能结果的改善-始终是API预期并发服务的请求数量为depending.例如,如果您希望API一次服务不超过50个请求,那么将线程的最大数量设置为50-如果您有synchronous/blocking后台任务/StreamingResponse
的生成器(即,函数定义为普通的def
而不是async def
),或者也使用UploadFile
的操作,你可以根据需要添加更多的线程,因为FastAPI实际上在外部线程池中运行所有这些,使用run_in_threadpool
-这一切都在this answer中详细解释.
请注意,使用下面的方法(如here所述)对调整工作线程的数量具有相同的效果:
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
RunVar("_default_thread_limiter").set(CapacityLimiter(60))
但是,最好遵循AnyIO官方文档提供的方法(如前所述).在应用程序启动时使用lifespan
事件处理程序完成此操作也是一个好主意,如演示的here所示.
Working Example 1
from fastapi import FastAPI
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync")
def test_sync() -> None:
time.sleep(3)
print("sync")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
使用ApacheB边,您可以测试上面的示例,如下所示,它将总共发送1000
个请求,其中同时发送50
个请求(-n
:请求数,-c
:并发请求数):
ab -n 1000 -c 50 "http://localhost:8000/sync"
由于上面的/sync
端点是用Normal def
而不是async def
定义的,FastAPI将在幕后使用run_in_threadpool()
在单独的线程中运行它,并在await
中运行它,从而确保事件循环(因此,主线程)不会因为在该端点内执行的阻塞操作(阻塞IO或CPU阻塞)而被阻塞.
在上面的示例上运行性能测试时,如果您从浏览器调用/get_available_threads
端点,例如http://localhost:8000/get_available_threads
,您将看到线程available的数量始终为10或更多(因为在此测试中一次只使用50个线程,但线程限制器被设置为60
),这意味着将AnyIO的线程限制器上的最大线程数设置为远远高于您需要的数字,如其他答案和您最近的示例中所示的200
,不会带来任何性能改进;相反,您最终会得到许多不被使用的线程"驻留"在那里.正如前面所解释的,最大线程数应该取决于您的API预计要并发服务的请求数,以及FastAPI本身在线程池中秘密运行的任何其他阻塞任务/函数(当然,还有服务器机器可用的资源).
下面的示例是上面的same,但不是让FastAPI本身处理def
端点内的阻塞操作(通过在外部线程池中运行def
端点并对其执行await
),而是现在将端点定义为async def
(这意味着FastAPI将直接在事件循环中运行它),但在端点内部,run_in_threadpool()
被用来运行阻塞操作(返回await
able).对下面的示例执行基准测试将产生与上一个示例类似的结果.
Working Example 2
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async_run_in_tp")
async def test_sync_async_with_run_in_threadpool() -> None:
await run_in_threadpool(time.sleep, 3)
print("sync_async using FastAPI's run_in_threadpool")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
使用ApacheBch,您可以测试上面的示例,如下所示:
ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"
当使用asyncio
‘S loop.run_in_executor()
时-在使用asyncio.get_running_loop()
获得正在运行的事件循环之后-可以将None
传递给executor
参数,这将导致使用default执行器;即ThreadPoolExecutor
.Note当调用loop.run_in_executor()
并将None
传递给executor
参数时,这does not会在每次您这样做时创建ThreadPoolExecutor
的新实例;相反,在您第一次这样做时,ThreadPoolExecutor
只被初始化一次,但是对于将None
传递给executor
参数的loop.run_in_executor()
的后续调用,Pythonreuses就是ThreadPoolExecutor
的同一实例(因此,default执行器).这一点可以在source code of loop.run_in_executor()
号公路上看到.这意味着,当调用await loop.run_in_executor(None, ...)
时,可以创建的线程数是ThreadPoolExecutor
类中默认线程工作线程数的limited.
如ThreadPoolExecutor
的文档中所述--如其实现this article8/Lib/concurrent/futures/thread.py#L145" rel="nofollow noreferrer">here中所示--默认情况下,max_workers
参数设置为None
,在这种情况下,工作线程的数量是根据以下公式设置的:min(32, os.cpu_count() + 4)
.os.cpu_count()
函数返回当前系统中的logical个CPU的数量.如this article中所解释的,physical核是指在硬件(例如,芯片)中提供的CPU核的数量,而logical核是考虑超线程的CPU核after的数量.例如,如果您的机器有4个物理核心,每个都有超线程(大多数现代的CPU都有这一点),那么在默认情况下,Python将看到8个CPU,并将12个线程(8个CPU+4)分配给池(为了"避免在多核机器上消耗令人惊讶的大量资源",Python将线程数量限制为32;然而,当使用定制的ThreadPoolExecutor
而不是default个时,人们总是可以自己调整max_workers
参数).您可以判断系统上的默认工作线程数,如下所示:
import concurrent.futures
# create a thread pool with the default number of worker threads
pool = concurrent.futures.ThreadPoolExecutor()
# report the number of worker threads chosen by default
# Note: `_max_workers` is a protected variable and may change in the future
print(pool._max_workers)
现在,如您的原始示例所示,您使用的不是定制的ThreadPoolExecutor
,而是在每次请求到达时通过调用await loop.run_in_executor(None, time.sleep, 3)
(在sync_async_func()
函数内,由/test/sync_async
端点触发)来使用default ThreadPoolExecutor
.假设您的机器有4个启用了超线程的物理核心(如前面的示例所述),那么default ThreadPoolExecutor
的默认工作线程数将是12.这意味着,根据您的原始示例和触发await loop.run_in_executor(None, time.sleep, 3)
函数的/test/sync_async
端点,您的应用程序一次只能处理12个并发请求.与使用run_in_threadpool()
相比,这是在性能结果中观察到的差异的main reason,后者默认情况下分配了40
个线程.
解决此问题的一种方法是在每次请求到达时创建一个ThreadPoolExecutor
的新实例(您自己创建,而不是使用default执行器),并在任务完成后终止它(使用with
语句),如下所示:
import concurrent.futures
import asyncio
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
await loop.run_in_executor(pool, time.sleep, 3)
虽然这应该可以很好地工作,但最好在应用程序启动时实例化一次ThreadPoolExecutor
,根据需要调整工作线程的数量,并在需要时重新使用该执行器.话虽如此,根据您可能为该任务使用的阻塞任务和/或外部库,如果在重用ThreadPoolExecutor
时任务完成后遇到内存泄漏--即不再需要但未释放的内存--您可能会发现,每次创建ThreadPoolExecutor
的新实例更合适,如上所述(然而,请注意,如果这是ProcessPoolExecutor
,则反复创建和销毁许多进程可能会变成computationally expensive).
下面是一个完整的工作示例,演示了如何创建可重用的自定义ThreadPoolExecutor
.从您的浏览器(例如http://localhost:8000/get_active_threads
)调用/get_active_threads
端点,同时使用ApacheBch运行性能测试(使用50
个并发请求,如您的问题中所述,如下所示),您将看到active个线程的数量从未超过51
(50个并发线程+1,这是主线程),尽管在下面的示例中将max_workers
参数设置为60
.这很简单,因为在此性能测试中,应用程序从不需要同时处理超过50
个请求.此外,ThreadPoolExecutor
不会旋转新的线程,如果有空闲线程可用(从而节省资源)-请参见relevant implementation part.因此,同样,如果您从未期望您的FastAPI应用程序一次处理超过50个请求,那么将ThreadPoolExecutor
初始化为max_workers=100
(如您最近的更新所示)将是不必要的.
Working Example
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import threading
import asyncio
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
yield {'pool': pool}
pool.shutdown()
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async")
async def test_sync_async(request: Request) -> None:
loop = asyncio.get_running_loop()
await loop.run_in_executor(request.state.pool, time.sleep, 3)
print("sync_async")
@app.get('/get_active_threads')
async def get_active_threads():
return threading.active_count()
使用ApacheBch,您可以测试上面的示例,如下所示:
ab -n 1000 -c 50 "http://localhost:8000/sync_async"
结束语
一般来说,只要有可能,您应该始终致力于使用asynchronous代码(即使用async
/await
),因为async
代码(也称为协程)在事件循环中运行,该事件循环在主线程中运行并执行该线程中的所有任务.这意味着只有one个线程可以锁定解释器.但是,在处理sync阻塞IO绑定任务时,您可以(1)使用def
定义您的端点并让FastAPI在后台处理它,如前面和this answer中所述,或者(2)使用async def
定义您的端点并使用run_in_threadpool()
在单独的线程中运行阻塞任务并将其用于await
,或者(3)使用asyncio
‘S loop.run_in_executor()
和自定义的(最好是可重用的)ThreadPoolExecutor
,根据需要调整工作进程的数量.当需要执行阻塞CPU限制的任务时,虽然在外部线程中运行这类任务并使它们await
可以成功地防止事件循环被阻塞,但是,它不会提供并行运行代码所期望的性能改进.因此,对于受CPU限制的任务,您可能会 Select 使用ProcessPoolExecutor
(在一般使用进程时,您需要显式地使用if __name__ == '__main__'
保护入口点)-使用ProcessPoolExecutor
的示例可以在this answer中找到.
要在后台运行任务,而不等待它们完成,以便继续在端点中执行其余代码,您可以使用FastAPI的BackgroundTasks
,如图here和here所示.如果后台任务函数定义为async def
,FastAPI将直接在事件循环中运行它,而如果它定义为Normal def
,FastAPI将使用run_in_threadpool()
和await
返回的协程(与API端点的概念相同).当您需要在后台运行async def
函数,但不一定要在返回FastAPI响应后触发它时(BackgroundTasks
中的情况),另一种 Select 是使用asyncio.create_task()
,如this answer和this answer所示.如果您需要使用perform heavy background computation,并且不一定需要由相同的进程运行,那么使用其他更大的工具(如Celery )可能会使您受益.
最后,关于optimal/maximum number of worker threads,我建议阅读this article(有关ThreadPoolExecutor
的更多详细信息,请参阅this article).正如文章中所解释的:
这对线程中limit the number%的工作线程很重要
池到您希望完成的异步任务的数量、based
on系统中的资源或您希望完成的资源的数量
打算在您的任务中使用.
或者,您可能希望increase the number%的工作线程
戏剧性地,given the greater capacity在您打算的资源中
来使用.
[.]
在中有more threads than CPUs个(物理或逻辑)是很常见的
你的系统.原因是线程用于IO绑定任务,而不是
受CPU限制的任务.这意味着线程用于等待的任务
对于响应相对较慢的资源,如硬盘、DVD
驱动器、打印机、网络连接等.
因此,it is not uncommon要有几十个,几百个,甚至
您的应用程序中有数千个线程,depending on your specific
needs.有超过一个或几千个线程是不常见的.
如果您需要这么多线程,那么替代解决方案可能是
首选,如AsyncIO
.
另外,在同一篇文章中:
Does the Number of Threads in the ThreadPoolExecutor
Match the Number of CPUs or Cores?
ThreadPoolExecutor
中的工作线程数与系统中的CPU或CPU核心数之比为not
related.
您可以配置线程的数量based on的数量
您需要执行的任务、本地系统资源的数量
有可用的(例如,内存),以及您的资源的限制
打算在您的任务内进行访问(例如,连接到远程
服务器).
How Many Threads Should I Use?
如果您有数百个任务,您可能应该设置
线程数等于任务数.
如果您有数千个任务,您可能应该将
丝数在数百或1000之间.
如果您的应用程序打算在
将来,您可以测试不同数量的线程并比较总体
执行时间,然后 Select 给出的线程数
大概是最好的表现.您可能想要在
这些测试是通过随机睡眠操作进行的.
What Is the Maximum Number of Worker Threads in the ThreadPoolExecutor
?
中没有工作线程的最大数量.
ThreadPoolExecutor
.
但是,您的系统将有一个数量上限
您可以基于how much main memory (RAM) you have
available创建线程.
在超过主内存之前,您将达到一个递减点
在添加新线程和执行更多任务方面的返回.这
是因为您的操作系统必须在线程之间切换,
拨打了context switching.由于一次有太多活动的线程,您的
程序在上下文切换上花费的时间可能比实际执行的时间多
任务.
对于许多应用程序来说,合理的上限是数百个线程
也许只有几千根线.超过几千个线程在一个
现代系统可能会导致太多的上下文切换,这取决于
您的系统和正在执行的任务的类型.