基于4.3.0版本 Celery介绍和基本使用

Celery介绍和基本使用 一、Celery介绍和基本使用

  1. Celery官方文档:
    • Celery英文手册
    • Celery中文手册
  2. Celery是什么?
    Celery 是一个 基于python开发的分布式异步消息任务队列 , 通过它可以轻松的实现任务的异步处理 ,  如果你的业务场景中需要用到异步任务 , 就可以考虑使用celery ,  举几个实例场景中可用的例子:
    • 异步任务:将耗时的操作任务提交给Celery去异步执行 , 比如发送短信/邮件、消息推送、音频处理等等
    • 做一个定时任务 , 比如每天定时执行爬虫爬取指定内容
    • 还可以使用celery实现简单的分布式爬虫系统等等
    Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息 , 以及存储任务结果 ,  一般使用rabbitMQ or Redis
  3. Celery有以下优点:
    • 简单:Celery 易于使用和维护 , 并且它 不需要配置文件  , 并且配置和使用还是比较简单的(后面会讲到配置文件可以有)
    • 高可用:当任务执行失败或执行过程中发生连接中断 , celery 会自动尝试重新执行任务
    • 快速:单个 Celery 进程每分钟可处理数以百万计的任务 , 而保持往返延迟在亚毫秒级
    • 灵活: Celery 几乎所有部分都可以扩展或单独使用 , 各个部分可以自定义 。
  4. Celery执行流程图如下
二、Celery安装使用
  1. 安装celery模块
    pip install celery==4.3.0
  2. Celery的默认broker(消息中间件)是RabbitMQ, 仅需配置一行就可以
    BROKER_URL = 'amqp://guest:guest@localhost:5672/yard'
  3. 使用Redis做broker(消息中间件)也可以 本地安装redis数据库(redis安装流程不再重复)
    pip install redis
  4. 注意:
    celery不支持在windows下运行任务 , 需要借助eventlet来完成
    pip install eventlet
三、Celery异步任务使用代码示例
  1. 对比说明
    1. 不使用Celery的情况下我们执行一个耗时的任务 , 创建一个app.py 文件
      import timedef add(x,y):time.sleep(5)return x+yif **name** == '**main**':print('task start....')result = add.delay(2,3)print('task end....')print(result) 运行代码发现出现5秒后打印了结果
      task start....task end....5
    2. 使用Celery执行(异步任务调度的情况)
      1. step1:新建一个tasks.py文件
        import timefrom celery import Celery#消息中间件(使用的redis)broker = 'redis://localhost:6379/1'#结果存储(使用的redis)backend = 'redis://localhost:6379/2'#实例化Celery对象app = Celery('celeryDemo',broker=broker,backend=backend)# 添加@app.task()装饰器 , 说明执行的任务是一个异步任务@app.task()def add(x,y):print('task enter ....')time.sleep(5)return x+y
      2. step2:修改app.py文件 , 代码如下
        from tasks import addif **name** == '**main**':print('task start....')result = add.delay(2,3)print('task end....')print(result) 上述代码修改完毕后: 打开终端 , 进入项目下 , 运行 app.py文件
        python3 app.py 注意:立即执行结果如下 , 此时我们可以看到反回了一个任务id(47ad971c-9a9a-44ca-bee4-a71f44eff048) , 并没有打印结果 , 因为没有执行worker端
        运行worker端 打开终端 , 进入项目下 , 输入如下命令:
        celery -A tasks worker -l info# windows下启动的时候 , 使用eventlet 方式# celery -A tasks worker --loglevel=info -P eventlet-c 10 # -c是协程的数量 , 生产环境可以用1000 结果如下:
        -------------- celery@DESKTOP-9G0AUUR v5.2.3 (dawn-chorus)--- ***** ------- ******* ---- Windows-10-10.0.19041-SP0 2022-03-23 12:22:13+ *** --- * ---+ ** ---------- [config]+ ** ---------- .> app:celeryDemo:0x21eea2c6088+ ** ---------- .> transport:redis://127.0.0.1:6379/1+ ** ---------- .> results:redis://127.0.0.1:6379/2+ ***--- * --- .> concurrency: 10 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)---***** ------------------- [queues].> celeryexchange=celery(direct) key=celery[tasks]. tasks.add[2022-03-23 12:22:13,568: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2022-03-23 12:22:13,572: INFO/MainProcess] mingle: searching for neighbors[2022-03-23 12:22:14,628: INFO/MainProcess] mingle: all alone[2022-03-23 12:22:14,648: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.[2022-03-23 12:22:14,652: INFO/MainProcess] celery@DESKTOP-9G0AUUR ready.[2022-03-23 12:22:14,657: INFO/MainProcess] Task tasks.add[47ad971c-9a9a-44ca-bee4-a71f44eff048] received[2022-03-23 12:22:14,658: WARNING/MainProcess] task enter ....[2022-03-23 12:22:14,662: INFO/MainProcess] Task tasks.add[47ad971c-9a9a-44ca-bee4-a71f44eff048] succeeded in 0.0s: 5 此时如果不断的向broker中发送消息 , 那么worker中就会接收到消息并执行 打开终端操作如下:
        (blog) G:\Python\Django\celery使用>pythonPython 3.7.4 (tags/v3.7.4:e09359112e, Jul8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from tasks import add # 导入执行的方法>>> add.delay(2,5) # 发送异步任务消息>>> result = add.delay(3,5)>>> result.ready() # 查看异步任务是否执行完毕True>>> result.get() # 获取任务执行完毕后的结果8
  2. Celery的配置文件(优化版)
    由于Celery的配置信息比较多 , 通常情况下 , 我们会创建一个Celery的配置文件 ,  这里命名为 celeryconfig.py
    • step1:
      在之前的项目中创建一个celery_demo的python Package文件夹 在__init__.py中添加如下代码:
      from celery import Celery# include:导入指定的任务模块# 这一次创建 app , 并没有直接指定 broker(消息中间件来接收和发送任务消息) 和 backend(存储结果) 。而是在配置文件中 。app = Celery('demo',include=['celery_demo.task1','celery_demo.task2',])# 通过Celery 实例加载配置模块app.config_from_object('celery_demo.celeryconfig',)
    • step2:
      在celery_demo文件夹下新建一个celeryconfig.py文件(Celery的配置文件) 添加如下代码:
      # 官方配置文档:查询每个配置项的含义 。# # broker(消息中间件来接收和发送任务消息)BROKER_URL = 'redis://localhost:6379/1'# backend(存储worker执行的结果)CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'# 设置时间参照 , 不设置默认使用的UTC时间CELERY_TIMEZONE = 'Asia/Shanghai'# 指定任务的序列化CELERY_TASK_SERIALIZER='json'# 指定执行结果的序列化CELERY_RESULT_SERIALIZER='json'
    • step3:在celery_demo文件夹下新建task1.py和task2.py文件
      • task1.py文件中执行如下代码
        import timefrom celery_demo import app@app.taskdef add(x,y):time.sleep(5)return x+y
      • task2.py文件中执行如下代码
        import timefrom celery_demo import app@app.taskdef mut(x,y):time.sleep(5)return x*y
    • step4:修改app.py文件 , 代码如下 , 分别发送执行任务消息到broker
      from celery_demo import task1,task2# apply_async和delay都表示调用异步任务task1.add.delay(2,4)# task1.add.apply_async(2,4)task2.mut.delay(3,4)print('end...') 查看运行结果 , 打开终端启动worker , 如下则表示启动成功:
      (blog) G:\Python\Django\celery使用>celery worker -A celery_demo -l INFO -P eventlet-c 10-------------- celery@DESKTOP-9G0AUUR v4.3.0 (rhubarb)---- **** -------- * **** -- Windows-10-10.0.19041-SP0 2022-03-23 16:26:31-- * - **** ---+ ** ---------- [config]+ ** ---------- .> app:demo:0x2bc35c9ba08+ ** ---------- .> transport:redis://localhost:6379/1+ ** ---------- .> results:redis://localhost:6379/2+ ***--- * --- .> concurrency: 10 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)---***** ------------------- [queues].> celeryexchange=celery(direct) key=celery[tasks]. celery_demo.task1.add. celery_demo.task2.mut[2022-03-23 16:26:39,476: INFO/MainProcess] Connected to redis://localhost:6379/1[2022-03-23 16:26:41,532: INFO/MainProcess] mingle: searching for neighbors[2022-03-23 16:26:48,671: INFO/MainProcess] mingle: all alone[2022-03-23 16:26:50,701: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/1.[2022-03-23 16:26:54,765: INFO/MainProcess] celery@DESKTOP-9G0AUUR ready. 发送执行任务消息 , 打开终端 , 进行如下操作
      (blog) G:\Python\Django\celery使用>python app.pyend... 此时会看到worker端接受到了两个任务并开始执行 , 如下所示:
      [2022-03-23 16:28:30,167: INFO/MainProcess] Received task: celery_demo.task1.add[308aa4ce-2443-4618-b2e5-6a6a8c3dac4b][2022-03-23 16:28:30,169: INFO/MainProcess] Received task: celery_demo.task2.mut[dc0eed0a-75bf-4c69-a691-9cc39513db18][2022-03-23 16:28:35,190: INFO/MainProcess] Task celery_demo.task1.add[308aa4ce-2443-4618-b2e5-6a6a8c3dac4b] succeeded in 5.01600000000326s: 6[2022-03-23 16:28:37,234: INFO/MainProcess] Task celery_demo.task2.mut[dc0eed0a-75bf-4c69-a691-9cc39513db18] succeeded in 7.061999999976251s: 12
四、定时任务
  • step1:
    在之前代码的celery_demo文件夹下的celeryconfig.py文件中 , 添加如下代码
    from datetime import timedeltafrom celery.schedules import crontab# 设置定时任务CELERYBEAT_SCHEDULE = {'task1':{'task':'celery_demo.task1.add','schedule':timedelta(seconds=10), 表示每10秒发送一次任务消息'args':(10,20)},'task2':{'task':'celery_demo.task2.mut','schedule':crontab(hour=22,minute=24), #表示在每天的晚上10点24分发送一次任务消息'args':(10,10)}}
  • step2:
    启动定时消息任务 , 打开终端 , 执行如下命令
    celery beat -A celery_demo -l INFO 启动
    celery beat v4.3.0 (rhubarb) is starting.__-...__-_LocalTime -> 2022-03-23 16:19:33Configuration ->. broker -> redis://localhost:6379/1. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> 5.00 minutes (300s)[2022-03-23 16:19:33,228: INFO/MainProcess] beat: Starting... 【基于4.3.0版本 Celery介绍和基本使用】如果要执行的是爬虫任务 , 只需要将上面的方法中的代码替换为爬虫代码即可 。