Python 3.x、 Celery 4.十、

我有一个基于课堂的任务.

myproj/celery.py

from celery import Celery

# django settings stuff...

app = Celery('myproj')
app.autodiscover_tasks()

app1/tasks.py

import celery

class EmailTask(celery.Task):
    def run(self, *args, **kwargs):
        self.do_something()

如果我这样做:

$ celery worker -A myproj -l info
[tasks]
  . app2.tasks.debug_task
  . app2.tasks.test

因此,Celery decorator 可以注册任务,但基于类的任务没有注册.

How do I get the class-based tasks to register?

Update 1:

如果我把下面的几行加到app1/tasks.py

from myproj.celery import app
email_task = app.tasks[EmailTask.name]

.

$ celery worker -A myproj -l info
  File "myproj/app1/tasks.py", line 405, in <module>
    email_task = app.tasks[EmailTask.name]
  File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__
    raise self.NotRegistered(key)
celery.exceptions.NotRegistered

Update 2:

我能够通过包装器同步执行我的任务(run).但是,我不能异步运行任务,即通过delay.

app1/tasks.py

@app.task
def email_task():
    """
    Wrapper to call class based task
    """
    task = EmailTask()
    # task.delay()  # Won't work!!!
    task.run()

.

$./manage.py shell
> from app1.tasks import EmailTask
> task1 = EmailTask()
> task1.run() # a-okay
> task2 = EmailTask()
> task2.delay() # nope
  <AsyncResult: 1c03bad9-169a-4a4e-a56f-7d83892e8bbc>

# And on the worker...
[2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None
[2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None.
The message has been ignored and discarded.

推荐答案

你可以找到here个完整的描述,但对我来说,这就足够了

from myapp.celery import app
app.tasks.register(MyTaskTask())

Python-3.x相关问答推荐

Pandas groupby基于索引的连续列值相等

为什么打印语句在Python多处理脚本中执行两次?

我的SELECT函数搜索的是列,而不是列中的数据.我怎么才能让它搜索数据呢?

如何验证具有内部json字符串的json字符串?

如何定义既允许固定单词又允许模式的pydanti.BaseModel?

将strid()映射到Pandas DataFrame中的字符串不会更改NaN条目,但仍然声称它们不同?

查找值始终为零的行 pandas

CSV-DAT 转换时将引号添加到数据中

Jupyter Notebook 拒绝打印一些字符串

如何从脚本中提取 PDF 文档的标题以进行重命名?

运行 PyCharm 测试时如何解决django.core.exceptions.ImproperlyConfigured:找不到 GDAL 库?

理解 Keras 的 ImageDataGenerator 类中的 `width_shift_range` 和 `height_shift_range` 参数

Python3 mysqlclient-1.3.6(又名 PyMySQL)的用法?

为什么`multiprocessing.Queue.get`这么慢?

Asyncio RuntimeError:事件循环已关闭

ImportError:无法在 PyQt5 中导入名称QStringList

Python 3.5:async with导致 SyntaxError.为什么?

在 Alembic 迁移期间更新列内容

带有数千个逗号刻度标签的 MatPlotLib 美元符号

如何使用 Celery 和 Django 将任务路由到不同的队列