我在Docker上使用气流和Postgres.我通过以下连接执行DML命令(外部气流):

 # Constructor method initializing the connection to DB and creating cursor
    def __init__(self, db="projeto-pos", user="postgres", host="localhost", password=my_psswd, port="5433"):
        self.conn = psycopg2.connect(
            database=db, host=host, port=port, password=password, user=user)
        self.cur = self.conn.cursor()

我正在使用Docker,Postgres的容器可以在本地主机上找到:5433.我可以从Python中完美地使用Pgadmin和执行DML命令.

我实例化了Postgress(db ojbect)类,并在DAG中创建了以下任务:

insert_into = PythonOperator(
        task_id='insert_into',
        python_callable=db.insert_into
    )

但是,当我打开气流UI时,会弹出以下消息,我无法运行DAG:

Broken DAG: [/opt/airflow/dags/projeto_api.py] Traceback (most recent call last):
  File "/opt/airflow/dags/functions/db_manipulation.py", line 10, in __init__
    database=db, host=host, port=port, password=password, user=user)
  File "/home/airflow/.local/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5433 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5433 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?

我使用this Dockerfile来实现气流.我也try 了在气流用户界面的"连接"部分,但我得到了相同的消息.我确信连接已启动并正在运行.

推荐答案

postgres的主机名是"postgres"而不是"localhost".

该名称由docker compose中postgres服务的名称定义.

services:
  postgres:
    image: postgres:13

此外,您可以在docker compose中看到连接字符串(粗体)

AIRFLOW\uu DATABASE\uu SQL\u ALCHEMY\u CONN:postgresql+psycopg2://AIRFLOW:airflow@postgres/气流

Python相关问答推荐

从DataFrame.apply创建DataFrame

如何从FDaGrid实例中删除某些函数?

滚动和,句号来自Pandas列

抓取rotowire MLB球员新闻并使用Python形成表格

对整个 pyramid 进行分组与对 pyramid 列子集进行分组

Julia CSV for Python中的等效性Pandas index_col参数

NumPy中条件嵌套for循环的向量化

如何在turtle中不使用write()来绘制填充字母(例如OEG)

如何杀死一个进程,我的Python可执行文件以sudo启动?

如何在两列上groupBy,并使用pyspark计算每个分组列的平均总价值

处理Gekko的非最优解

使用Python异步地持久跟踪用户输入

Beautifulsoup:遍历一个列表,从a到z,并解析数据,以便将其存储在pdf中.

如何过滤组s最大和最小行使用`transform`'

如何在一组行中找到循环?

获取PANDA GROUP BY转换中的组的名称

如何在Airflow执行日期中保留日期并将时间转换为00:00

有没有办法让Re.Sub报告它所做的每一次替换?

Python日志(log)库如何有效地获取lineno和funcName?

Pandas:计数器的滚动和,复位