Celery 通过消息进行通信,用专用的工作线程不断监视任务队列以执行新工作。
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
安装:
pip install celery
# 在windows操作系统上还要安装eventlet
pip install eventlet
# 安装redis插件
pip install redis
工作流程:
- 定义一个Celery 应用实例,称之为app,导入任务函数,可添加个性化配置
- 编写任务函数,通过@app.task装饰一下,这个是消费者核心代码。
- 在需要使用异步任务的地方(生产者),调用之前编写的任务函数。先导入,再使用,使用delay方法或apply_async方法。
- 生产者调用任务时,马上向app配置的任务队列发送任务,其实就是任务函数名和参数。此时生产者马上获取一个任务ID——taskid,主程序继续运行,后续可以拿这个taskid来查询任务执行结果。
- Worker实时监听任务队列并取出任务,分配空闲worker来执行。
- 定时任务,其实就是有人定时向消息队列发送异步任务
注意点:
- Celery 和项目框架无关,它是一个通用Python库,Celery Worker 命令是启动消费者,监听消息队列,和是否Django或者flask等项目无关。
- 一般项目中定时任务或者周期性任务,无需生产者参与,需要启动Celery Beat程序,他根据配置文件中定时配置,按要求不停的向消息队列中发送异步任务
- 在保证连通性前提下,无论Worker是否启动,Beat或生产者都可以往消息队列中发送任务
- 确保有安装Celery、Redis等常用的Python扩展库
- windows下执行运行Worker,任务不执行,必需使用-P eventlet启动,同时连接Redis必需使用IP地址,不能使用localhost
- 启动Worker时必需和消息队列保持连通,修改任务函数后,必需重启Worker
- 启动Worker时可以指定消息队列,但是必需在配置文件中配置,或调用任务时指定队列名
- 如果都是使用默认队列celery,启动Worker时可能会收到大量历史任务并进行处理
- 定时任务celery beat如果没有及时关闭,会一直按要求发送异步任务,产生大量历史遗留任务
常用命令:
#帮助文档 多看看
celery --help
#常规启动Worker
celery -A tasks worker --loglevel=INFO
#Windows下启动Worker
celery -A tasks worker --loglevel=INFO -P eventlet
#关闭Worker
Ctrl + C,可能需要连续按
#启动Beat程序 可以帮我们定时发送任务到消息队列
celery -A tasks beat --loglevel=INFO
创建一个简单python环境:
tasks.py用来配置任务的, main.py 用来执行
tasks.py:
from celery import Celery
import time
# task:任务
# broker(中间人):存储任务的队列
# worker:真正执行任务的工作者
# backend:用来存储任务执行后的结果
celery = Celery("tasks",
broker="redis://192.168.124.49:6379/0",
backend="redis://192.168.124.49:6379/0")
# 加上此装饰器,这个函数就变成celery任务了(task)
@celery.task
def send_mail():
print("邮件开始发送。。。")
time.sleep(3)
print("邮件发送结束!")
main.py:
from tasks import send_mail
if __name__ == '__main__':
send_mail.delay()
启动celery:
celery -A tasks.celery worker --loglevel=info -P eventlet
注意:在windows中执行时,添加-P eventlet,否则可能报错。
然后执行main.py文件:
创建一个多任务python环境:
其中,tasks是消费者模块,有管理的celery文件,有多个任务函数文件。produce_task1模拟消费者,测试异步任务调用,produce_task2模拟定时任务调用。
celery.py
from celery import Celery
from datetime import timedelta
app = Celery("tasks_demo",
broker="redis://192.168.124.49:6379/0",
backend="redis://192.168.124.49:6379/0",
# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
include=["celery_demo2.tasks.task1", "celery_demo2.tasks.task2"])
app.conf.timezone = 'Asia/Shanghai' # 时区
app.conf.enable_utc = False # 是否使用UTC
app.conf.task_default_queue = "celery02" # 修改默认队列,可以不要
# 配置文件定时任务
app.conf.beat_schedule = {
'sendmail-every-10-seconds': {
'task': 'celery_demo2.tasks.task2.send_msg',
'schedule': timedelta(seconds=10),
'args': ('李四',)
},
}
task1.py
import time
from celery_demo2.tasks.celery import app
# 这是关键,穿上这件衣服就是异步任务函数了
@app.task
def send_email(res):
print("开始向%s发送邮件任务" % res)
time.sleep(5)
print("完成向%s发送邮件任务" % res)
return "mail ok"
task2.py:
import time
from celery_demo2.tasks.celery import app
@app.task
def send_msg(name):
print("开始向%s发送短信任务" % name)
time.sleep(5)
print("完成向%s发送短信任务" % name)
return "msg ok"
produce_task1.py
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from datetime import datetime
v1 = datetime.now()
print(f"当前时间:{v1}")
# 立即告知celery去执行test_celery任务,并传入一个参数
# result = send_email.apply_async(('yuan',),queue="testq")
result = send_email.delay('yuan')
print(f"任务ID{result.id}")
result = send_msg.delay('yuan')
print(f"任务ID{result.id}")
produce_task2.py
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from datetime import datetime
# 方式一 固定时间
v1 = datetime(2022, 12, 29, 23, 25, 00)
print(f"当前时间:{v1}")
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(f"任务运行时间:{v2}")
result = send_email.apply_async(args=["定时任务-指定时间"], eta=v2)
print(f"任务ID{result.id}")
# 方式二
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async并设定时间
result = send_msg.apply_async(args=["定时任务-延时10秒"], eta=task_time)
print(result.id)
启动celery:
celery -A celery_demo2.tasks.celery worker --loglevel=info -P eventlet
注意:在windows中执行时,添加-P eventlet,否则可能报错。
运行produce_task1.py文件,模拟生产者调用异步任务:
- 异步任务一旦调用,返回一个任务ID后继续执行,不影响主业务流程。
- 消息队列、Worker基本无延迟的收到待处理任务,并立即执行、存储结果。
- 任务是分布式并行执行,因此异步任务
运行produce_task2.py文件,模拟生产者调用定时任务:
一个指定时间点运行,一个延时固定间隔执行。
celery beat运行定时任务:
celery -A celery_demo2.tasks.celery beat --loglevel=INFO
注意:此时celery的worker也需要开启着来执行异步任务,beat是发送定时任务,worker来进行消费。
参考博文:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/142830.html