I'm new to Dask and am trying to run some post-processing tasks when the workers shut down.
Each time I need to run a large job, I create a cluster. The job outputs a set of files that I want to send to AWS S3.
How would I implement a "post-processing" hook that runs on each worker, so that any logs and output get shipped to my AWS S3 bucket?
Thanks in advance.
import threading
import time

from dask.distributed import Client, LocalCluster, WorkerPlugin, get_worker


def complex():
    time.sleep(10)
    print('hard')
    print(get_worker().id)
    return 'hard'


class DaskWorkerHandler(WorkerPlugin):
    """
    Worker life-cycle handler
    """
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        print(f"tearing down - {self.worker_id}. thread={threading.get_ident()}")
        # some post-processing on the worker server
        # e.g. post files to S3 etc...


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=5)
    print(f"cluster_name={cluster.name}")

    shutdown_handler = DaskWorkerHandler()
    client = Client(cluster)
    client.register_worker_plugin(shutdown_handler)

    future = client.submit(complex)
    result = future.result()
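
For context, here is a rough sketch of the kind of thing I imagine the teardown doing, using boto3. LOG_DIR, BUCKET, and S3UploadHandler are just placeholder names I made up, and it assumes AWS credentials are already configured on the worker machine:

import os

import boto3
from dask.distributed import WorkerPlugin

LOG_DIR = "/tmp/worker-output"  # placeholder: wherever the task writes its files
BUCKET = "my-results-bucket"    # placeholder bucket name


class S3UploadHandler(WorkerPlugin):
    """Upload whatever files this worker produced when it is torn down."""

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        s3 = boto3.client("s3")
        for name in os.listdir(LOG_DIR):
            path = os.path.join(LOG_DIR, name)
            if os.path.isfile(path):
                # prefix the key with the worker id so uploads from
                # different workers don't collide
                s3.upload_file(path, BUCKET, f"{self.worker_id}/{name}")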