由于自制的电子书网站需要新增电子书推送的功能,推送的意思实际上就是发送电子书附件到用户指定的邮箱,推送的过程需要消耗一定的时间,同时考虑到自己的服务器上不存储电子书,需要首先将电子书缓存到本地服务器再执行推送,所以服务器端不能直接同步执行此任务。查了一点资料了解到可以通过Celer来实现此项功能,而网上关于Django和Celery整合的讲解并不清楚,所以在这里做一详细的使用说明记录。
Celery是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他的主机上运行,一般被用来实现异步任务(asyn task)和定时任务(crontab),它的架构组成如下图:
一个完整的Celery任务包含以下4个模块:
Task:任务模块,包含异步任务(Async Task)和定时任务(Celery Beat);
Broker:消息中间件,实际上是个任务调度队列,接收任务模块发过来的消息(即任务),将其存入队列,Celery本身不提供队列服务,需要借助第三方实现,官方推荐使用RabbitMQ和Redis,实际上Mysql、MongoDB等也是可以的;
Worker:任务执行模块,负责实时监控消息队列,获取队列中的任务并执行;
Backend:任务执行结果存储模块,用以存储任务的执行结果,以供查询。同消息中间件一样,可以使用RabbitMQ和Redis等。
本次使用的是Celery的异步任务功能,所以下面着重是异步任务的使用说明。
一个Celery任务的执行步骤为:
创建一个Celery实例——>启动一个异步任务(通常在业务逻辑层触发)——>任务被发往消息中间件并被调度——>任务执行模块获取任务并执行它——>执行结果被存储。
开发环境
CentOS Linux release 7.5.1804 (Core)
Django1.11
Python3.6
Celery3.1.26
Django-celery3.2.2
RabbitMQ3.3.5(在这里Broker和Backend都使用的是RabbitMQ)
配置Celery环境
修改Django项目的settings.py
文件:1
2
3
4
5
6
7
8
9
10
11
12
13...
import djcelery
djcelery.setup_loader() #加载配置
BROKER_URL = 'amqp://guest@localhost//' # RabbitMQ的配置,如使用的是其他,请做相应配置,下同
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
CELERY_TIMEZONE='Asia/Shanghai' #时区设置不对会影响定时任务的执行
INSTALLED_APPS - (
...
'djcelery', #增加djcelery应用
)
...
创建celery.py文件并修改init.py
在settings.py
文件同目录下(也即Django项目文件目录下)创建celery.py
文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from __future__ import absolute_import # 注意必须放在文件最前面
from celery import Celery
import kindle.settings
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xxx.settings') # xxx为Django(注意不是app)项目
app = Celery('xxx') # xxx同上
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: xxx.settings.INSTALLED_APPS) # xxx同上,这句意思是遍历Django项目下所有app下的tasks(也就是说,对于有多个app的Django项目,每个app可以创建自己的tasks)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
修改同一目录下的__init__.py
文件1
2
3
4
5from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
import pymysql
__all__ = ('celery_app',)
创建tasks.py
在需要tasks的app目录下创建tasks.py
文件1
2
3
4
5
6
7
8
9from celery import task, platforms
platforms.C_FORCE_ROOT = True
def test(x, y):
...
# 编写自己的task
return ...
同步数据库
1 | python manage.py makemigrations |
启动celery worker进程
注意首先还要启动RabbitMQ-server和Django项目1
python manage.py celery worker -c 6 -l debug
启动后通过终端Terminal就可以看到celery的执行效果了。但是一旦断开terminal连接,celery也随之断开,这在实际使用中是不行的,所以还要解决celery后台运行的问题。
Celery后台运行
有关Celery后台运行的说明,官方有介绍:Daemonization,有多种方法可以实现Celery的后台运行,在这里只使用其中一种方法:Generic init-scripts
首先,创建/etc/init.d/celeryd
文件,并写入以下内容:https://github.com/celery/celery/blob/3.1/extra/generic-init.d/celeryd
然后,创建配置文件/etc/default/celeryd
,并写入以下内容1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35# Names of nodes to start
# most people will only start one node:
CELERYD_NODES="worker"
# but you can also start multiple and configure settings
# for each in CELERYD_OPTS
#CELERYD_NODES="worker1 worker2 worker3"
# alternatively, you can specify the number of nodes to start:
#CELERYD_NODES=10
# Absolute or relative path to the 'celery' command:
CELERY_BIN="xxx/env/bin/celery" # 需要修改为celery的安装bin路径
#CELERY_BIN="/virtualenvs/def/bin/celery"
# App instance to use
# comment out this line if you don't use an app
# CELERY_APP="celery_tasks.main" # ====================celery 项目的相对路径
# or fully qualified:
CELERY_APP="xxx" # 修改为Django项目名
# Where to chdir at start.
CELERYD_CHDIR="xxx" # 修改为Django项目路径
# Extra command-line arguments to the worker
CELERYD_OPTS="--time-limit=300 --concurrency=8"
# Configure node-specific settings by appending node name to arguments:
#CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1"
# Set logging level to DEBUG
#CELERYD_LOG_LEVEL="DEBUG"
# %n will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/home/Projects/launch_file/scripts/celery_worker.log"
CELERYD_PID_FILE="/home/Projects/launch_file/scripts/%n.pid"
# Workers should run as an unprivileged user.
# You need to create this user manually (or you can choose
# a user/group combination that already exists (e.g., nobody).
CELERYD_USER="root"
CELERYD_GROUP="root"
# If enabled pid and log directories will be created if missing,
# and owned by the userid/group configured.
CELERY_CREATE_DIRS=1
然后就可以通过/etc/init.d/celeryd {start|stop|restart|status}
命令来实现Celery的启动、停止、重启和状态查询了,这里启动后就可以后台运行了。
- 为了避免开机时需要手动启动,可以将启动命令添加为开机执行。