服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

2021-02-04 12:05mengwei Java教程

这篇文章主要介绍了Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析,涉及queue,BlockingQueue等有关内容,具有一定参考价值,需要的朋友可以参考。

queue是什么

队列,是一种数据结构。除了优先级队列和lifo队列外,队列都是以fifo(先进先出)的方式对各个元素进行排序的。无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的。在fifo队列中,所有新元素都插入队列的末尾。

queue中的方法

queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下jdkapi就知道了:

Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

注意一点就好,queue通常不允许插入null,尽管某些实现(比如linkedlist)是允许的,但是也不建议。

blockingqueue

1、blockingqueue概述

blockingqueue也是java.util.concurrent下的主要用来控制线程同步的工具。

blockingqueue有四个具体的实现类,根据不同需求,选择不同的实现类

1、arrayblockingqueue:一个由数组支持的有界阻塞队列,规定大小的blockingqueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以fifo(先入先出)顺序排序的。

2、linkedblockingqueue:大小不定的blockingqueue,若其构造函数带一个规定大小的参数,生成的blockingqueue有大小限制,若不带大小参数,所生成的blockingqueue的大小由integer.max_value来决定.其所含的对象是以fifo(先入先出)顺序排序的。

3、priorityblockingqueue:类似于linkedblockqueue,但其所含对象的排序不是fifo,而是依据对象的自然排序顺序或者是构造函数的comparator决定的顺序。

4、synchronousqueue:特殊的blockingqueue,对其的操作必须是放和取交替完成的。

linkedblockingqueue可以指定容量,也可以不指定,不指定的话,默认最大是integer.max_value,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

讲blockingqueue,因为blockingqueue是queue中的一个重点,并且通过blockingqueue我们再次加深对于生产者/消费者模型的理解。其他的queue都不难,通过查看jdkapi和简单阅读源码完全可以理解他们的作用。

blockingqueue,顾名思义,阻塞队列。blockingqueue是在java.util.concurrent下的,因此不难理解,blockingqueue是为了解决多线程中数据高效安全传输而提出的。

多线程中,很多场景都可以使用队列实现,比如经典的生产者/消费者模型,通过队列可以便利地实现两者之间数据的共享,定义一个生产者线程,定义一个消费者线程,通过队列共享数据就可以了。

当然现实不可能都是理想的,比如消费者消费速度比生产者生产的速度要快,那么消费者消费到一定程度上的时候,必须要暂停等待一下了(使消费者线程处于waiting状态)。blockingqueue的提出,就是为了解决这个问题的,他不用程序员去控制这些细节,同时还要兼顾效率和线程安全。

阻塞队列所谓的"阻塞",指的是某些情况下线程会挂起(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。使用blockingqueue,不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,这些内容blockingqueue都已经做好了

2、blockingqueue中的方法

blockingqueue既然是queue的子接口,必然有queue中的方法,上面已经列了。看一下blockingqueue中特有的方法:

(1)voidput(ee)throwsinterruptedexception

把e添加进blockingqueue中,如果blockingqueue中没有空间,则调用线程被阻塞,进入等待状态,直到blockingqueue中有空间再继续

(2)voidtake()throwsinterruptedexception

取走blockingqueue里面排在首位的对象,如果blockingqueue为空,则调用线程被阻塞,进入等待状态,直到blockingqueue有新的数据被加入

(3)intdrainto(collection<?supere>c,intmaxelements)

一次性取走blockingqueue中的数据到c中,可以指定取的个数。通过该方法可以提升获取数据效率,不需要多次分批加锁或释放锁

3、arrayblockingqueue

基于数组的阻塞队列,必须指定队列大小。比较简单。arrayblockingqueue中只有一个reentrantlock对象,这意味着生产者和消费者无法并行运行(见下面的代码)。另外,创建arrayblockingqueue时,可以指定reentrantlock是否为公平锁,默认采用非公平锁。

?
1
2
3
4
5
6
/** main lock guarding all access */
private final reentrantlock lock;
/** condition for waiting takes */
private final condition notempty;
/** condition for waiting puts */
private final condition notfull;

4、linkedblockingqueue

基于链表的阻塞队列,和arrayblockingqueue差不多。不过linkedblockingqueue如果不指定队列容量大小,会默认一个类似无限大小的容量,之所以说是类似是因为这个无限大小是integer.max_value,这么说就好理解arrayblockingqueue为什么必须要制定大小了,如果arrayblockingqueue不指定大小的话就用integer.max_value,那将造成大量的空间浪费,但是基于链表实现就不一样的,一个一个节点连起来而已。另外,linkedblockingqueue生产者和消费者都有自己的锁(见下面的代码),这意味着生产者和消费者可以"同时"运行。

?
1
2
3
4
5
6
7
8
/** lock held by take, poll, etc */
private final reentrantlock takelock = new reentrantlock();
/** wait queue for waiting takes */
private final condition notempty = takelock.newcondition();
/** lock held by put, offer, etc */
private final reentrantlock putlock = new reentrantlock();
/** wait queue for waiting puts */
private final condition notfull = putlock.newcondition();

5、synchronousqueue

比较特殊,一种没有缓冲的等待队列。什么叫做没有缓冲区,arrayblocking中有:

?
1
2
/** the queued items */
private final e[] items;

数组用以存储队列。linkedblockingqueue中有:

?
1
2
3
4
5
6
7
8
9
/**
 * linked list node class
 */
static class node<e> {
  /** the item, volatile to ensure barrier separating write and read */
  volatile e item;
  node<e> next;
  node(e x) { item = x; }
}

将队列以链表形式连接。

生产者/消费者操作数据实际上都是通过这两个"中介"来操作数据的,但是synchronousqueue则是生产者直接把数据给消费者(消费者直接从生产者这里拿数据),好像又回到了没有生产者/消费者模型的老办法了。换句话说,每一个插入操作必须等待一个线程对应的移除操作。synchronousqueue又有两种模式:

1、公平模式

采用公平锁,并配合一个fifo队列(queue)来管理多余的生产者和消费者

2、非公平模式

采用非公平锁,并配合一个lifo栈(stack)来管理多余的生产者和消费者,这也是synchronousqueue默认的模式

利用blockingqueue实现生产者消费者模型

上一篇我们写的生产者消费者模型有局限,局限体现在:

缓冲区内只能存放一个数据,实际生产者/消费者模型中的缓冲区内可以存放大量生产者生产出来的数据
生产者和消费者处理数据的速度几乎一样
ok,我们就用blockingqueue来简单写一个例子,并且让生产者、消费者处理数据速度不同。子类选择的是arrayblockingqueue,大小定为10:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(string[] args)
{
  final blockingqueue<string> bq = new arrayblockingqueue<string>(10);
  runnable producerrunnable = new runnable()
  {
    int i = 0;
    public void run()
    {
      while (true)
      {
        try
        {
          system.out.println("我生产了一个" + i++);
          bq.put(i + "");
          thread.sleep(1000);
        }
        catch (interruptedexception e)
        {
          e.printstacktrace();
        }
      }
    }
  };
  runnable customerrunnable = new runnable()
  {
    public void run()
    {
      while (true)
      {
        try
        {
          system.out.println("我消费了一个" + bq.take());
          thread.sleep(3000);
        }
        catch (interruptedexception e)
        {
          e.printstacktrace();
        }
      }
    }
  };
  thread producerthread = new thread(producerrunnable);
  thread customerthread = new thread(customerrunnable);
  producerthread.start();
  customerthread.start();
}

代码的做法是让生产者生产速度快于消费者消费速度的,看一下运行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
我生产了一个0
我消费了一个1
我生产了一个1
我生产了一个2
我消费了一个2
我生产了一个3
我生产了一个4
我生产了一个5
我消费了一个3
我生产了一个6
我生产了一个7
我生产了一个8
我消费了一个4
我生产了一个9
我生产了一个10
我生产了一个11
我消费了一个5
我生产了一个12
我生产了一个13
我生产了一个14
我消费了一个6
我生产了一个15
我生产了一个16
我消费了一个7
我生产了一个17
我消费了一个8
我生产了一个18

分两部分来看输出结果:

1、第1行~第23行。这块blockingqueue未满,所以生产者随便生产,消费者随便消费,基本上都是生产3个消费1个,消费者消费速度慢

2、第24行~第27行,从前面我们可以看出,生产到16,消费到6,说明到了arrayblockingqueue的极限10了,这时候没办法,生产者生产一个arrayblockingqueue就满了,所以不能继续生产了,只有等到消费者消费完才可以继续生产。所以之后的打印内容一定是一个生产者、一个消费者

这就是前面一章开头说的"通过平衡生产者和消费者的处理能力来提高整体处理数据的速度",这给例子应该体现得很明显。另外,也不要担心非单一生产者/消费者场景下的系统假死问题,缓冲区空、缓冲区满的场景blockingqueue都是定义了不同的condition,所以不会唤醒自己的同类。

总结

以上就是本文关于java多线程queue、blockingqueue和使用blockingqueue实现生产消费者模型方法解析的全部内容,希望对大家有所帮助。如有不足之处,欢迎留言指出。

原文链接:https://www.cnblogs.com/xrq730/p/4855857.html

延伸 · 阅读

精彩推荐