1、CentOS7编译安装MySQL5.7.24
CentOS7编译安装MySQL5.7.24的教程详解
链接地址:http://www.tuohang.net/article/2238.html
2、Mysql设置binLog配置
(1)检查binlog功能是否有开启
(2)如果显示状态为OFF表示该功能未开启,开启binlog功能
1
2
3
4
5
6
7
|
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+
1 row in set (0.00 sec)
1,修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf
追加内容:
1
2
3
|
# 以下配置需位于 [mysqld] 段内
log-bin=mysql-bin    # binlog文件名
binlog_format=ROW    # 选择row模式
server_id=1          # mysql实例id,不能和canal的slaveId重复
2,重启 mysql:
service mysql restart
3,登录 mysql 客户端,查看 log_bin 变量
1
2
3
4
5
6
7
8
9
|
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
如果显示状态为ON表示该功能已开启
(3)在mysql里面添加以下的相关用户和权限
1
2
3
4
|
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3、Linux下载安装Canal服务
下载地址:
https://github.com/alibaba/canal/releases
(1)下载之后,放到目录中,解压文件
1
2
3
4
|
mkdir /usr/local/canal
cd /usr/local/canal
# 将下载好的 canal.deployer-1.1.4.tar.gz 放到该目录下
tar zxvf canal.deployer-1.1.4.tar.gz
(2)修改配置文件
vi conf/example/instance.properties
1
2
3
4
5
6
7
8
9
10
|
#需要改成自己的数据库信息
canal.instance.master.address=39.106.224.236:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的数据库表规则
#1、同步所有的表
canal.instance.filter.regex=.*\\..*
#2、只同步指定库中的指定表
#canal.instance.filter.regex=guli_ucenter.ucenter_member
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
- 所有表:.* or .*\\..*
- canal schema下所有表: canal\\..*
- canal下的以canal打头的表:canal\\.canal.*
- canal schema下的一张表:canal.test1
- 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
- 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤).
(3)进入bin目录下启动
1
|
./startup.sh
4、Boot项目中引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--mysql-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
    </dependency>
</dependencies>
5 、修改properties配置文件
1
2
3
4
5
|
# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
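6、修改Application启动类
(此处的示例代码在本文中缺失。启动类需要在应用启动后调用 CanalClient 的 run() 方法才会开始监听,例如让启动类实现 CommandLineRunner,并在其 run 方法中调用 canalClient.run()。)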
7、创建Canal配置类自动监听
注意:在 run() 方法中自行修改 Linux 虚拟机 IP 地址后直接使用!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
package com.atguigu.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Canal client that pulls row-change events from a Canal server and replays
 * them as INSERT / UPDATE / DELETE statements against the local {@link DataSource}.
 *
 * <p>Statements are built with {@code ?} placeholders and bound parameters rather
 * than by concatenating column values into the SQL text, so values containing
 * quotes and NULL columns are replayed correctly.
 */
@Component
public class CanalClient {

    /** Maximum number of entries requested from the Canal server per poll. */
    private static final int BATCH_SIZE = 1000;

    /** Pause between polls when the server has nothing to deliver. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /** Statements waiting to be applied to the target database. */
    private final Queue<PendingStatement> sqlQueue = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * Connects to the Canal server, subscribes to all schemas/tables and replays
     * incoming row changes until the thread is interrupted or a payload cannot be parsed.
     *
     * <p>A batch is acknowledged only after its statements have been handed to the
     * database, so a crash between fetch and apply does not silently drop the batch.
     */
    public void run() {
        // The IP address is the Linux machine running the Canal server; adjust it to
        // your environment. Port 11111 and destination "example" are the values used
        // by this tutorial's Canal deployment.
        InetSocketAddress canalAddress = new InetSocketAddress("39.106.224.236", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(canalAddress, "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } else {
                    dataHandle(message.getEntries());
                }
                executeQueueSql();
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Drains the pending-statement queue, logging and executing each statement in order.
     */
    public void executeQueueSql() {
        PendingStatement statement;
        while ((statement = sqlQueue.poll()) != null) {
            System.out.println("[sql]----> " + statement);
            executeUpdate(statement.sql, statement.params);
        }
    }

    /**
     * Converts row-data entries into queued SQL statements; other entry types and
     * event types other than INSERT / UPDATE / DELETE are ignored.
     *
     * @param entrys entries received from the Canal server
     * @throws InvalidProtocolBufferException if an entry payload cannot be parsed
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA != entry.getEntryType()) {
                continue;
            }
            // Parse once per entry and share the result with the statement builders.
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            String table = quoteIdentifier(entry.getHeader().getTableName());
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(table, rowData);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(table, rowData);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(table, rowData);
                }
            }
        }
    }

    /**
     * Queues an UPDATE that sets every column to its after-image value, matching the
     * row by the before-image values of all key columns.
     *
     * @param table   quoted target table name
     * @param rowData changed row
     */
    private void saveUpdateSql(String table, RowData rowData) {
        List<Column> afterColumns = rowData.getAfterColumnsList();
        if (afterColumns.isEmpty()) {
            return;
        }
        StringBuilder sql = new StringBuilder("update ").append(table).append(" set ");
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < afterColumns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            Column column = afterColumns.get(i);
            sql.append(quoteIdentifier(column.getName())).append(" = ?");
            params.add(valueOf(column));
        }
        if (!appendKeyPredicate(sql, params, rowData.getBeforeColumnsList())) {
            System.err.println("[canal] skip update on " + table + ": no key column in row image");
            return;
        }
        sqlQueue.add(new PendingStatement(sql.toString(), params));
    }

    /**
     * Queues a DELETE matching the row by the before-image values of all key columns.
     *
     * @param table   quoted target table name
     * @param rowData deleted row
     */
    private void saveDeleteSql(String table, RowData rowData) {
        StringBuilder sql = new StringBuilder("delete from ").append(table);
        List<Object> params = new ArrayList<>();
        if (!appendKeyPredicate(sql, params, rowData.getBeforeColumnsList())) {
            // Without a key predicate the statement would be invalid (or, worse, unbounded).
            System.err.println("[canal] skip delete on " + table + ": no key column in row image");
            return;
        }
        sqlQueue.add(new PendingStatement(sql.toString(), params));
    }

    /**
     * Queues an INSERT containing every column of the after-image.
     *
     * @param table   quoted target table name
     * @param rowData inserted row
     */
    private void saveInsertSql(String table, RowData rowData) {
        List<Column> columns = rowData.getAfterColumnsList();
        if (columns.isEmpty()) {
            return;
        }
        StringBuilder names = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                names.append(", ");
                placeholders.append(", ");
            }
            Column column = columns.get(i);
            names.append(quoteIdentifier(column.getName()));
            placeholders.append('?');
            params.add(valueOf(column));
        }
        String sql = "insert into " + table + " (" + names + ") VALUES (" + placeholders + ")";
        sqlQueue.add(new PendingStatement(sql, params));
    }

    /**
     * Appends {@code where k1 = ? and k2 = ? ...} for every key column and adds the
     * matching values to {@code params}.
     *
     * @return {@code false} (nothing appended) when the row image has no key column
     */
    private static boolean appendKeyPredicate(StringBuilder sql, List<Object> params, List<Column> columns) {
        boolean hasKey = false;
        for (Column column : columns) {
            if (!column.getIsKey()) {
                continue;
            }
            sql.append(hasKey ? " and " : " where ")
               .append(quoteIdentifier(column.getName()))
               .append(" = ?");
            params.add(valueOf(column));
            hasKey = true;
        }
        return hasKey;
    }

    /** Returns the column value to bind, or {@code null} when the column is SQL NULL. */
    private static Object valueOf(Column column) {
        return column.getIsNull() ? null : column.getValue();
    }

    /** Wraps an identifier in MySQL backticks so reserved words and odd characters are safe. */
    private static String quoteIdentifier(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /**
     * Executes a parameterized INSERT / UPDATE / DELETE and logs the affected row count.
     * SQL errors are reported and do not stop the replay loop.
     */
    private void executeUpdate(String sql, Object[] params) {
        try {
            int rows = new QueryRunner(dataSource).update(sql, params);
            System.out.println("update: " + rows);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes a raw SQL statement against the data source.
     *
     * @param sql statement to execute; {@code null} is ignored
     */
    public void execute(String sql) {
        if (sql == null) {
            return;
        }
        Connection con = null;
        try {
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }

    /** A SQL statement with {@code ?} placeholders together with its bound values. */
    private static final class PendingStatement {
        private final String sql;
        private final Object[] params;

        PendingStatement(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params.toArray();
        }

        @Override
        public String toString() {
            return sql + " " + Arrays.toString(params);
        }
    }
}
到此这篇关于SpringBoot整合Canal数据同步的文章就介绍到这了,更多相关SpringBoot Canal数据同步内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/zhangxuchuan111/article/details/111465327