编辑:这个问题的答案有点复杂.Tl;dr确保您正确地计算Lazy Loading;下面代码中声明的许多变量都是全局声明和设置的,但是您的全局变量应该设置为None
,并且只在实际的API调用中更改!
我都快疯了.
这是我的全部main.py
.它可以通过functions-framework --target=api
本地运行,也可以直接在Google Cloud上运行:
import functions_framework
import sqlalchemy
import threading
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.orm import sessionmaker, scoped_session
Base = sqlalchemy.orm.declarative_base()
class TestUsers(Base):
__tablename__ = 'TestUsers'
uuid = sqlalchemy.Column(sqlalchemy.String, primary_key=True)
cloud_sql_connection_name = "myproject-123456:asia-northeast3:tosmedb"
connector = Connector()
def getconn():
connection = connector.connect(
cloud_sql_connection_name,
"pg8000",
user="postgres",
password="redacted",
db="tosme",
ip_type=IPTypes.PUBLIC,
)
return connection
def init_pool():
engine_url = sqlalchemy.engine.url.URL.create(
"postgresql+pg8000",
username="postgres",
password="redacted",
host=cloud_sql_connection_name,
database="tosme"
)
engine = sqlalchemy.create_engine(engine_url, creator=getconn)
# Create tables if they don't exist
Base.metadata.create_all(engine)
return engine
engine = init_pool()
# Prepare a thread-safe Session maker
Session = scoped_session(sessionmaker(bind=engine))
print("Database initialized")
def run_concurrency_test():
def get_user():
with Session() as session:
session.query(TestUsers).first()
print("Simulating concurrent reads...")
threads = []
for i in range(2):
thread = threading.Thread(target=get_user)
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
print(f"Thread {thread.name} completed")
print("Test passed - Threads all completed!\n")
run_concurrency_test()
@functions_framework.http
def api(request):
print("API hit - Calling run_concurrency_test()...")
run_concurrency_test()
return "Success"
requirements.txt
:
functions-framework==3.*
cloud-sql-python-connector[pg8000]==1.5.*
SQLAlchemy==2.*
pg8000==1.*
这非常简单--而且很管用!只要您有一个PostgreSQL实例,它就会根据需要创建TestUsers表,查询两次(同时通过线程!),每次您curl 它时,它都会正常工作.以下是一些示例输出:
Database initialized
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
Thread Thread-5 (get_user) completed
Test passed - Threads all completed!
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-7 (get_user) completed
Thread Thread-8 (get_user) completed
Test passed - Threads all completed!
However,如果我注释掉第一个对run_concurrency_test()
的调用(即,不在api(request)
中的调用),运行它并curl ,我得到以下结果:
Database initialized
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
它卡住了!具体地说,它停留在session.query(TestUsers).first()
.当我第一次在api()
之外运行并发测试时,它没有卡住.就我所知,我的代码是无状态的,并且是线程安全的.那么,到底是什么原因让它突然失效了呢?