您已经使用了FOR EACH STATEMENT
,这意味着触发器将在整个语句执行后触发一次,而不是每行触发一次.您可能需要修改您的函数,以循环遍历spark_table
中新插入的行,或者在test_table
中执行批量更新插入(UPDATE+INSERT).最后一个选项(Batch Upsert)如下所示:
CREATE OR REPLACE FUNCTION create_table_trigger()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO test_table
SELECT *, ST_GeomFromText(ST_AsText(col), 4326) as geom
FROM spark_table
ON CONFLICT (your_primary_key_here)
DO UPDATE SET col = EXCLUDED.col, geom = EXCLUDED.geom;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
由于Spark正在覆盖该表(本质上是一系列删除和插入操作),因此AFTER INSERT
触发器确实是相关的.(如"PostgreSQL Create Trigger After INSERT/UPDATE/DELETE"by Aqsa Yasin中所述)
触发因素仍将是:
CREATE TRIGGER insert_or_update_parcel_1
AFTER INSERT OR UPDATE
ON spark_table
FOR EACH STATEMENT
EXECUTE PROCEDURE create_table_trigger();
我只是确保使用相同的函数名create_table_trigger
,而不是test_trigger()
.
I need it to trigger once and then execute the statement which is taking the whole table and creating a new table if it does not exist.
The spark code essentially truncates the table and adds new rows.
The insert/update you provided would be the else part of the condition. The issue I am facing is that the trigger is working but in test_table there is only one row and not all the rows from spark_table. what I need is that the statement is execute after the last row has been inserted/updated
当我从pgadmin在spark_table上更新/插入时,触发器能够获取所有行,但使用spark时,test_table中只有一行.
因此,这个问题与Spark如何与PostgreSQL触发器交互有关.当Spark使用.mode("overwrite")
覆盖该表时,它先执行TRUNCATE
,然后执行批INSERT
.您的触发器设置为运行AFTER INSERT OR UPDATE
,理想情况下应该在插入所有行之后执行.然而,当操作源自Spark时,它的表现似乎有所不同.
您可以在现有触发器的基础上添加一个AFTER TRUNCATE
触发器,以确保触发器在表截断操作之后触发.
与使用.mode("overwrite")
in Spark不同,您可以try 使用单独的SQL命令手动截断PostgreSQL表,然后使用Spark设置为INSERT
和.mode("append")
.这将确保AFTER INSERT OR UPDATE
触发器的行为符合预期.
还可以考虑在触发器函数中添加调试语句,以捕获其执行过程中发生的情况.这可以帮助您更好地理解为什么当Spark是数据源时只处理一行.
这似乎是一个引发JDBC写入问题的问题.所有options(append, overwrite)
个似乎都在插入一行后执行触发器.
如果Spark的JDBC写入似乎是通过在插入一行之后执行触发器来导致问题的,您可以探索其他方法来规避此行为:
您可以手动管理事务并成批插入记录,然后提交,而不是使用.mode("overwrite")
.
或者:在Spark应用程序中使用SQL查询手动执行截断和插入操作,确保它们发生在单个事务中.这将使PostgreSQL的触发器能够正确执行.
或者:先将Spark DataFrame写入PostgreSQL中的临时表或临时表.然后使用PostgreSQL存储过程将数据从临时表移动到终结表,后者也可以执行触发器逻辑.
或者:使用外部作业(job)调度程序或PostgreSQL后台工作程序来判断表修改并异步执行所需的逻辑.您可以设置标志或时间戳来指示表已更新且需要处理的时间.
最后:在数据写入操作之后,使用SQL查询从您的Spark应用程序显式调用包含触发器逻辑的PostgreSQL函数.这确保了逻辑在数据写入之后立即执行.
这些替代方法应该为您提供对触发器或函数何时执行的更多控制.
I'd be really interested if you could elaborate the last option ("Calling the function from spark application").
I'd like to do it in a async fashion so that the spark cluster does not keep running while the Postgres function takes hours to complete.
I have used psycopg2 which keeps the connection until the function execution is complete.
从Spark应用程序异步调用PostgreSQL函数可以通过将函数执行卸载到单独的线程或进程来完成,这将释放主应用程序以继续其工作.
您可以创建一个PostgreSQL函数,以使用pg_background
extension或通过将工作排队以供后台工作程序稍后处理来异步运行.遗憾的是,原生PostgreSQL不提供内置的异步函数执行.
如果您在基于Python的Spark应用程序中使用psycopg2
library,则可以使用concurrent.futures
库在单独的线程中执行函数(如"Debugging pool usage"所示).然而,这仍将保持与PostgreSQL数据库的连接打开.
下面是一个使用concurrent.futures.ThreadPoolExecutor
的Python示例:
import psycopg2
import concurrent.futures
def call_pg_function():
conn = psycopg2.connect(database="your_db", user="your_user", password="your_password", host="your_host", port="your_port")
cur = conn.cursor()
cur.callproc('your_function_name', [param1, param2]) # Replace with your function and parameters
cur.close()
conn.close()
# Asynchronous call
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(call_pg_function)
但是:
如果PostgreSQL函数需要很长时间才能运行,并且您想要将Spark应用程序与数据库操作分离,请考虑使用异步任务队列like Celery.
- Spark应用程序将写入PostgreSQL表,然后将一个任务排队以运行该函数.
- 一个单独的Celery 工人将 Select 任务并执行函数,这将释放Spark应用程序以继续其他工作.
这是一种更复杂的方法,它将涉及像RabbitMQ或Redis(例如,pip install celery redis
)这样的消息代理.然后,您可以设置一个简单的Celery 任务来执行PostgreSQL函数.
创建一个新的Pythonfile celery_config.py
来存储您的Celery 设置.
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
创建一个新的Python文件tasks.py
,并设置您的Celery 实例和任务.
from celery import Celery
import psycopg2
# Initialize Celery
app = Celery('tasks')
app.config_from_object('celery_config')
@app.task
def execute_pg_function():
conn = psycopg2.connect(
database="your_db",
user="your_user",
password="your_password",
host="your_host",
port="your_port"
)
cur = conn.cursor()
cur.callproc('your_function_name', [param1, param2]) # Replace with your function and parameters
conn.commit()
cur.close()
conn.close()
启动一名Celery 工人来执行任务.
celery -A tasks worker --loglevel=info
在您的Spark应用程序中,您现在可以将要异步运行的任务排队.
from tasks import execute_pg_function
# Trigger the PostgreSQL function via Celery
execute_pg_function.apply_async()
您的Spark应用程序不会等待PostgreSQL函数完成.任务将由Celery 工人挑选并独立执行,实现您所希望的异步行为.
如果我没记错的话,Celery 进程会一直运行到函数结束吗?
是的,Celery 工人将 Select 任务并运行它直到完成,但这将与您的主Spark应用程序分开异步完成.Celery 工作进程将处理PostgreSQL函数的执行,并将保持活动状态,直到函数执行完成.但是,这不会阻止您的Spark应用程序,这是使用Celery 实现此目的的关键优势.
一旦您使用apply_async()
将任务入队,任务将被发送到消息代理,然后发送到Celery 工人.您的Spark应用程序不等待任务完成;它独立地继续自己的执行.这允许您的Spark应用程序执行其他任务,甚至在Celery 工人继续执行PostgreSQL函数时终止.