我正在try 在一个表被外部Spark程序覆盖后在Postgres上创建触发器.基本上,Spark覆盖postgres上的一个表,然后触发器使用该表在另一个表上插入记录,并进行一些修改.将在此方案中触发工作.如果是,我应该使用UPDATE还是INSERT触发器?

编辑: Spark和SQL的过程.

来自皮斯帕克的写字表:

df.write.format("jdbc")
        .option("truncate","true")
        .option("driver", "org.postgresql.Driver")
        .option("url", postgres_host)
        .option("user", postgres_user)
        .option("password", postgres_password)
        .option("dbtable", table_name)
        .mode("overwrite")
        .save()

触发器和函数示例:

CREATE
    OR REPLACE FUNCTION test_trigger() 
    RETURNS TRIGGER AS $func$ BEGIN IF NOT EXISTS (
    SELECT
    FROM
        information_schema.tables
    WHERE
        table_schema = 'public'
        AND table_name = 'test_table'
) THEN EXECUTE 'CREATE TABLE IF NOT EXISTS test_table as
            SELECT *,ST_GeomFromText(ST_AsText(col), 4326) as geom 
            FROM  spark_table';
END IF;
RETURN NULL;
END;

$func$ LANGUAGE plpgsql;

CREATE TRIGGER insert_or_update_parcel_1
  AFTER INSERT OR UPDATE
  ON spark_table
  FOR EACH STATEMENT
  EXECUTE PROCEDURE create_table_trigger();

我已经测试过了,触发器可以工作,但它只从覆盖的表中取出一行,而不是所有的行.

推荐答案

您已经使用了FOR EACH STATEMENT,这意味着触发器将在整个语句执行后触发一次,而不是每行触发一次.您可能需要修改您的函数,以循环遍历spark_table中新插入的行,或者在test_table中执行批量更新插入(UPDATE+INSERT).最后一个选项(Batch Upsert)如下所示:

CREATE OR REPLACE FUNCTION create_table_trigger()
RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO test_table
  SELECT *, ST_GeomFromText(ST_AsText(col), 4326) as geom
  FROM spark_table
  ON CONFLICT (your_primary_key_here)
  DO UPDATE SET col = EXCLUDED.col, geom = EXCLUDED.geom;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

由于Spark正在覆盖该表(本质上是一系列删除和插入操作),因此AFTER INSERT触发器确实是相关的.(如"PostgreSQL Create Trigger After INSERT/UPDATE/DELETE"by Aqsa Yasin中所述)

触发因素仍将是:

CREATE TRIGGER insert_or_update_parcel_1
AFTER INSERT OR UPDATE
ON spark_table
FOR EACH STATEMENT
EXECUTE PROCEDURE create_table_trigger();

我只是确保使用相同的函数名create_table_trigger,而不是test_trigger().


I need it to trigger once and then execute the statement which is taking the whole table and creating a new table if it does not exist.
The spark code essentially truncates the table and adds new rows.
The insert/update you provided would be the else part of the condition. The issue I am facing is that the trigger is working but in test_table there is only one row and not all the rows from spark_table. what I need is that the statement is execute after the last row has been inserted/updated

当我从pgadmin在spark_table上更新/插入时,触发器能够获取所有行,但使用spark时,test_table中只有一行.

因此,这个问题与Spark如何与PostgreSQL触发器交互有关.当Spark使用.mode("overwrite")覆盖该表时,它先执行TRUNCATE,然后执行批INSERT.您的触发器设置为运行AFTER INSERT OR UPDATE,理想情况下应该在插入所有行之后执行.然而,当操作源自Spark时,它的表现似乎有所不同.

您可以在现有触发器的基础上添加一个AFTER TRUNCATE触发器,以确保触发器在表截断操作之后触发.

与使用.mode("overwrite") in Spark不同,您可以try 使用单独的SQL命令手动截断PostgreSQL表,然后使用Spark设置为INSERT.mode("append").这将确保AFTER INSERT OR UPDATE触发器的行为符合预期.

还可以考虑在触发器函数中添加调试语句,以捕获其执行过程中发生的情况.这可以帮助您更好地理解为什么当Spark是数据源时只处理一行.

这似乎是一个引发JDBC写入问题的问题.所有options(append, overwrite)个似乎都在插入一行后执行触发器.

如果Spark的JDBC写入似乎是通过在插入一行之后执行触发器来导致问题的,您可以探索其他方法来规避此行为:

  • 您可以手动管理事务并成批插入记录,然后提交,而不是使用.mode("overwrite").

  • 或者:在Spark应用程序中使用SQL查询手动执行截断和插入操作,确保它们发生在单个事务中.这将使PostgreSQL的触发器能够正确执行.

  • 或者:先将Spark DataFrame写入PostgreSQL中的临时表或临时表.然后使用PostgreSQL存储过程将数据从临时表移动到终结表,后者也可以执行触发器逻辑.

  • 或者:使用外部作业(job)调度程序或PostgreSQL后台工作程序来判断表修改并异步执行所需的逻辑.您可以设置标志或时间戳来指示表已更新且需要处理的时间.

  • 最后:在数据写入操作之后,使用SQL查询从您的Spark应用程序显式调用包含触发器逻辑的PostgreSQL函数.这确保了逻辑在数据写入之后立即执行.

这些替代方法应该为您提供对触发器或函数何时执行的更多控制.


I'd be really interested if you could elaborate the last option ("Calling the function from spark application").
I'd like to do it in a async fashion so that the spark cluster does not keep running while the Postgres function takes hours to complete.
I have used psycopg2 which keeps the connection until the function execution is complete.

从Spark应用程序异步调用PostgreSQL函数可以通过将函数执行卸载到单独的线程或进程来完成,这将释放主应用程序以继续其工作.

您可以创建一个PostgreSQL函数,以使用pg_background extension或通过将工作排队以供后台工作程序稍后处理来异步运行.遗憾的是,原生PostgreSQL不提供内置的异步函数执行.

如果您在基于Python的Spark应用程序中使用psycopg2 library,则可以使用concurrent.futures库在单独的线程中执行函数(如"Debugging pool usage"所示).然而,这仍将保持与PostgreSQL数据库的连接打开.

下面是一个使用concurrent.futures.ThreadPoolExecutor的Python示例:

import psycopg2
import concurrent.futures

def call_pg_function():
    conn = psycopg2.connect(database="your_db", user="your_user", password="your_password", host="your_host", port="your_port")
    cur = conn.cursor()
    cur.callproc('your_function_name', [param1, param2])  # Replace with your function and parameters
    cur.close()
    conn.close()

# Asynchronous call
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(call_pg_function)

但是:

如果PostgreSQL函数需要很长时间才能运行,并且您想要将Spark应用程序与数据库操作分离,请考虑使用异步任务队列like Celery.

  1. Spark应用程序将写入PostgreSQL表,然后将一个任务排队以运行该函数.
  2. 一个单独的Celery 工人将 Select 任务并执行函数,这将释放Spark应用程序以继续其他工作.

这是一种更复杂的方法,它将涉及像RabbitMQRedis(例如,pip install celery redis)这样的消息代理.然后,您可以设置一个简单的Celery 任务来执行PostgreSQL函数.

创建一个新的Pythonfile celery_config.py来存储您的Celery 设置.

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

创建一个新的Python文件tasks.py,并设置您的Celery 实例和任务.

from celery import Celery
import psycopg2

# Initialize Celery
app = Celery('tasks')
app.config_from_object('celery_config')

@app.task
def execute_pg_function():
    conn = psycopg2.connect(
        database="your_db",
        user="your_user",
        password="your_password",
        host="your_host",
        port="your_port"
    )
    cur = conn.cursor()
    cur.callproc('your_function_name', [param1, param2])  # Replace with your function and parameters
    conn.commit()
    cur.close()
    conn.close()

启动一名Celery 工人来执行任务.

celery -A tasks worker --loglevel=info

在您的Spark应用程序中,您现在可以将要异步运行的任务排队.

from tasks import execute_pg_function

# Trigger the PostgreSQL function via Celery
execute_pg_function.apply_async()

您的Spark应用程序不会等待PostgreSQL函数完成.任务将由Celery 工人挑选并独立执行,实现您所希望的异步行为.

如果我没记错的话,Celery 进程会一直运行到函数结束吗?

是的,Celery 工人将 Select 任务并运行它直到完成,但这将与您的主Spark应用程序分开异步完成.Celery 工作进程将处理PostgreSQL函数的执行,并将保持活动状态,直到函数执行完成.但是,这不会阻止您的Spark应用程序,这是使用Celery 实现此目的的关键优势.

一旦您使用apply_async()将任务入队,任务将被发送到消息代理,然后发送到Celery 工人.您的Spark应用程序不等待任务完成;它独立地继续自己的执行.这允许您的Spark应用程序执行其他任务,甚至在Celery 工人继续执行PostgreSQL函数时终止.

Postgresql相关问答推荐

转换失败:(—122.763091,49.04676)转换为地理(位置)""

在输入稍有错误的PostgreSQL表中进行快速字符串搜索

横向联接返回的行数太多

Postgres 13.8 -如何在对数据执行窗口操作时返回所有行

PostgreSQL:函数结果表内冲突(...)上的";中的字段名称

为什么Postgres在打印时能完全缩短时间跨度?

将数组的所有元素循环到jsonb中并修改值

如何判断上次在 TimescaleDB 上运行连续聚合作业(job)的时间

在 postgresql 数据库 timestampz 中保留用户偏移量

如何让 Flask SQLAlchemy 重用数据库连接?

如何检索 PostgreSQL 数据库的 comments ?

判断值是否存在于列中

plpgsql:使用 2 个 OUT 参数调用函数

postgresql 分组和内部连接

使用 pg-promise 插入多条记录

使用 PostGIS 查找给定点的 n 个最近邻?

PostgreSQL 条件 where 子句

如何在 psql 中设置默认显示模式

PostgreSQL regexp_replace() 只保留一个空格

在 pg_restore 期间排除表