编辑:这个问题的答案有点复杂.Tl;dr确保您正确地计算Lazy Loading;下面代码中声明的许多变量都是全局声明和设置的,但是您的全局变量应该设置为None,并且只在实际的API调用中更改!


我都快疯了.

这是我的全部main.py.它可以通过functions-framework --target=api本地运行,也可以直接在Google Cloud上运行:

import functions_framework
import sqlalchemy
import threading
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.orm import sessionmaker, scoped_session

Base = sqlalchemy.orm.declarative_base()

class TestUsers(Base):
    __tablename__ = 'TestUsers'
    
    uuid = sqlalchemy.Column(sqlalchemy.String, primary_key=True)

cloud_sql_connection_name = "myproject-123456:asia-northeast3:tosmedb"

connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

def init_pool():
    engine_url = sqlalchemy.engine.url.URL.create(
        "postgresql+pg8000",
        username="postgres",
        password="redacted",
        host=cloud_sql_connection_name,
        database="tosme"
    )
    engine = sqlalchemy.create_engine(engine_url, creator=getconn)

    # Create tables if they don't exist
    Base.metadata.create_all(engine)
    return engine

engine = init_pool()

# Prepare a thread-safe Session maker
Session = scoped_session(sessionmaker(bind=engine))

print("Database initialized")

def run_concurrency_test():
    def get_user():
      with Session() as session:
          session.query(TestUsers).first()

    print("Simulating concurrent reads...")

    threads = []
    for i in range(2):
        thread = threading.Thread(target=get_user)
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()
        print(f"Thread {thread.name} completed")

    print("Test passed - Threads all completed!\n")

run_concurrency_test()

@functions_framework.http
def api(request):
    print("API hit - Calling run_concurrency_test()...")
    run_concurrency_test()
    return "Success"

requirements.txt:

functions-framework==3.*
cloud-sql-python-connector[pg8000]==1.5.*
SQLAlchemy==2.*
pg8000==1.*

这非常简单--而且很管用!只要您有一个PostgreSQL实例,它就会根据需要创建TestUsers表,查询两次(同时通过线程!),每次您curl 它时,它都会正常工作.以下是一些示例输出:

Database initialized
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
Thread Thread-5 (get_user) completed
Test passed - Threads all completed!

API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-7 (get_user) completed
Thread Thread-8 (get_user) completed
Test passed - Threads all completed!

However,如果我注释掉第一个对run_concurrency_test()的调用(即,不在api(request)中的调用),运行它并curl ,我得到以下结果:

Database initialized
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-4 (get_user) completed

它卡住了!具体地说,它停留在session.query(TestUsers).first().当我第一次在api()之外运行并发测试时,它没有卡住.就我所知,我的代码是无状态的,并且是线程安全的.那么,到底是什么原因让它突然失效了呢?

推荐答案

带云功能的Cloud SQL Python Connector的正确详细用法请参阅此other SO post.

此处错误的原因与将Connector初始化为云函数请求上下文之外的全局变量有关.云功能只有在发出请求时才能访问计算和资源,否则就会缩减规模.Connector具有后台任务,运行这些任务是为了成功连接到Cloud SQL当您try 连接时,这些后台任务将被限制并导致您的错误,因为您试图在没有为您的函数分配CPU的情况下全局初始化它.

正是出于这个原因,上面的链接帖子展示了这一点.

NOTE:正如另一个答案提到的那样,不建议初始化getconn内部的Connector,这会在try 扩展流量时向您的代码中引入更多错误.它之所以起作用,是因为它保证Connector在云函数请求上下文中被初始化,但将在每个数据库连接上创建新的Connector.`连接器旨在跨连接共享,以支持可伸缩的解决方案,因此将其作为惰性全局变量是推荐方法的原因.

Python相关问答推荐

Django:如何将一个模型的唯一实例创建为另一个模型中的字段

Pandas基于另一列的价值的新列

如何最好地处理严重级联的json

有什么方法可以修复奇怪的y轴Python matplotlib图吗?

按日期和组增量计算总价值

Django文件上传不起作用:文件未出现在媒体目录或数据库中

Pandas滚动分钟,来自其他列的相应值

将列表中的元素替换为收件箱中的元素

GEKKO:已知延迟的延迟系统的参数估计

Altair -箱形图边界设置为黑色,中线设置为红色

如何将我的位置与光强度数据匹配到折射图案曲线中?

将行从一个DF添加到另一个DF

请从Python访问kivy子部件的功能需要帮助

通过交换 node 对链接列表进行 Select 排序

使用polars .滤镜进行切片速度比pandas .loc慢

Excel图表-使用openpyxl更改水平轴与Y轴相交的位置(Python)

如何让Flask 中的请求标签发挥作用

如何获得每个组的时间戳差异?

PyQt5,如何使每个对象的 colored颜色 不同?'

pysnmp—lextudio使用next()和getCmd()生成器导致TypeError:tuple对象不是迭代器''