我正在使用以下堆栈:

  • Python 3.6
  • Celery v4.2.1(经纪人:RabbitMQ v3.6.0)
  • Django v2.0.4

根据Celery's documentation,在不同的队列上运行计划任务应该和为CELERY_ROUTES上的任务定义相应的队列一样简单,尽管如此,所有任务似乎都是在Celery 的默认队列上执行的.

这是my_app/settings.py上的配置:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}

这些任务只是测试路由的简单脚本:

文件app1/tasks.py:

from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

文件app2/tasks.py:

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

当我用所有需要的队列运行Celery 时:

celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ将显示只有默认队列"celery"正在运行任务:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

有人知道如何修复这种意外行为吗?

当做

推荐答案

我已经让它工作了,这里有几件事需要注意:

根据Celery's 4.2.0 documentationCELERY_ROUTES应该是定义队列路由的变量,但它只适用于使用CELERY_TASK_ROUTES的我.任务路由似乎独立于Celery 节拍,因此这只适用于手动计划的任务:

app1_test.delay()
app2_test.delay()

app1_test.apply_async()
app2_test.apply_async()

To make it w或k with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

在这两个队列中播放Celery :

celery -A my_app w或ker -B -l INFO -Q queue1,queue2

哪里

  • -A: name of the project 或 app.
  • -B: Initiates the task scheduler Celery beat.
  • -l:定义日志(log)记录级别.
  • -Q: Defines the queues handled by this w或ker.

我希望这能为其他开发者节省一些时间.

Python-3.x相关问答推荐

Django 5.0.2和django_rest_framework

使用PANAS根据另两个列表中的值对一个列表中的字符串值进行分组

如何在输入正确的用户名和密码时添加按钮?

将f-字符串放置在f-字符串内

将字符串转换为python日期时间时出错

以编程方式关闭jupyterlab内核

在 sum() 中将字符串转换为 int (或 float)

检测点坐标 - opencv findContours()

替换 .txt 文件中的项目列表

Python base64.b32hexencode 未创建预期结果

如何将 OLS 趋势线添加到使用 updatemenus 显示数据子集的 plotly 散点图图形对象?

如何向 scikit-learn 函数添加类型提示?

Jupyter Notebook 拒绝打印一些字符串

使用自定义比较删除重复项

Pandas 的 EMA 与股票的 EMA 不匹配?

Tensorflow:ImportError:libcudnn.so.7:无法打开共享对象文件:没有这样的文件或目录

Python3 的超级和理解-> TypeError?

Python 3 中的连接列表

有没有一种标准方法来确保 python 脚本将由 python2 而不是 python3 解释?

Python:如何在 Windows 资源管理器中打开文件夹(Python 3.6.2、Windows 10)