使用Flask实现进度条
问题描述
Python异步处理,新起一个进程返回处理进度
解决方案
使用 tqdm 和 multiprocessing.Pool
安装
1
|
pip install tqdm |
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time import threading from multiprocessing import Pool from tqdm import tqdm def do_work(x): time.sleep(x) return x def progress(): time.sleep( 3 ) # 3秒后查进度 print (f '任务有: {pbar.total} 已完成:{pbar.n}' ) tasks = range ( 10 ) pbar = tqdm(total = len (tasks)) if __name__ = = '__main__' : thread = threading.Thread(target = progress) thread.start() results = [] with Pool(processes = 5 ) as pool: for result in pool.imap_unordered(do_work, tasks): results.append(result) pbar.update( 1 ) print (results) |
效果
Flask
安装
1
|
pip install flask |
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import time from multiprocessing import Pool from tqdm import tqdm from flask import Flask, make_response, jsonify app = Flask(__name__) def do_work(x): time.sleep(x) return x total = 5 # 总任务数 tasks = range (total) pbar = tqdm(total = len (tasks)) @app .route( '/run/' ) def run(): """执行任务""" results = [] with Pool(processes = 2 ) as pool: for _result in pool.imap_unordered(do_work, tasks): results.append(_result) if pbar.n > = total: pbar.n = 0 # 重置 pbar.update( 1 ) response = make_response(jsonify( dict (results = results))) response.headers.add( 'Access-Control-Allow-Origin' , '*' ) response.headers.add( 'Access-Control-Allow-Headers' , '*' ) response.headers.add( 'Access-Control-Allow-Methods' , '*' ) return response @app .route( '/progress/' ) def progress(): """查看进度""" response = make_response(jsonify( dict (n = pbar.n, total = pbar.total))) response.headers.add( 'Access-Control-Allow-Origin' , '*' ) response.headers.add( 'Access-Control-Allow-Headers' , '*' ) response.headers.add( 'Access-Control-Allow-Methods' , '*' ) return response |
启动(以 Windows 为例)
1
2
|
set FLASK_APP = main flask run |
接口列表
- 执行任务:http://127.0.0.1:5000/run/
- 查看进度:http://127.0.0.1:5000/progress/
test.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
<!DOCTYPE html> < html lang = "zh" > < head > < meta charset = "UTF-8" > < title >进度条</ title > < script src = "https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js" ></ script > < script src = "https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js" ></ script > < link href = "https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel = "external nofollow" rel = "stylesheet" > </ head > < body > < button id = "run" >执行任务</ button > < br >< br > < div class = "progress" > < div class = "progress-bar" role = "progressbar" aria-valuenow = "1" aria-valuemin = "0" aria-valuemax = "100" style = "width: 10%" >0.00% </ div > </ div > </ body > < script > function set_progress_rate(n, total) { //设置进度 var rate = (n / total * 100).toFixed(2); if (n > 0) { $(".progress-bar").attr("aria-valuenow", n); $(".progress-bar").attr("aria-valuemax", total); $(".progress-bar").text(rate + "%"); $(".progress-bar").css("width", rate + "%"); } } $("#run").click(function () { //执行任务 $.ajax({ url: "http://127.0.0.1:5000/run/", type: "GET", success: function (response) { set_progress_rate(100, 100); console.log('执行完成,结果为:' + response['results']); } }); }); setInterval(function () { //每1秒请求一次进度 $.ajax({ url: "http://127.0.0.1:5000/progress/", type: "GET", success: function (response) { console.log(response); var n = response["n"]; var total = response["total"]; set_progress_rate(n, total); } }); }, 1000); </ script > </ html > |
效果
Flask使用简单异步任务
在Flask中使用简单异步任务最简洁优雅的原生实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from flask import Flask from time import sleep from concurrent.futures import ThreadPoolExecutor # DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor executor = ThreadPoolExecutor( 2 ) app = Flask(__name__) @app .route( '/jobs' ) def run_jobs(): executor.submit(some_long_task1) executor.submit(some_long_task2, 'hello' , 123 ) return 'Two jobs was launched in background!' def some_long_task1(): print ( "Task #1 started!" ) sleep( 10 ) print ( "Task #1 is done!" ) def some_long_task2(arg1, arg2): print ( "Task #2 started with args: %s %s!" % (arg1, arg2)) sleep( 5 ) print ( "Task #2 is done!" ) if __name__ = = '__main__' : app.run() |
以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://xercis.blog.csdn.net/article/details/121920979