reactor实现的原理请参考:
http://www.tuohang.net/article/263341.html
本次百万并发的代码实现也是基于上面代码进行更改而来
并发量和承载的概念
并发量:一个服务器能同时承载客户端的数量
承载:客户端发送给服务器的请求(http或tcp等)在200ms内可以返回正确的结果
一、服务器的代码实现与讲解
结构体代码主要构建的结构如图所示
链表结构,每个eventblock结点,包括一个ntyevent数组,数组中存储fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
/*结构体定义链表数组*/ struct ntyevent { int fd; //要监听的文件描述符 int events; //对应的监听事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数) void *arg; //指向自己结构体指针 int (*callback)( int fd, int events, void *arg); int status; //是否在监听:1->在红黑树上(监听),0->不在(不监听) char buffer[BUFFER_LENGTH]; int length; long last_active; }; struct eventblock { struct eventblock *next; struct ntyevent *events; //数组 }; struct ntyreactor { //句柄 int epfd; //结点个数 int blkcnt; struct eventblock *evblk; //fd --> 100w }; |
初始化fd 上树、下树代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
//nty_event_set(event, sockfd, acceptor, reactor); //初始化sockfd void nty_event_set( struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) { ev->fd = fd; ev->callback = callback; ev->events = 0; ev->arg = arg; ev->last_active = time (NULL); return ; } //nty_event_add(reactor->epfd, EPOLLIN, event); //对监听的epoll红黑树上的结点的修改 int nty_event_add( int epfd, int events, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; ep_ev.data.ptr = ev; ep_ev.events = ev->events = events; int op; if (ev->status == 1) { op = EPOLL_CTL_MOD; } else { op = EPOLL_CTL_ADD; ev->status = 1; } if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) { printf ( "event add failed [fd=%d], events[%d]\n" , ev->fd, events); return -1; } return 0; } int nty_event_del( int epfd, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; if (ev->status != 1) { return -1; } ep_ev.data.ptr = ev; ev->status = 0; epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev); return 0; } |
回调函数代码的书写
注意看recv_cb的回调函数中,recv之后,立马下树,然后又重新初始化fd,上树。这样做的目的是因为代码逻辑是recv收到数据后,立即原样send,所以需要对fd的属性进行更改,需要重新初始化赋值,然后上树
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
int recv_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); int len = recv(fd, ev->buffer, BUFFER_LENGTH , 0); // nty_event_del(reactor->epfd, ev); if (len > 0) { ev->length = len; ev->buffer[len] = '\0' ; printf ( "C[%d]:%s\n" , fd, ev->buffer); nty_event_set(ev, fd, send_cb, reactor); nty_event_add(reactor->epfd, EPOLLOUT, ev); } else if (len == 0) { close(ev->fd); //printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events); } else { close(ev->fd); printf ( "recv[fd=%d] error[%d]:%s\n" , fd, errno , strerror ( errno )); } return len; } int send_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); int len = send(fd, ev->buffer, ev->length, 0); if (len > 0) { printf ( "send[fd=%d], [%d]%s\n" , fd, len, ev->buffer); nty_event_del(reactor->epfd, ev); nty_event_set(ev, fd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, ev); } else { close(ev->fd); nty_event_del(reactor->epfd, ev); printf ( "send[fd=%d] error %s\n" , fd, strerror ( errno )); } return len; } int accept_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; if (reactor == NULL) return -1; struct sockaddr_in client_addr; socklen_t len = sizeof (client_addr); int clientfd; if ((clientfd = accept(fd, ( struct sockaddr*)&client_addr, &len)) == -1) { if ( errno != EAGAIN && errno != EINTR) { } printf ( "accept: %s\n" , strerror ( errno )); return -1; } int flag = 0; if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) { printf ( "%s: fcntl nonblocking failed, %d\n" , __func__, MAX_EPOLL_EVENTS); return -1; } /*存储*/ struct ntyevent *event = ntyreactor_idx(reactor, clientfd); nty_event_set(event, clientfd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, event); printf ( "new connect [%s:%d], pos[%d]\n" , inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd); return 0; } |
链表的初始化与销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
//初始化链表 int ntyreactor_init( struct ntyreactor *reactor) { if (reactor == NULL) return -1; memset (reactor, 0, sizeof ( struct ntyreactor)); reactor->epfd = epoll_create(1); if (reactor->epfd <= 0) { printf ( "create epfd in %s err %s\n" , __func__, strerror ( errno )); return -2; } struct ntyevent *evs = ( struct ntyevent*) malloc ((MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); if (evs == NULL) { printf ( "ntyreactor_alloc ntyevents failed\n" ); return -2; } memset (evs, 0, (MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); struct eventblock *block = ( struct eventblock *) malloc ( sizeof ( struct eventblock)); if (block == NULL) { printf ( "ntyreactor_alloc eventblock failed\n" ); return -2; } memset (block, 0, sizeof ( struct eventblock)); block->events = evs; block->next = NULL; reactor->evblk = block; reactor->blkcnt = 1; return 0; } |
找到fd应在链表数组中存储的位置并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
//新增块数(eventblock结点个数) //ntyreactor_alloc(reactor); int ntyreactor_alloc( struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; struct eventblock *blk = reactor->evblk; while (blk->next != NULL) { blk = blk->next; } struct ntyevent *evs = ( struct ntyevent*) malloc ((MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); if (evs == NULL) { printf ( "ntyreactor_alloc ntyevents failed\n" ); return -2; } memset (evs, 0, (MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); struct eventblock *block = ( struct eventblock *) malloc ( sizeof ( struct eventblock)); if (block == NULL) { printf ( "ntyreactor_alloc eventblock failed\n" ); return -2; } memset (block, 0, sizeof ( struct eventblock)); block->events = evs; block->next = NULL; blk->next = block; reactor->blkcnt ++; // return 0; } //struct ntyevent *event = ntyreactor_idx(reactor, sockfd); struct ntyevent *ntyreactor_idx( struct ntyreactor *reactor, int sockfd) { int blkidx = sockfd / MAX_EPOLL_EVENTS; //如果块数(eventblock结点个数)不能满足新的sockfd的存放 while (blkidx >= reactor->blkcnt) { //新增块数(eventblock结点个数) ntyreactor_alloc(reactor); } //找到存放sockfd的块(eventblock对应的结点) int i = 0; struct eventblock *blk = reactor->evblk; while (i ++ < blkidx && blk != NULL) { blk = blk->next; } //返回对应块(eventblock对应的结点)的存放sockfd数组的那个具体位置 return &blk->events[sockfd % MAX_EPOLL_EVENTS]; } |
上树,并初始化链表数组上对应的fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//ntyreactor_addlistener(reactor, sockfds[i], accept_cb); //上树,并初始化链表数组上对应的fd int ntyreactor_addlistener( struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; //reactor->evblk->events[sockfd]; //找到sock所在的具体位置 struct ntyevent *event = ntyreactor_idx(reactor, sockfd); 初始化sockfd nty_event_set(event, sockfd, acceptor, reactor); //对监听的epoll红黑树上的结点的修改 nty_event_add(reactor->epfd, EPOLLIN, event); return 0; } |
epollwait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
//ntyreactor_run(reactor); int ntyreactor_run( struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->epfd < 0) return -1; if (reactor->evblk == NULL) return -1; struct epoll_event events[MAX_EPOLL_EVENTS+1]; int checkpos = 0, i; while (1) { /* long now = time(NULL); for (i = 0;i < 100;i ++, checkpos ++) { if (checkpos == MAX_EPOLL_EVENTS) { checkpos = 0; } if (reactor->events[checkpos].status != 1) { continue; } long duration = now - reactor->events[checkpos].last_active; if (duration >= 60) { close(reactor->events[checkpos].fd); printf("[fd=%d] timeout\n", reactor->events[checkpos].fd); nty_event_del(reactor->epfd, &reactor->events[checkpos]); } } */ int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000); if (nready < 0) { printf ( "epoll_wait error, exit\n" ); continue ; } for (i = 0;i < nready;i ++) { struct ntyevent *ev = ( struct ntyevent*)events[i].data.ptr; //看fd连接是否发生变化 if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { ev->callback(ev->fd, events[i].events, ev->arg); } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { ev->callback(ev->fd, events[i].events, ev->arg); } } } } |
main函数;此服务器代码开设了100个监听的端口,目的是因为客户端测试程序也是运行在虚拟机的Ubuntu上,通过开三台来充当客户端来进行测试。有因为一台Ubuntu最多有6w个端口,3台有18w端口。如果服务器只开设一个监听端口,则最多有18w端口。因此要达到100w并发则应多开设端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// 3, 6w, 1, 100 == // <remoteip, remoteport, localip, localport> int main( int argc, char *argv[]) { unsigned short port = SERVER_PORT; // listen 8888 if (argc == 2) { port = atoi (argv[1]); //把参数 str 所指向的字符串转换为一个整数(类型为 int 型) } struct ntyreactor *reactor = ( struct ntyreactor*) malloc ( sizeof ( struct ntyreactor)); /*初始化三个结构体,建立链表*/ ntyreactor_init(reactor); int i = 0; int sockfds[PORT_COUNT] = {0}; for (i = 0;i < PORT_COUNT;i ++) { //端口号的监听 sockfds[i] = init_sock(port+i); //上树 ntyreactor_addlistener(reactor, sockfds[i], accept_cb); } // epoll_wait ntyreactor_run(reactor); // ntyreactor_destory(reactor); for (i = 0;i < PORT_COUNT;i ++) { close(sockfds[i]); } free (reactor); return 0; } |
完整服务器代码展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
|
/*链表存储数组,把epoll变成对事件的管理,用链表数组的目的就是为了回调函数*/ /*recv写法:代码逻辑是收到数据后,立即原样返回所以才那样写*/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/epoll.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <time.h> #define BUFFER_LENGTH 4096 #define MAX_EPOLL_EVENTS 1024 #define SERVER_PORT 8888 #define PORT_COUNT 100 typedef int NCALLBACK( int , int , void *); //struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); struct ntyevent { int fd; //要监听的文件描述符 int events; //对应的监听事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数) void *arg; //指向自己结构体指针 int (*callback)( int fd, int events, void *arg); int status; //是否在监听:1->在红黑树上(监听),0->不在(不监听) char buffer[BUFFER_LENGTH]; int length; long last_active; }; struct eventblock { struct eventblock *next; struct ntyevent *events; //数组 }; struct ntyreactor { //句柄 int epfd; //结点个数 int blkcnt; struct eventblock *evblk; //fd --> 100w }; int recv_cb( int fd, int events, void *arg); int send_cb( int fd, int events, void *arg); struct ntyevent *ntyreactor_idx( struct ntyreactor *reactor, int sockfd); //nty_event_set(event, sockfd, acceptor, reactor); //初始化sockfd void nty_event_set( struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) { ev->fd = fd; ev->callback = callback; ev->events = 0; ev->arg = arg; ev->last_active = time (NULL); return ; } //nty_event_add(reactor->epfd, EPOLLIN, event); //对监听的epoll红黑树上的结点的修改 int nty_event_add( int epfd, int events, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; ep_ev.data.ptr = ev; ep_ev.events = ev->events = events; int op; if (ev->status == 1) { op = EPOLL_CTL_MOD; } else { op = EPOLL_CTL_ADD; ev->status = 1; } if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) { printf ( "event add failed [fd=%d], events[%d]\n" , ev->fd, events); return -1; } return 0; } int nty_event_del( int epfd, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; if (ev->status != 1) { return -1; } ep_ev.data.ptr = ev; ev->status = 0; epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev); return 0; } int recv_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); int len = recv(fd, ev->buffer, BUFFER_LENGTH , 0); // nty_event_del(reactor->epfd, ev); if (len > 0) { ev->length = len; ev->buffer[len] = '\0' ; printf ( "C[%d]:%s\n" , fd, ev->buffer); nty_event_set(ev, fd, send_cb, reactor); nty_event_add(reactor->epfd, EPOLLOUT, ev); } else if (len == 0) { close(ev->fd); //printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events); } else { close(ev->fd); printf ( "recv[fd=%d] error[%d]:%s\n" , fd, errno , strerror ( errno )); } return len; } int send_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); int len = send(fd, ev->buffer, ev->length, 0); if (len > 0) { printf ( "send[fd=%d], [%d]%s\n" , fd, len, ev->buffer); nty_event_del(reactor->epfd, ev); nty_event_set(ev, fd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, ev); } else { close(ev->fd); nty_event_del(reactor->epfd, ev); printf ( "send[fd=%d] error %s\n" , fd, strerror ( errno )); } return len; } int accept_cb( int fd, int events, void *arg) { struct ntyreactor *reactor = ( struct ntyreactor*)arg; if (reactor == NULL) return -1; struct sockaddr_in client_addr; socklen_t len = sizeof (client_addr); int clientfd; if ((clientfd = accept(fd, ( struct sockaddr*)&client_addr, &len)) == -1) { if ( errno != EAGAIN && errno != EINTR) { } printf ( "accept: %s\n" , strerror ( errno )); return -1; } int flag = 0; if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) { printf ( "%s: fcntl nonblocking failed, %d\n" , __func__, MAX_EPOLL_EVENTS); return -1; } /*存储*/ struct ntyevent *event = ntyreactor_idx(reactor, clientfd); nty_event_set(event, clientfd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, event); printf ( "new connect [%s:%d], pos[%d]\n" , inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd); return 0; } int init_sock( short port) { int fd = socket(AF_INET, SOCK_STREAM, 0); fcntl(fd, F_SETFL, O_NONBLOCK); struct sockaddr_in server_addr; memset (&server_addr, 0, sizeof (server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port); bind(fd, ( struct sockaddr*)&server_addr, sizeof (server_addr)); if (listen(fd, 20) < 0) { printf ( "listen failed : %s\n" , strerror ( errno )); } return fd; } //新增块数(eventblock结点个数) //ntyreactor_alloc(reactor); int ntyreactor_alloc( struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; struct eventblock *blk = reactor->evblk; while (blk->next != NULL) { blk = blk->next; } struct ntyevent *evs = ( struct ntyevent*) malloc ((MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); if (evs == NULL) { printf ( "ntyreactor_alloc ntyevents failed\n" ); return -2; } memset (evs, 0, (MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); struct eventblock *block = ( struct eventblock *) malloc ( sizeof ( struct eventblock)); if (block == NULL) { printf ( "ntyreactor_alloc eventblock failed\n" ); return -2; } memset (block, 0, sizeof ( struct eventblock)); block->events = evs; block->next = NULL; blk->next = block; reactor->blkcnt ++; // return 0; } //struct ntyevent *event = ntyreactor_idx(reactor, sockfd); struct ntyevent *ntyreactor_idx( struct ntyreactor *reactor, int sockfd) { int blkidx = sockfd / MAX_EPOLL_EVENTS; //如果块数(eventblock结点个数)不能满足新的sockfd的存放 while (blkidx >= reactor->blkcnt) { //新增块数(eventblock结点个数) ntyreactor_alloc(reactor); } //找到存放sockfd的块(eventblock对应的结点) int i = 0; struct eventblock *blk = reactor->evblk; while (i ++ < blkidx && blk != NULL) { blk = blk->next; } //返回对应块(eventblock对应的结点)的存放sockfd数组的那个具体位置 return &blk->events[sockfd % MAX_EPOLL_EVENTS]; } //初始化链表 int ntyreactor_init( struct ntyreactor *reactor) { if (reactor == NULL) return -1; memset (reactor, 0, sizeof ( struct ntyreactor)); reactor->epfd = epoll_create(1); if (reactor->epfd <= 0) { printf ( "create epfd in %s err %s\n" , __func__, strerror ( errno )); return -2; } struct ntyevent *evs = ( struct ntyevent*) malloc ((MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); if (evs == NULL) { printf ( "ntyreactor_alloc ntyevents failed\n" ); return -2; } memset (evs, 0, (MAX_EPOLL_EVENTS) * sizeof ( struct ntyevent)); struct eventblock *block = ( struct eventblock *) malloc ( sizeof ( struct eventblock)); if (block == NULL) { printf ( "ntyreactor_alloc eventblock failed\n" ); return -2; } memset (block, 0, sizeof ( struct eventblock)); block->events = evs; block->next = NULL; reactor->evblk = block; reactor->blkcnt = 1; return 0; } int ntyreactor_destory( struct ntyreactor *reactor) { close(reactor->epfd); //free(reactor->events); struct eventblock *blk = reactor->evblk; struct eventblock *blk_next = NULL; while (blk != NULL) { blk_next = blk->next; free (blk->events); free (blk); blk = blk_next; } return 0; } //ntyreactor_addlistener(reactor, sockfds[i], accept_cb); //上树,并初始化链表数组上对应的fd int ntyreactor_addlistener( struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; //reactor->evblk->events[sockfd]; //找到sock所在的具体位置 struct ntyevent *event = ntyreactor_idx(reactor, sockfd); 初始化sockfd nty_event_set(event, sockfd, acceptor, reactor); //对监听的epoll红黑树上的结点的修改 nty_event_add(reactor->epfd, EPOLLIN, event); return 0; } //ntyreactor_run(reactor); int ntyreactor_run( struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->epfd < 0) return -1; if (reactor->evblk == NULL) return -1; struct epoll_event events[MAX_EPOLL_EVENTS+1]; int checkpos = 0, i; while (1) { /* long now = time(NULL); for (i = 0;i < 100;i ++, checkpos ++) { if (checkpos == MAX_EPOLL_EVENTS) { checkpos = 0; } if (reactor->events[checkpos].status != 1) { continue; } long duration = now - reactor->events[checkpos].last_active; if (duration >= 60) { close(reactor->events[checkpos].fd); printf("[fd=%d] timeout\n", reactor->events[checkpos].fd); nty_event_del(reactor->epfd, &reactor->events[checkpos]); } } */ int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000); if (nready < 0) { printf ( "epoll_wait error, exit\n" ); continue ; } for (i = 0;i < nready;i ++) { struct ntyevent *ev = ( struct ntyevent*)events[i].data.ptr; //看fd连接是否发生变化 if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { ev->callback(ev->fd, events[i].events, ev->arg); } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { ev->callback(ev->fd, events[i].events, ev->arg); } } } } // 3, 6w, 1, 100 == // <remoteip, remoteport, localip, localport> int main( int argc, char *argv[]) { unsigned short port = SERVER_PORT; // listen 8888 if (argc == 2) { port = atoi (argv[1]); //把参数 str 所指向的字符串转换为一个整数(类型为 int 型) } struct ntyreactor *reactor = ( struct ntyreactor*) malloc ( sizeof ( struct ntyreactor)); /*初始化三个结构体,建立链表*/ ntyreactor_init(reactor); int i = 0; int sockfds[PORT_COUNT] = {0}; for (i = 0;i < PORT_COUNT;i ++) { //端口号的监听 sockfds[i] = init_sock(port+i); //上树 ntyreactor_addlistener(reactor, sockfds[i], accept_cb); } // epoll_wait ntyreactor_run(reactor); // ntyreactor_destory(reactor); for (i = 0;i < PORT_COUNT;i ++) { close(sockfds[i]); } free (reactor); return 0; } |
reactor的写法感觉和epoll的普通写法,感觉差别就是reactor多了个回调函数,具体没啥优点?
epoll是针对io的管理。 reactor对针对事件的管理
不同的事件,针对不同的回调函数
性能上没啥差异,但提高了代码的复用性。具体需要自己慢慢体会体会,呜呜呜呜还有体会到,编程思想不过关。
二、环境设置
限制是fd的限制,系统默认fd最多有1024个,按照一个连接一个fd的做法,那就需要百万个fd。这里有两种修改方法,一是使用ulimit -n命令,这个命令重启就失效;二是修改/etc/security/limits.conf文件,这是永久有效的,重启或sysctl -p生效。
1
2
|
* hard nofile 1048576 * soft nofile 1048576 |
hard是硬限制,不能超过该值,soft是软限制,可以超过,超过后就开始回收。
这个文件里还有一些其他的参数可以了解一下,fs.file_max是fd可取到的最大值,注意与fd最大个数区分。
突破这两个限制后,还会遇到一个问题,客户端会报错:connection timedout。连接超时,即是客户端未收到服务器对客户端connect()的回应包。这里有两种可能,客户端为收到服务器的包或是服务器未收到客户端的connect包。事实上,是因为系统有个防火墙iotables,这个防火墙是基于网卡和协议栈之间的过滤机制netfilter实现的。netfilter当连接数到达一定程度时,会不允许再向外发送connect包。修改也是通过/etc/security/limits.conf文件
1
|
net.nf_conntrack_max=1048576 |
突破这些限制,就可以实现百万并发了。
这里再介绍/etc/security/limits.conf中几个参数
net.ipv4.tcp_mem=262144 524288 786432是所有TCP协议栈所占空间的大小,单位是页(4KB)。介绍一下后面写的三个值,当所占空间大小超过第二个值时,系统会进行优化,此时如果占用空间降到第一个值以下,不再优化,第三个值是上限,不允许分配超过比大小的空间。
net.ipv4.tcp_wmem=2048 2048 4096是每个socket对应的写缓冲区大小,三个值分别是最小值、默认值、最大值,单位是B。
net.ipv4.tcp_rmem=2048 2048 4096是每个socket对应的读缓冲区大小,三个值分别是最小值、默认值、最大值,单位是B。
做百万并发时,如果内存不大,可以相应调小。在实际应用中,如果传输大文件,调大;如果传输的都是字符,调小,就可以接收更多fd。
到此这篇关于C++基于reactor的服务器百万并发实现的文章就介绍到这了,更多相关reactor服务器百万并发内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/weixin_52259848/article/details/125513518