今天需要给公司的那个django项目弄个定时任务,因为这个服务是在容器里面运行,就发现弄个定时任务特别麻烦
最开始想的是开一个线程,然后在线程里面直接用time定时去执行任务,发现过了一分钟之后那个线程就结束了,应该是被那个程序主动结束了
然后用python的schedule定时任务库也不行,进程也会被杀
那想着就用django-crontab这个库吧,发现我们的服务是在容器里面运行的,容器默认是不启用crontab服务的,如果启用还需要重做镜像,比较麻烦
然后业内通用的做法是使用celery,需要redis的依赖
先安装依赖
pip install celery redis django-celery-beat
项目结构
你的项目名/
├── 项目名/
│ ├── __init__.py # 必须修改
│ ├── celery.py # 新建
│ ├── settings.py
│ └── urls.py
└── myapp/
└── tasks.py # 任务文件
配置celery.py
# 项目名/celery.py
import os
from celery import Celery
from celery.schedules import crontab
# 指定Django配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings')
app = Celery('项目名')
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
app.autodiscover_tasks()
# 定时任务配置
app.conf.beat_schedule = {
'每分钟执行一次': {
'task': 'myapp.tasks.my_task',
'schedule': crontab(minute='*/1'), # 每分钟
},
}
配置init.py
# 项目名/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
settings.py加配置
# Redis作为中间人
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# redis如果有密码
# CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
编写任务
# myapp/tasks.py
from celery import shared_task
@shared_task
def my_task():
print("Celery 定时任务执行成功!")
# 你的业务逻辑
这个tasks如果是目录的话,必须要在init.py里面导入
from .task1 import handler1
from .task2 import handler2
__all__ = [
'handler1',
'handler2',
]
启动服务
# 终端1:启动worker(执行任务)
celery -A 项目名 worker -l info
# 终端2:启动beat(定时调度)
celery -A 项目名 beat -l info