Celery

x33g5p2x  于2022-05-05 转载在 其他  
字(4.5k)|赞(0)|评价(0)|浏览(128)


Celery异步任务框架如何使用?看这里

Celery

官网

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

# 官网解释
"""
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
"""

Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

Celery的安装配置

安装:pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

注意如果是windows平台还需要安装:pip install eventlet

两种celery任务结构:提倡用包管理,结构更清晰

方式一:简单使用

# 第一步:定义一个py文件(名字随意,celery_task) 
"""celery_task.py"""
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'   # 结果存储
broker = 'redis://127.0.0.1:6379/2'    # 消息中间件
app = Celery(__name__,broker=broker,backend=backend)  # __name__区分__main__

# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
    return a+b

# 第二步:提交任务(新建一个py文件:submit_task)
"""submit_task.py"""
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res)  # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645

# 第三步:任务执行单元执行,使用命令启动worker   
格式:celery -A 文件名  worker  -l 日志输出级别   (win平台+-P eventlet)
celery -A celery_task worker -l info -P eventlet
'''
celery_task:py文件的名字
-l info:日志输出级别是info 
-P eventlet  在win平台需要下载,pip  install  eventlet
'''
#如果队列里有任务,就会执行,如果没有任务,worker就等在这                                    
                                    
# 第四步:查询结果是否执行完成  get_result.py   
"""get_result.py"""
from celery_task import app

from celery.result import AsyncResult

id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        print(result)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

方法二:包管理结构(推荐)

随便定义包名,但是包内必须要有celery.py

步骤

  • 创建包,包下写celery.py文件,文件内写celery任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
  • 任意位置提交任务
from celery_task.add_task import add
from celery_task.celery import app
res=add.delay(100,200)
print(res)
# 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
  • 启动worker包管理只需去包所在的根路径启动就可以了,不需要切换路径到包内去启动worker,因为包下有celery.py了
scripts> celery -A celery_task worker -l info -P eventlet
  • 查看结果
from celery_task import app

from celery.result import AsyncResult

id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        print(result)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

Celery主要处理三种任务

异步任务,延迟任务,定时任务

delay提交异步任务

上面的示例就是

apply_async提交延迟任务

# 其他不变,提交任务的时候,如下:
from celery_task.user_task import add
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10)
# 参数传递需要使用args,传时间要使用时间对象eta,使用的是utc时间
mul.apply_async(args=(20, 50), eta=eta)

beat_schedule提交定时任务

定时任务需要启动beatworker

  • beat负责提交定时任务
  • worker负责提交celery任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务

# 时区
app.conf.timezone='Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc=False


#第一步:在celery.py中配置
# celery任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    '''
    任务名:{
    task:任务
    schedule:时间
    args:参数(函数参数)
    }
    '''
    'task-mul': {
        'task': 'celery_task.user_task.mul',
        'schedule': timedelta(seconds=3),  # 3s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 15),
    },
    'task-add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=10),  # 10s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },

}

#第二步:启动beat(beat负责定时提交任务)
celery -A celery_task beat -l info
# 第三步:启动worker,任务就会被worker执行了
celery -A celery_task worker -l info -P eventlet

相关文章