Celery介绍 您所在的位置:网站首页 celery可数吗 Celery介绍

Celery介绍

#Celery介绍| 来源: 网络整理| 查看: 265

Celery 官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

celery框架工作流程 1)创建Celery框架对象app,配置broker和backend,得到的app就是worker 2)给worker对应的app添加可处理的任务函数,用include配置给worker的app 3)完成提供的任务的定时配置app.conf.beat_schedule 4)启动celery服务,运行worker,执行任务(text_py) D:\mypy\luffy\scripts\异步任务>celery worker -A celery_task -l info -P eventlet ===>其中这里的 l 代表的是日志,===>info消息级别 5)启动beat服务,运行beat,添加任务 6)如果需要设置,每周循环定时任务可以查看定时任务中的周时间任务源码 Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP(高级消息队列协议), redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

Celery执行异步任务 包架构封装 project ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果 基本使用 celery.py # 1)创建app + 任务 # 2)启动celery(app)服务: # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本 # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本 from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) tasks.py from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m @app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m add_task.py from celery_task import tasks # 添加立即执行任务 t1 = tasks.add.delay(10, 20) t2 = tasks.low.delay(100, 50) print(t1.id) # 添加延迟任务 from datetime import datetime, timedelta eta=datetime.utcnow() + timedelta(seconds=10) tasks.low.apply_async(args=(200, 50), eta=eta) get_result.py from celery_task.celery import app from celery.result import AsyncResult id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行') 高级使用 celery.py # 1)创建app + 任务 # 2)启动celery(app)服务: # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务 # 命令:celery beat -A celery_task -l info # 4)获取结果 from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'low-task': { 'task': 'celery_task.tasks.low', 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (300, 150), } } tasks.py from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m @app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m get_result.py from celery_task.celery import app from celery.result import AsyncResult id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行') 大致使用方式==>伪代码 在项目的celery_task文件中的tasks编写一下代码 # 伪代码:立即和延迟任务使用 @app.task def send_email(user, content): result = print('对user发送content邮件内容') if not result: print('短信推送用户,邮件发送失败') return False return True 在Django项目中的APP中写入下面的代码: # 伪代码:celery异步或延迟任务 from celery_task.tasks import send_email class EMailAPIView(APIView): def post(self, request, *args, **kwargs): user = request.data.get('user') content = request.data.get('content') eta = request.data.get('eta') if not eta: # 添加立即的异步任务 send_email.delay(user, content) else: # 延迟任务 send_email.apply_aysnc(args=(user, content), eta=eta) return APIResponse() django中使用 celery.py """ celery框架django项目工作流程 1)加载django配置环境 2)创建Celery框架对象app,配置broker和backend,得到的app就是worker 3)给worker对应的app添加可处理的任务函数,用include配置给worker的app 4)完成提供的任务的定时配置app.conf.beat_schedule 5)启动celery服务,运行worker,执行任务 6)启动beat服务,运行beat,添加任务 重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下 """ # 一、加载django配置环境 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev") # 二、加载celery配置环境 from celery import Celery # broker broker = 'redis://127.0.0.1:6379/0' # backend backend = 'redis://127.0.0.1:6379/1' # worker app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'django-task': { 'task': 'celery_task.tasks.test_django_celery', 'schedule': timedelta(seconds=3), 'args': (), } } tasks.py from .celery import app # 获取项目中的模型类 from api.models import Banner @app.task def test_django_celery(): banner_query = Banner.objects.filter(is_delete=False).all() print(banner_query) celery框架总览 1.celery框架: 定义: 异步任务框架 - 独立运行的服务器,由三部分组成 内部有socekt,可以起worker(执行任务)及beat(提交任务)+存储结果backend app_worker = Celery(broker,backend,include) 作用: 任务:@app.task装饰的函数 异步任务:任务.delay() =>处理耗时的需求 延迟任务:任务.apply_async(args,kwargs,eat) =>处理延迟的需求 定时任务:完成任务的beat配置,启动beat服务 =>处理周期性的需求 命令: celery worker -A 包|模块 -l info eventlet celery beat -A 包|模块 -l info


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有