概念
celery是一个简单,灵活且可靠的处理大量消息的分布式系统
。专注于实时处理的异步任务队列,同时也支持任务调度。
相关信息
分布式系统
一个系统应用有很多系统组件:web服务器,数据库,消息中间键,缓存等,
把这些组件架构在不同的服务器上,然后这些不同服务器上的不同组件之间
通过消息中间键通信的方式来实现这种协调工作。(当用户访问时,要做到
和访问一台服务器的用户感知是一样的)分布式系统优点
负载均衡,避免单点故障异步任务
asynic协程执行,利用io操作等待时间执行别的任务。(当前任务await时)
组成架构
celery的架构由三部分组成,本质就是使用进程、协程(asyncio)封装处理异步任务,
一个worker就是一个进程,一个进程里面开多个协程。
1.消息中间件
celery本身不提供消息服务,但直接可以集成rabbitMQ、redis等2.任务执行单元
worker是celery提供的任务执行单元,worker并发的运行在分布式的系统节点中。3.任务执行结果存储
task result store用来存储执行任务的结果,celery
支持以不同的方式储存任务的结果,包括AMQP、redis、mysql等4.另外celery还支持不同的并发和序列化手段
1
2
3
41.并发
prefork、eventlet、gevent、threads
2.序列化
pickle、json、yaml、zlib等
celery的异步任务场景
生产者连接消息中间件,创建队列,向指定队列插入数据
消费者连接消息中间件,监听队列,内部实现callback,
如果有异步操作需要在callback里面使用并发技术处理。
(以上这些流程celery已经帮我们做好声明,无需自己定义)
celery的定时任务场景(每天几点执行)。
- 1.比如(django,简单python项目)作为生产消息的生产者;
- 2.rabbitMQ、redis作为消息中间件消息队列存放生产者生产出来的消息;
- 3.celery消费者,作为真正的异步任务执行单元,异步从消息队列中取出消息执行;
- 4.redis、sql等作为结果存储单元。
基础异步模型
消费者模块
1.celery如何确定任务及监听对应任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26新建c1.py(consumer)
import celery
import time
backend = 'redis://10.167.219.250:8001/1'
broker = 'redis://10.167.219.250:8001/2'
# 名字,数据库存储,消息中间件
cel = celery.Celery('test', backend=backend, broker=broker)
@cel.task(name='utils.myCelery.send_email')
@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成..."%name)
return "ok"
@cel.task
def send_msg(name):
print("向%s发送短信..."%name)
time.sleep(5)
print("向%s发送短信完成..."%name)
return "ok"
注意名字的一致性2.启动消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31执行celery –A c1(python文件名) worker –l INFO
E:\春江花月夜\笔记\celery>celery -A c1 worker -l INFO
-------------- celery@xxxxxxx v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Windows-10-10.0.17763-SP0 2020-10-09 15:40:49
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x2844f1f8400
- ** ---------- .> transport: redis://10.167.219.250:8001/2
- ** ---------- .> results: redis://10.167.219.250:8001/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. c1.send_email
. c1.send_msg
[2020-10-09 15:40:49,912: INFO/MainProcess] Connected to redis://10.167.219.250:8001/2
[2020-10-09 15:40:49,931: INFO/MainProcess] mingle: searching for neighbors
[2020-10-09 15:40:50,900: INFO/SpawnPoolWorker-1] child process 2076 calling self.run()
[2020-10-09 15:40:50,915: INFO/SpawnPoolWorker-2] child process 7964 calling self.run()
[2020-10-09 15:40:50,985: INFO/MainProcess] mingle: all alone
[2020-10-09 15:40:51,047: INFO/MainProcess] celery@xxxxxxx ready.
[2020-10-09 15:40:51,069: INFO/SpawnPoolWorker-3] child process 8856 calling self.run()
[2020-10-09 15:40:51,192: INFO/SpawnPoolWorker-4] child process 17216 calling self.run()
监听成功
生产者模块
- 新建product.py
导入我们在celery task注册好的方法并delay调用,比如
在views中,存储数据后想发送邮件信息。可以使用celery
1 | from c1 import send_email, send_msg |
结果储存模块
通过执行的id进行查询,判断执行结果
1 | from celery.result import AsyncResult |
多任务异步模型
消费者
1.新建consumer消费者模块
1
2
3
4
5
6python packages
celery_tasks
__init__.py
celery.py
task01.py
task02.py2.setting配置celery.py
这里的改变主要是需要include所有的需要的任务包,每
个任务会开启一个进程,在进程中会开启协程运行此进程中的每个任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import celery
import time
backend = 'redis://10.167.219.250:8001/1'
broker = 'redis://10.167.219.250:8001/2'
# 名字,数据库存储,消息中间件, 多了一个include放置你需要的所有task
cel = celery.Celery('test', backend=backend, broker=broker, include=['celery_tasks.task01', 'celery_tasks.task02'])
# 基本配置
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用utc
cel.conf.enable_utc = False3.任务模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29task01
import time
from .celery import cel
@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成..."%name)
return "邮件完成!"
task02
import time
from .celery import cel
@cel.task
def send_msg(name):
xxx
print("向%s发送短信..."%name)
time.sleep(5)
print("向%s发送短信完成..."%name)
return "短信完成"
运行celery -A celery_tasks worker -l INFO –P eventlet
这里-P指定开启协程的方式
通过-C控制并发数
celery -A celery_tasks worker –l INFO –C 5
(当存在celery遗留可以删掉celery)这里-C指定5个并发执行。
https://blog.csdn.net/weixin_44235861/article/details/108842685生产者和结果基本与基础模型相似
Celery简单定时任务
消费者
消费者都是一样的监听消息中间件,cel的装饰器包装相关任务方法。
1 | # 定时任务 |
- 开启消费者
1
celery -A python文件名 worker -l INFO
生产者
1 | # 定时任务 |
celery执行复杂定时任务
这种定时执行的任务和生产者无关,因为是真正的定时执行的任务,每隔多久就会执行一次。
消费者&生产者
1 | import celery |
需要开启服务生产者执行定时任务
这个会读取配置文件中的定时任务调度器,通过里面配置的schedule时间定时向redis
(消息中间件)中插入任务(这个时候已经不是自定义的生产者执行插入任务,而是我们
单独开启的一个进程定时插入)
1 | celery -A +配置文件路径 beat -l INFO |
还需要另一个新的进程(消费者)去处理队列
1 | celery -A +配置文件路径 woker -l INFO |
django配置celery使用
django利用celery完成一些定时和异步操作。
异步任务调度
1.消费者+task(打包到django根目录)
2.在django项目下的配置文件夹同级目录,创建pakages,打包成项目文件。
1
2
3
4
5新建config.py
# _*_ coding: utf-8 _*_
__author__ = '春江花月夜oo'
CELERY_RESULT_BACKEND = 'redis://10.167.218.136:6380/14'
BROKER_URL = 'redis://10.167.218.136:6380/15'3.把配置和主调用函数分开,实现解耦
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22新建main.py
# _*_ coding: utf-8 _*_
__author__ = '春江花月夜oo'
import os
import sys
import django
from celery import Celery
app = Celery("myCelery")
pwd = os.path.dirname(os.path.realpath(__file__))
sys.path.append(pwd+"../")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()
app.config_from_object('mycelery.config')
app.autodiscover_tasks(["mycelery.sms", "mycelery.email"])
这里需要
1.加载django环境变量
2.将配置文件的消息中间件、数据库加载到app
3.监控tasks.py(需要监控的文件夹(目录))4.新建python packages任务包email和sms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22在包里面新建tasks.py
from ..main import app
import time
import logging
log = logging.getLogger('django')
@app.task
def send_sms(mobile):
print("向手机%s发送短信" % mobile)
time.sleep(5)
print("向手机%s发送短信成功" % mobile)
return "send_sms ok"
@app.task
def send_sms2(mobile):
print("向手机%s发送短信2" % mobile)
time.sleep(5)
print("向手机%s发送短信2成功" % mobile)
return "send_sms2 ok"
5.异步任务调度就可把tasks中的方法放在view中调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14view.py
def test(request):
# 异步任务调度
send_sms.delay('110')
send_sms2.delay('119')
# 定时任务调度
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
result = send_sms.apply_async(["911", ], eta=task_time)
print(result.id)
return HttpResponse("Ok")6.记得配置url
这个时候我们调用url的view.test,在我们的view中除了执行主业务,
现在还在异步的执行发送信息的业务,他不会影响主业务执行效率。
但这里只是把任务发送到了消息队列中还需要开启worker才能处理业务。
如果我们需要什么定时任务,其实就是python脚本,
借用一下django的model存储数据,和django没啥关系。