celery

概念

celery是一个简单,灵活且可靠的处理大量消息的分布式系统
。专注于实时处理的异步任务队列,同时也支持任务调度。

相关信息

  • 分布式系统
    一个系统应用有很多系统组件:web服务器,数据库,消息中间键,缓存等,
    把这些组件架构在不同的服务器上,然后这些不同服务器上的不同组件之间
    通过消息中间键通信的方式来实现这种协调工作。(当用户访问时,要做到
    和访问一台服务器的用户感知是一样的)

  • 分布式系统优点
    负载均衡,避免单点故障

  • 异步任务
    asynic协程执行,利用io操作等待时间执行别的任务。(当前任务await时)

组成架构

celery的架构由三部分组成,本质就是使用进程、协程(asyncio)封装处理异步任务,
一个worker就是一个进程,一个进程里面开多个协程。

  • 1.消息中间件
    celery本身不提供消息服务,但直接可以集成rabbitMQ、redis等

  • 2.任务执行单元
    worker是celery提供的任务执行单元,worker并发的运行在分布式的系统节点中。

  • 3.任务执行结果存储
    task result store用来存储执行任务的结果,celery
    支持以不同的方式储存任务的结果,包括AMQP、redis、mysql等

  • 4.另外celery还支持不同的并发和序列化手段

    1
    2
    3
    4
    1.并发
    prefork、eventlet、gevent、threads
    2.序列化
    pickle、json、yaml、zlib等

celery的异步任务场景

生产者连接消息中间件,创建队列,向指定队列插入数据
消费者连接消息中间件,监听队列,内部实现callback,
如果有异步操作需要在callback里面使用并发技术处理。
(以上这些流程celery已经帮我们做好声明,无需自己定义)
celery的定时任务场景(每天几点执行)。

  • 1.比如(django,简单python项目)作为生产消息的生产者;
  • 2.rabbitMQ、redis作为消息中间件消息队列存放生产者生产出来的消息;
  • 3.celery消费者,作为真正的异步任务执行单元,异步从消息队列中取出消息执行;
  • 4.redis、sql等作为结果存储单元。

基础异步模型

消费者模块

  • 1.celery如何确定任务及监听对应任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    新建c1.py(consumer)
    import celery
    import time

    backend = 'redis://10.167.219.250:8001/1'
    broker = 'redis://10.167.219.250:8001/2'


    # 名字,数据库存储,消息中间件
    cel = celery.Celery('test', backend=backend, broker=broker)
    @cel.task(name='utils.myCelery.send_email')

    @cel.task
    def send_email(name):
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成..."%name)
    return "ok"

    @cel.task
    def send_msg(name):
    print("向%s发送短信..."%name)
    time.sleep(5)
    print("向%s发送短信完成..."%name)
    return "ok"
    注意名字的一致性
  • 2.启动消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    执行celery –A c1(python文件名) worker –l INFO
    E:\春江花月夜\笔记\celery>celery -A c1 worker -l INFO

    -------------- celery@xxxxxxx v5.0.0 (singularity)
    --- ***** -----
    -- ******* ---- Windows-10-10.0.17763-SP0 2020-10-09 15:40:49
    - *** --- * ---
    - ** ---------- [config]
    - ** ---------- .> app: test:0x2844f1f8400
    - ** ---------- .> transport: redis://10.167.219.250:8001/2
    - ** ---------- .> results: redis://10.167.219.250:8001/1
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
    -------------- [queues]
    .> celery exchange=celery(direct) key=celery


    [tasks]
    . c1.send_email
    . c1.send_msg

    [2020-10-09 15:40:49,912: INFO/MainProcess] Connected to redis://10.167.219.250:8001/2
    [2020-10-09 15:40:49,931: INFO/MainProcess] mingle: searching for neighbors
    [2020-10-09 15:40:50,900: INFO/SpawnPoolWorker-1] child process 2076 calling self.run()
    [2020-10-09 15:40:50,915: INFO/SpawnPoolWorker-2] child process 7964 calling self.run()
    [2020-10-09 15:40:50,985: INFO/MainProcess] mingle: all alone
    [2020-10-09 15:40:51,047: INFO/MainProcess] celery@xxxxxxx ready.
    [2020-10-09 15:40:51,069: INFO/SpawnPoolWorker-3] child process 8856 calling self.run()
    [2020-10-09 15:40:51,192: INFO/SpawnPoolWorker-4] child process 17216 calling self.run()
    监听成功

生产者模块

  • 新建product.py
    导入我们在celery task注册好的方法并delay调用,比如
    在views中,存储数据后想发送邮件信息。可以使用celery
1
2
3
4
5
6
7
8
from c1 import send_email, send_msg
# delay(是cel.task中的方法)放入消息中間件,并傳入必要參數
result = send_email.delay('yuan')
# 这时候的结果已经存入了数据库redis,我们需要通过这个id去访问结果
print(result.id)

result = send_msg.delay('zhang')
print(result.id)

结果储存模块

通过执行的id进行查询,判断执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery.result import AsyncResult
from c1 import cel
async_result = AsyncResult(id="e78e6274-46a7-4fb1-9ae6-2084fcdad58d", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')

多任务异步模型

消费者

  • 1.新建consumer消费者模块

    1
    2
    3
    4
    5
    6
    python packages
    celery_tasks
    __init__.py
    celery.py
    task01.py
    task02.py
  • 2.setting配置celery.py
    这里的改变主要是需要include所有的需要的任务包,每
    个任务会开启一个进程,在进程中会开启协程运行此进程中的每个任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import celery
    import time

    backend = 'redis://10.167.219.250:8001/1'
    broker = 'redis://10.167.219.250:8001/2'


    # 名字,数据库存储,消息中间件, 多了一个include放置你需要的所有task
    cel = celery.Celery('test', backend=backend, broker=broker, include=['celery_tasks.task01', 'celery_tasks.task02'])

    # 基本配置
    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用utc
    cel.conf.enable_utc = False
  • 3.任务模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    task01
    import time
    from .celery import cel

    @cel.task
    def send_email(name):
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成..."%name)
    return "邮件完成!"

    task02
    import time
    from .celery import cel

    @cel.task
    def send_msg(name):
    xxx
    print("向%s发送短信..."%name)
    time.sleep(5)
    print("向%s发送短信完成..."%name)
    return "短信完成"

    运行celery -A celery_tasks worker -l INFO –P eventlet
    这里-P指定开启协程的方式
    通过-C控制并发数
    celery -A celery_tasks worker –l INFO –C 5
    (当存在celery遗留可以删掉celery)这里-C指定5个并发执行。
    https://blog.csdn.net/weixin_44235861/article/details/108842685

    生产者和结果基本与基础模型相似

Celery简单定时任务

消费者

消费者都是一样的监听消息中间件,cel的装饰器包装相关任务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 定时任务
import time
import celery
# 名字,数据库存储,消息中间件, 多了一个include放置你需要的所有task
backend = 'redis://10.167.219.250:8001/1'
broker = 'redis://10.167.219.250:8001/2'


cel = celery.Celery('test', backend=backend,
broker=broker)

# 基本配置
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用utc
cel.conf.enable_utc = False


@cel.task
def send_email(name):
print('正在发送邮件给%s'%name)
time.sleep(2)
print('发送邮件给%s完成'%name)
  • 开启消费者
    1
    celery -A python文件名 worker -l INFO

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 定时任务
from c1 import send_email
from datetime import datetime

# 方式一
v1 = datetime(2019, 10, 9, 17, 18, 00)
print(v1)
# 需要使用国标时间
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
# 用apply_async执行定时任务, 用delay执行异步任务
result = send_email.apply_async(args=["egon",], eta=v2)
print(result.id)

# 方式二
ctime = datetime.now()
print(ctime)
# 需要使用国标时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
# 用apply_async执行定时任务, 用delay执行异步任务
result = send_email.apply_async(args=["egon",],
eta=utc_ctime + timedelta(seconds=10))
print(result.id)

celery执行复杂定时任务

这种定时执行的任务和生产者无关,因为是真正的定时执行的任务,每隔多久就会执行一次。

消费者&生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import celery
import time

backend = 'redis://10.167.219.250:8001/1'
broker = 'redis://10.167.219.250:8001/2'

# 名字,数据库存储,消息中间件, 多了一个include放置你需要的所有task
cel = celery.Celery('test', backend=backend, broker=broker,
include=['celery_tasks.task01', 'celery_tasks.task02'])

# 基本配置
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用utc
cel.conf.enable_utc = False

from datetime import timedelta
# 构建定时任务调度器
cel.conf.beat_schedule = {
# 名称每个定时任务的名称,自己起
'add-every-6-seconds': {
'task': 'celery_tasks.task01.send_email',
# 每隔六秒执行一次
# 'schedule': 6.0,
# 'schedule': crontab(minutes="*/6"),
'schedule': timedelta(seconds=6),
# 传递参数
'args': ('张三', )
}
}

需要开启服务生产者执行定时任务

这个会读取配置文件中的定时任务调度器,通过里面配置的schedule时间定时向redis
(消息中间件)中插入任务(这个时候已经不是自定义的生产者执行插入任务,而是我们
单独开启的一个进程定时插入)

1
celery -A +配置文件路径 beat -l INFO 

还需要另一个新的进程(消费者)去处理队列

1
celery -A +配置文件路径 woker -l INFO

django配置celery使用

django利用celery完成一些定时和异步操作。

异步任务调度

  • 1.消费者+task(打包到django根目录)

  • 2.在django项目下的配置文件夹同级目录,创建pakages,打包成项目文件。

    1
    2
    3
    4
    5
    新建config.py
    # _*_ coding: utf-8 _*_
    __author__ = '春江花月夜oo'
    CELERY_RESULT_BACKEND = 'redis://10.167.218.136:6380/14'
    BROKER_URL = 'redis://10.167.218.136:6380/15'
  • 3.把配置和主调用函数分开,实现解耦

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    新建main.py
    # _*_ coding: utf-8 _*_
    __author__ = '春江花月夜oo'

    import os
    import sys
    import django
    from celery import Celery

    app = Celery("myCelery")

    pwd = os.path.dirname(os.path.realpath(__file__))
    sys.path.append(pwd+"../")
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
    django.setup()
    app.config_from_object('mycelery.config')

    app.autodiscover_tasks(["mycelery.sms", "mycelery.email"])
    这里需要
    1.加载django环境变量
    2.将配置文件的消息中间件、数据库加载到app
    3.监控tasks.py(需要监控的文件夹(目录))
  • 4.新建python packages任务包email和sms

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    在包里面新建tasks.py
    from ..main import app
    import time
    import logging

    log = logging.getLogger('django')


    @app.task
    def send_sms(mobile):
    print("向手机%s发送短信" % mobile)
    time.sleep(5)
    print("向手机%s发送短信成功" % mobile)
    return "send_sms ok"


    @app.task
    def send_sms2(mobile):
    print("向手机%s发送短信2" % mobile)
    time.sleep(5)
    print("向手机%s发送短信2成功" % mobile)
    return "send_sms2 ok"
  • 5.异步任务调度就可把tasks中的方法放在view中调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    view.py
    def test(request):
    # 异步任务调度
    send_sms.delay('110')
    send_sms2.delay('119')

    # 定时任务调度
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    result = send_sms.apply_async(["911", ], eta=task_time)
    print(result.id)
    return HttpResponse("Ok")
  • 6.记得配置url
    这个时候我们调用url的view.test,在我们的view中除了执行主业务,
    现在还在异步的执行发送信息的业务,他不会影响主业务执行效率。
    但这里只是把任务发送到了消息队列中还需要开启worker才能处理业务。

如果我们需要什么定时任务,其实就是python脚本,
借用一下django的model存储数据,和django没啥关系。

分享到