本文实例为大家分享了Python实现线程池之线程安全队列的具体代码,供大家参考,具体内容如下
一、线程池组成
一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。
二、线程安全队列的实现
包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
class ThreadSafeQueue( object ): def __init__( self , max_size = 0 ): self .queue = [] self .max_size = max_size # max_size为0表示无限大 self .lock = threading.Lock() # 互斥量 self .condition = threading.Condition() # 条件变量 def size( self ): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self .lock.acquire() size = len ( self .queue) self .lock.release() return size def put( self , item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self .max_size ! = 0 and self .size() > = self .max_size: return ThreadSafeException() # 加锁 self .lock.acquire() self .queue.append(item) self .lock.release() self .condition.acquire() # 通知等待读取的线程 self .condition.notify() self .condition.release() return item def batch_put( self , item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance (item_list, list ): item_list = list (item_list) res = [ self .put(item) for item in item_list] return res def pop( self , block = False , timeout = 0 ): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self .size() = = 0 : if block: self .condition.acquire() self .condition.wait(timeout) self .condition.release() else : return None # 加锁 self .lock.acquire() item = None if len ( self .queue): item = self .queue.pop() self .lock.release() return item def get( self , index): """ 获取指定位置的元素 :param index: :return: """ if self .size() = = 0 or index > = self .size(): return None # 加锁 self .lock.acquire() item = self .queue[index] self .lock.release() return item class ThreadSafeException(Exception): pass |
三、测试逻辑
3.1、测试阻塞逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
def thread_queue_test_1(): thread_queue = ThreadSafeQueue( 10 ) def producer(): while True : thread_queue.put(random.randint( 0 , 10 )) time.sleep( 2 ) def consumer(): while True : print ( 'current time before pop is %d' % time.time()) item = thread_queue.pop(block = True , timeout = 3 ) # item = thread_queue.get(2) if item is not None : print ( 'get value from queue is %s' % item) else : print (item) print ( 'current time after pop is %d' % time.time()) t1 = threading.Thread(target = producer) t2 = threading.Thread(target = consumer) t1.start() t2.start() t1.join() t2.join() |
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。
3.2、测试读写加锁逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def thread_queue_test_2(): thread_queue = ThreadSafeQueue( 10 ) def producer(): while True : thread_queue.put(random.randint( 0 , 10 )) time.sleep( 2 ) def consumer(name): while True : item = thread_queue.pop(block = True , timeout = 1 ) # item = thread_queue.get(2) if item is not None : print ( '%s get value from queue is %s' % (name, item)) else : print ( '%s get value from queue is None' % name) t1 = threading.Thread(target = producer) t2 = threading.Thread(target = consumer, args = ( 'thread1' ,)) t3 = threading.Thread(target = consumer, args = ( 'thread2' ,)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() |
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/wang_xiaowang/article/details/105933224