我正在开发一款Django 应用程序.我有一个API端点,如果被请求,它必须执行一个必须重复几次的函数(直到某个条件为真).我现在的处理方式是-

def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

While I know that this is a terrible approach and that I shouldn't be blocking for 2 seconds, I can't figure out how to get around it.
This works, after say a wait of 4 seconds. But I'd like something that keeps the loop running in the background, and stop once some_fn returns True. (Also, it is certain that some_fn will return True)

EDIT -
Reading Oz123's response gave me an idea which seems to work. Here's what I did -

def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.setDaemon(True)
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

这对我来说很合适.这很简单,但我不知道多线程与Django配合使用的效率有多高.
如果有人能指出这一点的trap ,我们将不胜感激.

推荐答案

对于许多小项目来说,celery是矫枉过正的.对于那些可以使用schedule的项目,它非常容易使用.

使用此库,您可以让任何函数定期执行任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

该示例以阻塞方式运行,但是如果您查看FAQ,您会发现您还可以在并行线程中运行任务,这样您就不会阻塞,并在不再需要时删除任务:

import threading    
import time 

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

下面是一个在类方法中使用的示例:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds
    
    ...
    # later on foo is not needed any more:
    self._job_stop.set()
    
    ...
    
    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()
    

Django相关问答推荐

DRF中是否有有效的更新有序数据的算法?

与django相关的预取n +1问题.我该怎么解决呢?

如何在对接合成时创建两个Postgres数据库?

在模板中自动添加变量

如何在Django模板中获取组中对象的整体计数器(&Q;)?

在 Django 中重组多对多字段

在Django测试get方法中获取HttpResponseNotFound

基于令牌的身份验证如何工作?

Django 模型:如何查找自动生成的字段列表

如何在 django 的列表中查找所有对象作为对象?

使用django提交后如何保留html表单数据?

Django-Registration:邮箱作为用户名

try 编辑/创建时,特定模型的 Django 管理员挂起(直到超时错误)

如何在 django 中处理这种竞争条件?

在 Django 过滤器语句中,__exact 和等号 (=) 有什么区别?

django.request 记录器没有传播到根目录?

断开模型的信号并在 django 中重新连接

为整个结果集向 Django Rest Framework 结果添加额外数据

Django 中的自定义 HTTP 标头

Django Debug Toolbar:了解时间面板( time panel)