我是dask的新手,正在try 在工人停工时执行一些后处理任务.

每次我需要运行大型任务时,都会创建集群.该任务输出了一组我想发送到AWS S3的文件.

我将如何实现一个"后处理"功能,在每个工人身上运行,以将任何日志(log)和输出发送到我的AWS S3?

提前谢谢

def complex():
    time.sleep(10)
    print('hard')
    print(get_worker().id)

    return 'hard'


class DaskWorkerHandler(WorkerPlugin):
    """
    Worker life-cycle handler
    """
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        print(f"tearing down - {self.worker_id}. thread={threading.get_ident()}")

        # some post processing on the worker server
        # eg. post files to S3 etc...


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=5)
    print(f"cluster_name={cluster.name}")

    shutdown_handler = DaskWorkerHandler()
    client = Client(cluster)
    client.register_worker_plugin(shutdown_handler)

    future = client.submit(complex)
    result = future.result()

推荐答案

在workers运行时,您可以使用Python’s standard logging module记录任何您想要的日志(log),然后使用您编写的worker插件将这些日志(log)保存到拆卸时的S3存储桶中(有关更多详细信息,请查看logging in Dask上的文档).下面是一个例子:

import dask
from dask.distributed import Client, LocalCluster, get_worker
from dask.distributed.diagnostics.plugin import WorkerPlugin
import fsspec
import logging

def complex():
    logger = logging.getLogger("distributed.worker")
    logger.error("Got here")
    return 'hard'


class DaskWorkerHandler(WorkerPlugin):
    """Worker life-cycle handler."""
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        logs = worker.get_logs()
        # replace with S3 path
        with fsspec.open(f"worker-{self.worker_id}-logs.txt", "w") as f:
            f.write("\n".join([str(log) for log in logs]))


cluster = LocalCluster(n_workers=5)
client = Client(cluster)

shutdown_handler = DaskWorkerHandler()
client.register_worker_plugin(shutdown_handler)

future = client.submit(complex)
result = future.result()


client.close()
cluster.close()

Python-3.x相关问答推荐

为什么我必须在绘制椭圆时代码等于两次?''

Pandas 中每行的最大值范围

PythonPandas 创建一个列并添加到DataFrame

在BaseHTTPRequestHandler中填充和返回列表

无法使用xpath关闭selenium中的弹出窗口

Python 舍入数字不准确

在新数据帧上自动提取两个字符串 python 之间的相等性

!date 的命令无法从 jupyter notebook 运行

集合操作:应该只适用于集合,但适用于 dict_keys?

TimescaleDB:是否可以从 Python 调用create_hypertable?

基本 Flask 应用程序未运行(TypeError:模块中缺少必填字段type_ignores)

简单的 get/post 请求在 python 3 中被阻止,但在 python 2 中没有

Pylint 给我最后的新行丢失

cv2 python 没有 imread 成员

为什么在 Python 中不推荐使用 MutableString?

为什么中断比引发异常更快?

如何在 python 3.x 中禁用 ssl 判断?

当 None 被传递时,如何将默认值应用于 python 数据类字段?

将 numpy.float64 列表快速转换为 Python 中的浮点数

从 csv 中删除单行而不复制文件