python-celery-Flask

一、基本使用

Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,可以实时处理任务,也可以定时异步处理任务。
每次分发任务后得到一个ID,然后根据这个ID查询任务执行情况。

并且celery需要rabbitMQ、Redis等充当broker来进行消息的接收。

安装

1
pip3 install celery eventlet   # windows系统需要eventlet模块

下面我们来快速上手celery
编辑s1.py

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python3

from celery import Celery

cel = Celery('xxx',
broker="redis://192.168.1.40",
backend='redis://192.168.1.40')

@cel.task
def f1(x,y):
return x+y

然后把s1这个work工作起来,进入命令终端,如果在linux系统,不用添加参数-P eventlet

1
E:\pro\xxx_dir> celery worker -A s1 -l info  -P eventlet

编辑s2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python3

import datetime
from s1 import f1

# 立即执行
result = f1.delay(4,6)
print(result.id)

# 定时执行
ctime = datetime.datetime.now()
# ctime = datetime.datetime(year=2019,month=2,day=21,hour=14,minute=8)
utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_time + s10
result = f1.apply_async(args=[1,3],eta=ctime_x)
print(result.id)

编辑s3.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python3

from celery.result import AsyncResult
from demo1.s1 import cel

async = AsyncResult(id="f43bce52-9503-475e-9d19-4a46ed910a8e",app=cel)

if async.successful():
ret = async.get() # 获取值
#async.forget() # 删除值
print(ret)
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
else:
print("任务执行失败")


async.revoke() # 取消一个任务,当一个任务正在执行,不能取消
async.revoke(terminate=True) # 终止一个任务,当一个任务正在执行,可以被终止

二、多目录结构

经过上面快速上手的学习,了解了celery的基本使用,那么重组一下代,形成项目中多目录结构看看相互之间如何调用?

创建一个celery_tasks的目录,里面一般保存2类文件,其中一个文件名称必须为celery,另一类就是定义task任务的文件,可以有多个。

定义celery_tasks/celery.py文件,如果有多个task任务文件,可以用includ列表包含进来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3

from celery import Celery
# from celery.schedules import crontab

cel = Celery('xxxxxx',
broker='redis://192.168.1.40:6379',
backend='redis://192.168.1.40:6379',
include=['celery_tasks.task1',)
#include=['celery_tasks.task1','celery_tasks.task2'])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

在多目录结构中,跑celery work时不用指定到文件,指定目录即可

1
E:\pro\xxx_dir> celery worker -A celery_tasks -l info  -P eventlet

定义celery_tasks/task1.py

1
2
3
4
5
6
7
#!/usr/bin/env python3

from .celery import cel

@cel.task
def f1(x,y):
return x+y

有了celery.py文件和task任务文件,我们就可以在任意地方调用任务了。

比如定义test/exec1.py文件来执行任务

1
2
3
4
5
6
#!/usr/bin/env python3

from celery_tasks.task1 import f1

result = f1.delay(4,6)
print(result.id)

定义test/exec2.py文件来获取任务执行结果,需要提供任务ID

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python3

from celery_tasks.celery import cel
from celery.result import AsyncResult

async = AsyncResult(id="be6bb021-da48-46a9-b1bc-94b987f7c8a7",app=cel)

if async.successful():
print(async.get())
else:
print("任务执行失败")

三、Flask中的例用

有了上面celery的认识,我们来简单写点代码,看一下在Flask框架中celery是如何使用的?

定义Flask项目启动文件app.py

写线上代码时是要把任务保存在数据库中的,这里仅作示例就保存在了HISTORY全局变量中了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3

from flask import Flask,request,render_template,redirect

from celery_tasks.task2 import deploy

app = Flask(__name__)

HISTORY = []

@app.route('/index',methods=["GET","POST"])
def index():
if request.method == "GET":
return render_template('index.html',history=HISTORY)


@app.route('/publish',methods=["GET","POST"])
def publish():
if request.method == "GET":
return render_template('publish.html')
else:
version = request.form.get("version")
hosts = request.form.getlist("hosts")
print(version,hosts)

import datetime
ctime = datetime.datetime.now()
utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
ctime_10 = utc_time + datetime.timedelta(seconds=10)
result = deploy.apply_async(args=[version,hosts],eta=ctime_10)
HISTORY.append({"version":version,"hosts":hosts,"task_id":result.id})
print(HISTORY)
return redirect("/index")

from celery.result import AsyncResult
from celery_tasks.celery import cel

@app.route('/check_result',methods=["GET","POST"])
def check_result():
task_id = request.args.get("task_id")
async = AsyncResult(id=task_id,app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
return "执行成功"
elif async.failed():
return '执行失败'
elif async.status == 'PENDING':
return '任务等待中被执行'
elif async.status == 'RETRY':
return '任务异常后正在重试'
elif async.status == 'STARTED':
return '任务已经开始被执行'
else:
return "unkown status"

@app.route('/cancel', methods=["GET", "POST"])
def cancel():
task_id = request.args.get("task_id")
async =AsyncResult(id=task_id,app=cel)
async.revoke(terminate=True)
for i in HISTORY:
if task_id in i.values():
HISTORY.remove(i)
return redirect("/index")

if __name__ == '__main__':
app.run()

定义其中用到的templates/index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>发布系统</h1>
<a href="/publish">添加发布任务</a>
<table>
{% for row in history %}
<tr>
<td>{{ row.task_id }}</td>
<td>{{ row.version }}</td>
{% for host in row.hosts %}
<td>
<span>{{ host }}</span>
</td>
{% endfor %}
<td><a href="/check_result?task_id={{ row.task_id }}">查看</a></td>
<td><a href="/cancel?task_id={{ row.task_id }}">取消</a></td>
</tr>
{% endfor %}
</table>

</body>
</html>

定义其中用到的templates/publish.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form action="" method="post">
<p><input type="text" name="version" placeholder="请输入要发布的版本"></p>
<p>
<select name="hosts" multiple="multiple">
<option value="c1.com">c1.com</option>
<option value="c2.com">c2.com</option>
<option value="c3.com">c3.com</option>
</select>
</p>
<input type="submit" value="提交">
</form>
</body>
</html>

定义其中的celery_tasks.task2.py文件,这里的deploy是真正定义任务的地方.

1
2
3
4
5
6
7
8
#!/usr/bin/env python3

from .celery import cel

@cel.task
def deploy(version,hosts):
print(version, hosts) # 定义想要执行的任务代码
return 'deploy ok'

同样别望了先把work跑起来,再启动Flask

1
E:\pro\xxx_dir> celery worker -A celery_tasks -l info  -P eventlet

四、总结

还需要多写代码在项目中总结celery…