在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)

编程目标

通过使用Flask和Celery,实现一个简单的Web应用程序,能够接收HTTP POST请求,并异步发送电子邮件。

说明

  1. 使用Flask创建一个简单的Web应用程序,包含一个HTTP POST路由,用于接收发送电子邮件的请求。
  2. 使用Celery实现一个异步任务,用于发送电子邮件。
  3. 发送电子邮件的请求应包含以下信息:
    • 收件人地址
    • 邮件主题
    • 邮件内容
  4. 邮件发送成功后,返回响应表示成功发送。

技术栈

  • Python
  • Flask
  • Celery
  • Redis

接口设计

1. 发送邮件接口

  • URL: /send-email
  • 方法: POST
  • 请求参数:
    • recipient (string): 收件人地址
    • subject (string): 邮件主题
    • body (string): 邮件内容
  • 成功响应:
    • 状态码: 202 Accepted
    • 响应体: {“message”: “邮件发送任务已启动”}

app.py代码:

from flask import Flask, request, jsonify
from tasks import send_email_async

app = Flask(__name__)
app.json.ensure_ascii = False  # 解决中文乱码问题
@app.route('/send-email', methods=['POST'])
def email_sender():
    post_form_data = request.json
    print(post_form_data)
    # 调用异步发送邮件任务
    email_data = {
        'sender_email''csdn_代码写注释@163.com',
        'sender_password''csdn_代码写注释',
        'recipient': post_form_data['recipient'],
        'subject': post_form_data['subject'],
        'body': post_form_data['body']
    }
    send_email_async.delay(email_data)
    return jsonify({"message""邮件发送任务已启动"}), 202

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=5000)

tasks.py代码:

from celery import Celery
import smtplib
from email.mime.text import MIMEText
from email.header import Header

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def send_email_async(email_data):
    content = email_data.get('body')
    msg_from = email_data.get('sender_email')
    password = email_data.get('sender_password')
    msg_to = email_data.get('recipient')
    subject = email_data.get('subject')

    msg = MIMEText(content)
    msg['Subject'] = subject
    msg['From'] = msg_from
    msg['To'] = msg_to

    try:
        s = smtplib.SMTP_SSL("smtp.163.com"465)
        s.login(msg_from, password)
        s.sendmail(msg_from, msg_to, msg.as_string())
        s.quit()
        print('邮件发送成功!')
        return "邮件发送成功!"
    except Exception as e:
        print(f"邮件发送失败: {e}")
        return "邮件发送失败"

test_send_email.py

import requests

subject = "药价监督流水报告"  # 主题
content = """
尊敬的xxx客户,

随函附上本季度药价监督的流水报告。以下是本季度药价监管的关键要点:

1. 监督范围:全国23个省市的主要药品批发市场及在线药品交易平台。
2. 检查次数:共计1,536次现场检查和3,245次在线监控。
3. 发现问题:在检查中发现15起价格违规行为,涉及7种药品。
4. 违规处理:所有违规行为均已记录在案,并对相关企业进行了警告及罚款处理。
5. 价格波动:本季度药品平均价格波动率为3.5%,与上季度相比下降了1.2个百分点。

2024年05月09日
"""

def send_email():
    url = 'http://localhost:5000/send-email'  # Flask 应用的 URL
    data = {
        'recipient''csdn_代码写注释@qq.com',  # 收件人地址
        'subject': subject,  # 邮件主题
        'body': content  # 邮件内容
    }

    # 发送 POST 请求
    response = requests.post(url, json=data)

    # 输出响应内容
    print('响应状态码是:', response.status_code)
    print('响应内容是:', response.text)

send_email()

实现效果:在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)


原文始发于微信公众号(基根奋斗营):在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/289588.html

(0)
葫芦侠五楼的头像葫芦侠五楼

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!