为什么要引入线程池
如果在程序中经常要用到线程,频繁的创建和销毁线程会浪费很多硬件资源,
所以需要把线程和任务分离。线程可以反复利用,省去了重复创建的麻烦。
在 Process 类中,我们必须显式地创建流程。但是,Pool 类更方便,您不必手动管理它。创建池对象的语法是 ?multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild, context)?
? 。所有参数都是可选的。
- processes 表示您要创建的工作进程的数量。默认值通过 os.cpu_count() 获取。
- initializer第二个初始化器参数是一个用于初始化的函数。
- initargs 是传递给它的参数。
-
maxtasksperchild
表示分配给每个子进程的任务数。在完成该数量的任务之后,该进程将被一个新的工作进程替换。指定它的好处是任何未使用的资源都将被释放。如果未提供任何内容,则只要池存在,进程就会存在。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time from multiprocessing import Pool def square(x): print (f "start process:{x}" ) square = x * x print (f "square {x}:{square}" ) time.sleep( 1 ) print (f "end process:{x}" ) if __name__ = = "__main__" : starttime = time.time() pool = Pool() pool. map (square, range ( 0 , 5 )) pool.close() endtime = time.time() print (f "Time taken {endtime-starttime} seconds" ) |
结果为:
start process:0
start process:1
square 1:1
square 0:0
end process:1
start process:2
end process:0
start process:3
square 2:4
square 3:9
end process:3
end process:2
start process:4
square 4:16
end process:4
Time taken 3.0474610328674316 seconds
在这里,我们从多处理模块中导入 Pool 类。在主函数中,我们创建了一个 Pool 类的对象。 pool.map() 将我们想要并行化的函数和一个可迭代的函数作为参数。它在可迭代的每个项目上运行给定的函数。它还接受一个可选的 chunksize 参数,它将可迭代对象拆分为等于给定大小的块,并将每个块作为单独的任务传递。 pool.close() 用于拒绝新任务。
我们可以看到花费的时间大约是 3 秒。
?pool.imap()?
? 与 ?pool.map()?
? 方法几乎相同。不同的是,每个项目的结果都是在准备好后立即收到的,而不是等待所有项目都完成。此外, ?map()?
? 方法将可迭代对象转换为列表(如果不是)。但是, ?imap()?
? 方法没有。
来看下一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import time from multiprocessing import Pool def square(x): print (f "start process {x}" ) square = x * x time.sleep( 1 ) print (f "end process {x}" ) return square if __name__ = = "__main__" : pool = Pool() a = pool. map (square, range ( 0 , 5 )) print (a) |
运行结果:
start process 0
start process 1
end process 0
start process 2
end process 1
start process 3
end process 2
start process 4
end process 3
end process 4
[0, 1, 4, 9, 16]
1
2
3
4
5
6
7
8
|
from concurrent.futures import ThreadPoolExecutor def say_hello(): print ( "Hello" ) executor = ThreadPoolExecutor( 50 ) for i in range ( 0 , 10 ): executor.submit(say_hello) |
练习
利用 Python 多线程模拟商品秒杀过程,不可以出现超买和超卖的情况。假设A商品有50件参与秒杀活动,10分钟秒杀自动结束。
- kill_total 商品总数
- kill_num 成功抢购数
- kill_flag 有效标志位
- kill_user 成功抢购的用户ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
from redis_db import pool import redis import random from concurrent.futures import ThreadPoolExecutor s = set () while True : if len (s) = = 1000 : break num = random.randint( 10000 , 100000 ) s.add(num) print (s) con = redis.Redis( connection_pool = pool ) try : con.delete( "kill_total" , "kill_num" , "kill_flag" , "kill_user" ) con. set ( "kill_total" , 50 ) con. set ( "kill_num" , 0 ) con. set ( "kill_flag" , 1 ) con.expire( "kill_flag" , 600 ) except Exception as e: print (e) finally : del con executor = ThreadPoolExecutor( 200 ) def buy(): connection = redis.Redis( connection_pool = pool ) pipline = connection.pipline() try : if connection.exists( "kill_flag" ) = = 1 : pipline.watch( "kill_num" , "kill_user" ) total = pipline.get( "kill_total" ) num = int (pipline.get( "kill_num" ).decode( "utf-8" )) if num < total: pipline.multi() pipline.incr( "kill_num" ) user_id = s.pop() pipline.rpush( "kill_user" , user_id) pipline.execute() except Exception as e: print (e) finally : if "pipline" in dir (): pipline.reset() del connection for i in range ( 0 , 1000 ): executor.submit(buy) print ( "秒杀活动已经结束" ) |
到此这篇关于在 Python 中利用Pool 进行多处理的文章就介绍到这了,更多相关 Python Pool 多处理内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.51cto.com/yuzhou1su/5200609