服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Druid之连接创建及销毁示例详解

Druid之连接创建及销毁示例详解

2023-02-28 13:16半夏之沫 Java教程

这篇文章主要为大家介绍了Druid之连接创建及销毁示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

Druid是阿里开源的数据库连接池,是阿里监控系统Dragoon的副产品,提供了强大的可监控性和基于Filter-Chain的可扩展性。

本篇文章将对Druid数据库连接池的连接创建销毁进行分析。分析Druid数据库连接池的源码前,需要明确几个概念。

  • Druid数据库连接池中可用的连接存放在一个数组connections中;
  • Druid数据库连接池做并发控制,主要靠一把可重入锁以及和这把锁关联的两个Condition对象;
?
1
2
3
4
5
public DruidAbstractDataSource(boolean lockFair) {
   lock = new ReentrantLock(lockFair);
   notEmpty = lock.newCondition();
   empty = lock.newCondition();
}
  • 连接池没有可用连接时,应用线程会在notEmpty上等待,连接池已满时,生产连接的线程会在empty上等待;
  • 对连接保活,就是每间隔一定时间,对达到了保活间隔周期的连接进行有效性校验,可以将无效连接销毁,也可以防止连接长时间不与数据库服务端通信。

Druid版本:1.2.11

正文

一. DruidDataSource连接创建

DruidDataSource连接的创建由CreateConnectionThread线程完成,其run() 方法如下所示。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public void run() {
    initedLatch.countDown();
    long lastDiscardCount = 0;
    int errorCount = 0;
    for (; ; ) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e2) {
            break;
        }
        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;
        try {
            // emptyWait为true表示生产连接线程需要等待,无需生产连接
            boolean emptyWait = true;
            // 发生了创建错误,且池中已无连接,且丢弃连接的统计没有改变
            // 此时生产连接线程需要生产连接
            if (createError != null
                    && poolingCount == 0
                    && !discardChanged) {
                emptyWait = false;
            }
            if (emptyWait
                    && asyncInit && createCount < initialSize) {
                emptyWait = false;
            }
            if (emptyWait) {
                // 池中已有连接数大于等于正在等待连接的应用线程数
                // 且当前是非keepAlive场景
                // 且当前是非连续失败
                // 此时生产连接的线程在empty上等待
                // keepAlive && activeCount + poolingCount < minIdle时会在shrink()方法中触发emptySingal()来添加连接
                // isFailContinuous()返回true表示连续失败,即多次(默认2次)创建物理连接失败
                if (poolingCount >= notEmptyWaitThreadCount
                        && (!(keepAlive && activeCount + poolingCount < minIdle))
                        && !isFailContinuous()
                ) {
                    empty.await();
                }
                // 防止创建超过maxActive数量的连接
                if (activeCount + poolingCount >= maxActive) {
                    empty.await();
                    continue;
                }
            }
        } catch (InterruptedException e) {
            // 省略
        } finally {
            lock.unlock();
        }
        PhysicalConnectionInfo connection = null;
        try {
            connection = createPhysicalConnection();
        } catch (SQLException e) {
            LOG.error("create connection SQLException, url: " + jdbcUrl
                    + ", errorCode " + e.getErrorCode()
                    + ", state " + e.getSQLState(), e);
            errorCount++;
            if (errorCount > connectionErrorRetryAttempts
                    && timeBetweenConnectErrorMillis > 0) {
                // 多次创建失败
                setFailContinuous(true);
                // 如果配置了快速失败,就唤醒所有在notEmpty上等待的应用线程
                if (failFast) {
                    lock.lock();
                    try {
                        notEmpty.signalAll();
                    } finally {
                        lock.unlock();
                    }
                }
                if (breakAfterAcquireFailure) {
                    break;
                }
                try {
                    Thread.sleep(timeBetweenConnectErrorMillis);
                } catch (InterruptedException interruptEx) {
                    break;
                }
            }
        } catch (RuntimeException e) {
            LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
        } catch (Error e) {
            LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        }
        if (connection == null) {
            continue;
        }
        // 把连接添加到连接池
        boolean result = put(connection);
        if (!result) {
            JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        }
        errorCount = 0;
        if (closing || closed) {
            break;
        }
    }
}

CreateConnectionThreadrun() 方法整体就是在一个死循环中不断的等待,被唤醒,然后创建线程。当一个物理连接被创建出来后,会调用DruidDataSource#put方法将其放到连接池connections中,put() 方法源码如下所示。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
    DruidConnectionHolder holder = null;
    try {
        holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
    } catch (SQLException ex) {
        // 省略
        return false;
    }
    return put(holder, physicalConnectionInfo.createTaskId, false);
}
private boolean put(DruidConnectionHolder holder,
                    long createTaskId, boolean checkExists) {
    // 涉及到连接池中连接数量改变的操作,都需要加锁
    lock.lock();
    try {
        if (this.closing || this.closed) {
            return false;
        }
        // 池中已有连接数已经大于等于最大连接数,则不再把连接加到连接池并直接返回false
        if (poolingCount >= maxActive) {
            if (createScheduler != null) {
                clearCreateTask(createTaskId);
            }
            return false;
        }
        // 检查重复添加
        if (checkExists) {
            for (int i = 0; i < poolingCount; i++) {
                if (connections[i] == holder) {
                    return false;
                }
            }
        }
        // 连接放入连接池
        connections[poolingCount] = holder;
        // poolingCount++
        incrementPoolingCount();
        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();
        }
        // 唤醒在notEmpty上等待连接的应用线程
        notEmpty.signal();
        notEmptySignalCount++;
        if (createScheduler != null) {
            clearCreateTask(createTaskId);
            if (poolingCount + createTaskCount < notEmptyWaitThreadCount
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                emptySignal();
            }
        }
    } finally {
        lock.unlock();
    }
    return true;
}

put() 方法会先将物理连接从PhysicalConnectionInfo中获取出来并封装成一个DruidConnectionHolderDruidConnectionHolder就是Druid连接池中的连接。新添加的连接会存放在连接池数组connectionspoolingCount位置,然后poolingCount会加1,也就是poolingCount代表着连接池中可以获取的连接的数量。

二. DruidDataSource连接销毁

DruidDataSource连接的销毁由DestroyConnectionThread线程完成,其run() 方法如下所示。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void run() {
    // run()方法只要执行了,就调用initedLatch#countDown
    initedLatch.countDown();
    for (; ; ) {
        // 每间隔timeBetweenEvictionRunsMillis执行一次DestroyTask的run()方法
        try {
            if (closed || closing) {
                break;
            }
            if (timeBetweenEvictionRunsMillis > 0) {
                Thread.sleep(timeBetweenEvictionRunsMillis);
            } else {
                Thread.sleep(1000);
            }
            if (Thread.interrupted()) {
                break;
            }
            // 执行DestroyTask的run()方法来销毁需要销毁的连接
            destroyTask.run();
        } catch (InterruptedException e) {
            break;
        }
    }
}

DestroyConnectionThreadrun() 方法就是在一个死循环中每间隔timeBetweenEvictionRunsMillis的时间就执行一次DestroyTaskrun() 方法。DestroyTask#run方法实现如下所示。

?
1
2
3
4
5
6
7
8
public void run() {
    // 根据一系列条件判断并销毁连接
    shrink(true, keepAlive);
    // RemoveAbandoned机制
    if (isRemoveAbandoned()) {
        removeAbandoned();
    }
}

DestroyTask#run方法中会调用DruidDataSource#shrink方法来根据设定的条件来判断出需要销毁和保活的连接。DruidDataSource#shrink方法如下所示。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// checkTime参数表示在将一个连接进行销毁前,是否需要判断一下空闲时间
public void shrink(boolean checkTime, boolean keepAlive) {
    // 加锁
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }
    // needFill = keepAlive && poolingCount + activeCount < minIdle
    // needFill为true时,会调用empty.signal()唤醒生产连接的线程来生产连接
    boolean needFill = false;
    // evictCount记录需要销毁的连接数
    // keepAliveCount记录需要保活的连接数
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;
    try {
        if (!inited) {
            return;
        }
        // checkCount = 池中已有连接数 - 最小空闲连接数
        // 正常情况下,最多能够将前checkCount个连接进行销毁
        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        // 正常情况下,需要遍历池中所有连接
        // 从前往后遍历,i为数组索引
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];
            // 如果发生了致命错误(onFatalError == true)且致命错误发生时间(lastFatalErrorTimeMillis)在连接建立时间之后
            // 把连接加入到保活连接数组中
            if ((onFatalError || fatalErrorIncrement > 0)
                    && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }
            if (checkTime) {
                // phyTimeoutMillis表示连接的物理存活超时时间,默认值是-1
                if (phyTimeoutMillis > 0) {
                    // phyConnectTimeMillis表示连接的物理存活时间
                    long phyConnectTimeMillis = currentTimeMillis
                            - connection.connectTimeMillis;
                    // 连接的物理存活时间大于phyTimeoutMillis,则将这个连接放入evictConnections数组
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                // idleMillis表示连接的空闲时间
                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                // minEvictableIdleTimeMillis表示连接允许的最小空闲时间,默认是30分钟
                // keepAliveBetweenTimeMillis表示保活间隔时间,默认是2分钟
                // 如果连接的空闲时间小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
                // 则connections数组中当前连接之后的连接都会满足空闲时间小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
                // 此时跳出遍历,不再检查其余的连接
                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }
                // 连接的空闲时间大于等于允许的最小空闲时间
                if (idleMillis >= minEvictableIdleTimeMillis) {
                    if (checkTime && i < checkCount) {
                        // i < checkCount这个条件的理解如下:
                        // 每次shrink()方法执行时,connections数组中只有索引0到checkCount-1的连接才允许被销毁
                        // 这样才能保证销毁完连接后,connections数组中至少还有minIdle个连接
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        // 如果空闲时间过久,已经大于了允许的最大空闲时间(默认7小时)
                        // 那么无论如何都要销毁这个连接
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                // 如果开启了保活机制,且连接空闲时间大于等于了保活间隔时间
                // 此时将连接加入到保活连接数组中
                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // checkTime为false,那么前checkCount个连接直接进行销毁,不再判断这些连接的空闲时间是否超过阈值
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }
        // removeCount = 销毁连接数 + 保活连接数
        // removeCount表示本次从connections数组中拿掉的连接数
        // 注:一定是从前往后拿,正常情况下最后minIdle个连接是安全的
        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            // [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            // [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            // 更新池中连接数
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;
        // 如果池中连接数加上活跃连接数(借出去的连接)小于最小空闲连接数
        // 则将needFill设为true,后续需要唤醒生产连接的线程来生产连接
        if (keepAlive && poolingCount + activeCount < minIdle) {
            needFill = true;
        }
    } finally {
        lock.unlock();
    }
    if (evictCount > 0) {
        // 遍历evictConnections数组,销毁其中的连接
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }
    if (keepAliveCount > 0) {
        // 遍历keepAliveConnections数组,对其中的连接做可用性校验
        // 校验通过连接就放入connections数组,没通过连接就销毁
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();
            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
            }
            boolean discard = !validate;
            if (validate) {
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    discard = true;
                }
            }
            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                }
                lock.lock();
                try {
                    discardCount++;
                    if (activeCount + poolingCount <= minIdle) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }
    // 如果needFill为true则唤醒生产连接的线程来生产连接
    if (needFill) {
        lock.lock();
        try {
            // 计算需要生产连接的个数
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}

DruidDataSource#shrink方法中,核心逻辑是遍历connections数组中的连接,并判断这些连接是需要销毁还是需要保活。通常情况下,connections数组中的前checkCount(checkCount = poolingCount - minIdle) 个连接是危险的,因为这些连接只要满足了:空闲时间 >= minEvictableIdleTimeMillis(允许的最小空闲时间),那么就需要被销毁,而connections数组中的最后minIdle个连接是相对安全的,因为这些连接只有在满足:空闲时间 > maxEvictableIdleTimeMillis(允许的最大空闲时间) 时,才会被销毁。这么判断的原因,主要就是需要让连接池里能够保证至少有minIdle个空闲连接可以让应用线程获取。

当确定好了需要销毁和需要保活的连接后,此时会先将connections数组清理,只保留安全的连接,这个过程示意图如下。

Druid之连接创建及销毁示例详解

最后,会遍历evictConnections数组,销毁数组中的连接,遍历keepAliveConnections数组,对其中的每个连接做可用性校验,如果校验可用,那么就重新放回connections数组,否则销毁。

总结

连接的创建由一个叫做CreateConnectionThread的线程完成,整体流程就是在一个死循环中不断的等待,被唤醒,然后创建连接。每一个被创建出来的物理连接java.sql.Connection会被封装为一个DruidConnectionHolder,然后存放到connections数组中。

连接的销毁由一个叫做DestroyConnectionThread的线程完成,核心逻辑是周期性的遍历connections数组中的连接,并判断这些连接是需要销毁还是需要保活,需要销毁的连接最后会被物理销毁,需要保活的连接最后会进行一次可用性校验,如果校验不通过,则进行物理销毁。

以上就是Druid之连接创建及销毁示例详解的详细内容,更多关于Druid连接创建销毁的资料请关注服务器之家其它相关文章!

原文链接:https://juejin.cn/post/7201475476165296185

延伸 · 阅读

精彩推荐