I am new to Airflow. I have installed Airflow on an Ubuntu VM and configured a connection to my Azure SQL database. My DAG contains the following code:
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

with DAG(
    'user_processing',
    start_date=datetime(2023, 3, 10),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Create the target table through the 'mssql' Airflow connection
    create_table = MsSqlOperator(
        task_id="create_table",
        mssql_conn_id="mssql",
        sql="""
            CREATE TABLE user_schema.user_det (
                id int,
                first_name VARCHAR(200),
                last_name VARCHAR(200)
            )
        """,
    )
When I try to run the task with this command:
airflow tasks test user_processing create_table 2023-10-02
I get the following error:
[2023-10-03T16:54:08.684+0000] {dagbag.py:539} INFO - Filling up the DagBag from /home/azureuser/airflow/dags
[2023-10-03T16:54:08.744+0000] {example_python_operator.py:89} WARNING - The virtalenv_python example task requires virtualenv, please install it.
[2023-10-03T16:54:08.754+0000] {tutorial_taskflow_api_virtualenv.py:29} WARNING - The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.
[2023-10-03T16:54:08.876+0000] {example_local_kubernetes_executor.py:39} WARNING - Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 37, in <module>
from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
[2023-10-03T16:54:08.877+0000] {example_local_kubernetes_executor.py:40} WARNING - Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:08.950+0000] {example_kubernetes_executor.py:38} WARNING - The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:09.115+0000] {workday.py:36} WARNING - Could not import pandas. Holidays will not be considered.
[2023-10-03T16:54:09.146+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1428} WARNING - cannot record queued_duration for task create_table because previous state change time has not been saved
[2023-10-03T16:54:09.152+0000] {taskinstance.py:1380} INFO - Executing <Task(MsSqlOperator): create_table> on 2023-10-02 00:00:00+00:00
[2023-10-03T16:54:09.172+0000] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='user_processing' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2023-10-02T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__'
[2023-10-03T16:54:09.174+0000] {sql.py:274} INFO - Executing:
CREATE TABLE user_schema.user_det (
id int,
first_name VARCHAR(200),
last_name VARCHAR(200)
)
[2023-10-03T16:54:09.177+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:09.323+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:10.704+0000] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
output = hook.run(
File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
with closing(self.get_conn()) as conn:
File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
conn = pymssql.connect(
File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")
[2023-10-03T16:54:10.716+0000] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=user_processing, task_id=create_table, execution_date=20231002T000000, start_date=, end_date=20231003T165410
Traceback (most recent call last):
File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 59, in main
args.func(args)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 113, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 620, in task_test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1832, in run
self._run_raw_task(
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1516, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1679, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1742, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
output = hook.run(
File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
with closing(self.get_conn()) as conn:
File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
conn = pymssql.connect(
File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n").
I am fairly sure I configured the connection correctly in Airflow. This is the connection object I configured in Airflow: Airflow Connection Object. This is my database connection string: Driver={ODBC Driver 18 for SQL Server};Server=tcp:mydbserver.database.windows.net,1433;Database=mydatabase;Uid=loginname;Pwd={your_password_here};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;
The following code runs fine (outside of Airflow):
import pymssql

try:
    # Establish a database connection
    connection = pymssql.connect(server='server.database.windows.net', port='1433', user='username', password='pwd', database='database')
    # Create a cursor object for executing SQL commands
    cursor = connection.cursor()
    # Example: Execute a SQL query
    cursor.execute("SELECT * FROM user_schema.demo")
    # Fetch and print results
    rows = cursor.fetchall()
    for row in rows:
        print(row)
    # Close the cursor and connection
    cursor.close()
    connection.close()
except pymssql.Error as e:
    print(f"Error: {str(e)}")
I can't figure out how to solve this.
I have already tried adding the VM's IP address to the Azure SQL server firewall.
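To double-check that the values entered in the Airflow connection form match the connection string above, a small helper like the one below can split the string into its individual fields. This is only a sketch: `parse_odbc_connection_string` and `split_server` are hypothetical helper names (not part of Airflow or pymssql), and the naive split on `;` assumes that no value, including the password, contains a semicolon.

```python
def parse_odbc_connection_string(conn_str):
    """Split an ODBC-style 'Key=Value;' string into a dict with lowercase keys.

    Hypothetical helper for illustration; assumes no value contains ';'.
    """
    fields = {}
    for part in conn_str.split(";"):
        part = part.strip()
        if not part:
            continue  # skip the empty piece after the trailing ';'
        key, _, value = part.partition("=")
        value = value.strip()
        # ODBC wraps some values in braces, e.g. Pwd={secret}
        if value.startswith("{") and value.endswith("}"):
            value = value[1:-1]
        fields[key.strip().lower()] = value
    return fields


def split_server(server_field, default_port=1433):
    """Turn 'tcp:host,port' into (host, port); the port falls back to default_port."""
    server = server_field
    if server.lower().startswith("tcp:"):
        server = server[len("tcp:"):]
    host, _, port = server.partition(",")
    return host, int(port) if port else default_port


if __name__ == "__main__":
    conn_str = (
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=tcp:mydbserver.database.windows.net,1433;"
        "Database=mydatabase;Uid=loginname;Pwd={your_password_here};"
        "Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
    )
    fields = parse_odbc_connection_string(conn_str)
    host, port = split_server(fields["server"])
    # These are the values to compare with Host, Port, Schema and Login
    # in the Airflow connection form.
    print("host:", host)
    print("port:", port)
    print("database:", fields["database"])
    print("login:", fields["uid"])
```

Comparing the printed host, port, database, and login with what the Airflow connection form holds should at least rule out a typo in one of those fields.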