我正在try 编写一个继承BaseSensorOperator和SSHOperator的自定义传感器.在传感器主体中,我重写了pokeexecute方法.

  class mysensor (BaseSensorOperator,SSHOperator):
      template_fields = ("my_param")
      def __init__ (
          self,
          my_param = None,
          *args,
          **kawags):
      self.my_param = my_param
      super().__init__(
          command = "some bash command",
          ssh_conn_id = "some connection"
          *args,
          **kwargs)

def poke(self, context):
    ... (implemented poke)
def execute():
    ...(implemented execute)
          

推荐答案

只需使用定制传感器即可实现poke功能.您不应该重写execute函数(除非您真的知道自己在做什么).执行功能在BaseSensorOperator中实现,这就是传感器的能力所在.

在你的情况下,你不应该使用SSHOperator,你应该直接使用SSHHook.

您的传感器应如下所示:

from airflow.providers.ssh.hooks.ssh import SSHHook

class Mysensor(BaseSensorOperator):
    ...

    def poke(self,context):
      hook = SSHHook(...)
      # add your sensor logic here

Python相关问答推荐

Pandas 在时间序列中设定频率

计算所有前面行(当前行)中列的值

分组数据并删除重复数据

比较两个二元组列表,NP.isin

当使用keras.utils.Image_dataset_from_directory仅加载测试数据集时,结果不同

在Google Colab中设置Llama-2出现问题-加载判断点碎片时Cell-run失败

log 1 p numpy的意外行为

Asyncio:如何从子进程中读取stdout?

Python逻辑操作作为Pandas中的条件

Python导入某些库时非法指令(核心转储)(beautifulsoup4."" yfinance)

在matplotlib中删除子图之间的间隙_mosaic

在Python中使用if else或使用regex将二进制数据如111转换为001""

人口全部乱序 - Python—Matplotlib—映射

基于另一列的GROUP-BY聚合将列添加到Polars LazyFrame

使用Openpyxl从Excel中的折线图更改图表样式

提高算法效率的策略?

如何在Great Table中处理inf和nans

在电影中向西北方向对齐""

如何在一组行中找到循环?

删除Dataframe中的第一个空白行并重新索引列