I'm new to Airflow. I have installed Airflow on an Ubuntu VM and configured a connection to my Azure SQL database. My DAG contains the following code:

from airflow import DAG
from datetime import datetime
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

with DAG(
    'user_processing',
    start_date=datetime(2023, 3, 10),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    create_table = MsSqlOperator(
        task_id="create_table",
        mssql_conn_id="mssql",
        sql="""
        CREATE TABLE user_schema.user_det (
        id int,
        first_name VARCHAR(200),
        last_name VARCHAR(200)
        )
        """
    )

When I try to run the task with the command airflow tasks test user_processing create_table 2023-10-02, I get this error:

[2023-10-03T16:54:08.684+0000] {dagbag.py:539} INFO - Filling up the DagBag from /home/azureuser/airflow/dags
[2023-10-03T16:54:08.744+0000] {example_python_operator.py:89} WARNING - The virtalenv_python example task requires virtualenv, please install it.
[2023-10-03T16:54:08.754+0000] {tutorial_taskflow_api_virtualenv.py:29} WARNING - The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.
[2023-10-03T16:54:08.876+0000] {example_local_kubernetes_executor.py:39} WARNING - Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 37, in <module>
    from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
[2023-10-03T16:54:08.877+0000] {example_local_kubernetes_executor.py:40} WARNING - Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:08.950+0000] {example_kubernetes_executor.py:38} WARNING - The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:09.115+0000] {workday.py:36} WARNING - Could not import pandas. Holidays will not be considered.
[2023-10-03T16:54:09.146+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1428} WARNING - cannot record queued_duration for task create_table because previous state change time has not been saved
[2023-10-03T16:54:09.152+0000] {taskinstance.py:1380} INFO - Executing <Task(MsSqlOperator): create_table> on 2023-10-02 00:00:00+00:00
[2023-10-03T16:54:09.172+0000] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='user_processing' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2023-10-02T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__'
[2023-10-03T16:54:09.174+0000] {sql.py:274} INFO - Executing:
        CREATE TABLE user_schema.user_det (
        id int,
        first_name VARCHAR(200),
        last_name VARCHAR(200)
        )

[2023-10-03T16:54:09.177+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:09.323+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:10.704+0000] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
  File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
  File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
  File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
  File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
    output = hook.run(
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
    with closing(self.get_conn()) as conn:
  File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
    conn = pymssql.connect(
  File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")
[2023-10-03T16:54:10.716+0000] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=user_processing, task_id=create_table, execution_date=20231002T000000, start_date=, end_date=20231003T165410
Traceback (most recent call last):
  File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
  File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
  File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
  File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
  File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 59, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 620, in task_test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1832, in run
    self._run_raw_task(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1516, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1679, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1742, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
    output = hook.run(
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
    with closing(self.get_conn()) as conn:
  File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
    conn = pymssql.connect(
  File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n").

I'm fairly sure I configured the connection correctly in Airflow. This is the connection object I configured in Airflow: Airflow Connection Object. This is my database connection string: Driver={ODBC Driver 18 for SQL Server};Server=tcp:mydbserver.database.windows.net,1433;Database=mydatabase;Uid=loginname;Pwd={your_password_here};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;

The following code runs fine (outside Airflow):

import pymssql


try:
    # Establish a database connection
    connection = pymssql.connect(server='server.database.windows.net',port='1433', user='username', password='pwd', database='database')

    # Create a cursor object for executing SQL commands
    cursor = connection.cursor()

    # Example: Execute a SQL query
    cursor.execute("SELECT * FROM user_schema.demo")

    # Fetch and print results
    rows = cursor.fetchall()
    for row in rows:
        print(row)

    # Close the cursor and connection
    cursor.close()
    connection.close()

except pymssql.Error as e:
    print(f"Error: {str(e)}")

I can't figure out how to solve this.

I have already tried adding the VM's IP address to the Azure SQL server firewall.

Recommended answer

Through trial and error, I finally figured out why I was getting this exception: b"Login failed for user 'username'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n"

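When we add a new connection through the Airflow UI, we don't get an option to specify the database name, as shown in this image. As a result, SQL Server doesn't know which database to use, and that is why it rejects the connection.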

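The fix is to add the database name in the task (for example, via MsSqlOperator's database argument) as well as in the query itself, as depicted in this image. This tells the server which database you want to use, so you no longer hit this exception.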
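If you would rather not repeat the database name in every task, the same information can probably be carried by the connection itself. The sketch below is an illustration under assumptions, not part of the original fix. It builds an Airflow connection URI whose path component is the database name (Airflow stores that part as the connection's schema, which, as far as I can tell, the MSSQL hook hands to pymssql as the database) and exposes it through the AIRFLOW_CONN_MSSQL environment variable, matching the connection ID mssql used in the DAG. The helper name build_mssql_conn_uri and the host, login, password, and database values are placeholders.

```python
import os
from urllib.parse import quote, urlsplit


def build_mssql_conn_uri(host, login, password, database, port=1433):
    """Return an Airflow-style connection URI whose path is the database name.

    Hypothetical helper for illustration; not part of Airflow.
    """
    # Percent-encode each part so characters like '@' or '/' survive URI parsing.
    return (
        f"mssql://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Placeholder values (assumptions) mirroring the connection string in the question.
uri = build_mssql_conn_uri(
    host="mydbserver.database.windows.net",
    login="loginname",
    password="p@ss/word",
    database="mydatabase",
)

# Airflow looks up a connection with ID "mssql" in AIRFLOW_CONN_MSSQL.
os.environ["AIRFLOW_CONN_MSSQL"] = uri

# Sanity check: the path component is the part that carries the database name.
parsed = urlsplit(uri)
print(parsed.hostname, parsed.port, parsed.path.lstrip("/"))
```

In practice the variable would be exported in the shell or service environment that runs Airflow rather than set from Python, and a connection defined this way is not listed in the UI.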
